使用Kafka Connect来导入/导出数据

無名 发表于: 2016-07-05   最后更新时间: 2016-10-25 19:05:56  
{{totalSubscript}} 订阅, 19,950 游览

Step 7: 使用 Kafka Connect 来 导入/导出 数据

从控制台写入和写回数据是一个方便的开始,但你可能想要从其他来源导入或导出数据到其他系统。对于大多数系统,可以使用kafka Connect,而不需要编写自定义集成代码。Kafka Connect是导入和导出数据的一个工具。它是一个可扩展的工具,运行连接器,实现与自定义的逻辑的外部系统交互。在这个快速入门里,我们将看到如何运行Kafka Connect用简单的连接器从文件导入数据到Kafka主题,再从Kafka主题导出数据到文件,首先,我们首先创建一些种子数据用来测试:

echo -e "foo\nbar" > test.txt

接下来,我们开始2个连接器运行在独立的模式,这意味着它们运行在一个单一的,本地的,专用的进程。我们提供3个配置文件作为参数。第一个始终是kafka Connect进程,如kafka broker连接和数据库序列化格式,剩下的配置文件每个指定的连接器来创建,这些文件包括一个独特的连接器名称,连接器类来实例化和任何其他配置要求的。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

这是示例的配置文件,使用默认的本地集群配置并创建了2个连接器:第一个是导入连接器,从导入文件中读取并发布到Kafka主题,第二个是导出连接器,从kafka主题读取消息输出到外部文件,在启动过程中,你会看到一些日志消息,包括一些连接器实例化的说明。一旦kafka Connect进程已经开始,导入连接器应该读取从

test.txt

和写入到topic

connect-test

,导出连接器从主题

connect-test

读取消息写入到文件

test.sink.txt

. 我们可以通过验证输出文件的内容来验证数据数据已经全部导出:

cat test.sink.txt
 foo
 bar

注意,导入的数据也已经在Kafka主题

connect-test

里,所以我们可以使用该命令查看这个主题:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
 {"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

连接器继续处理数据,因此我们可以添加数据到文件并通过管道移动:

echo "Another line" >> test.txt

你应该会看到出现在消费者控台输出一行信息并导出到文件。

更新于 2016-10-25

这里可以 指定offset 读取数据吗?
新增数据的时候,添加进去一条脏数据,可以指定删除这条数据吗?

可以指定offset读,但删不了的

萝卜&青菜 6年前

ConnectStandalone,就只有这个kafka的进程,按理说,应该没问题哈,怎么一直报 ERROR Failed to flush WorkerSourceTask;ERROR Failed to commit offsets for WorkerSourceTask;求解?大神

应该是数据量太多导致的timeout。增加offset.flush.timeout.ms试试。

萝卜&青菜 6年前

你好,这个问题怎么解决呢?比较着急,谢谢,朋友;

dingo 7年前

启动后报错,

[2016-08-17 14:59:57,473] ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:291)

[2016-08-17 14:59:57,473] ERROR Failed to commit offsets for WorkerSourceTask{id=local-file-source-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:109)

能帮忙解释下是哪出问题了吗?

dingo -> dingo 7年前

找到原因了 ,kafka这边刚才把进程给杀了,未启动

追求…… -> dingo 7年前

我和你报一样的错,我这边kafka正常运行呢,这是怎么回事?

大头 -> 追求…… 7年前

要确认那几个topic都在正常状态,connect-*,其所在主机都处于正常。

查看history更多相关的文章或提一个关于history的问题,也可以与我们一起分享文章