8.3 Connector开发指南
本指南介绍了开发者怎么样编写新的connector,用于kafka和其他系统之间的数据移动。简要回顾几个关键的概念,然后介绍如何创建一个简单的connector。
核心概念和API
在Kafka和其他系统之间复制数据,用户创建自定义的从系统中pull数据或push数据到系统的Connector(连接器)
。Connector有两种形式:SourceConnectors
从其他系统导入数据(如:JDBCSourceConnector
将导入一个关系型数据库到Kafka)和SinkConnectors
导出数据(如:HDFSSinkConnector将kafka主题的内容导出到HDFS文件)。connector不会执行任何复制自己的数据:它们的配置展示了要复制的数据,而Connector是负责打破这一工作变成一组可以分配worker的任务。这些任务也有两种相对应的形式:SourceTask
和 SinkTask
。在手里的任务,每个任务必须复制其子集的数据或Kafka的。在Kafka Connect,这些任务作为一组具有一致性模式的记录(消息)组成的输出和输入流。有时,这种映射是明显的:在一组日志文件,每个文件可以被视为一个流,每个分析的行形成一个记录,使用相同的模式和offset存储在文件中的字节偏移。在其他的情况下可能需要更多的努力来映射到该模型:一个JDBC连接器可以将每张表映射到一个流,但offset是不太清楚的。一种可能的映射使用时间戳列来生成查询递增返回新的数据,上次查询时间戳可被用作offset。
流和记录(Streams and Records)
每个流都应该有一个key-value的记录序列。key和value可以具有复杂的结构 — 提供了许多原始类型,但数组、对象和嵌套的数据结构也可以。运行时,数据格式不承担任何特定的序列化格式,这种转换是由框架内部处理的。除了key和value,记录(由源和传递到sink产生的)关联的流ID和offset。这些都是使用了框架。定期提交的offset的数据(已处理的),以便在发生故障时,处理可以从最后一个提交的偏移量恢复,避免不必要的重复处理。
动态连接器(Dynamic Connectors)
并非所有的工作都是静态的,Connector(连接器)
的实现还负责监控外部系统(根据外部系统的变化可能需要重新配置)。例如,在JDBCSourceConnector
的例子中,Connector可分配一组表到每个任务。当创建一个新的表,它必须要发现这个新表,并更新到配置把新的表分配到任务中。当注意到一个变化,需要重新配置(或任务数量的变化),它通知框架更新相应的任务。
开发一个简单的连接器(Connector)
开发一个连接器只需要实现两个接口,Connector
和Task
。在Kafka源代码里file包下有一个简单的例子。该connector是用于独立模式,SourceConnector/SourceTask
实现文件每行读取,并作为记录(消息)用SinkConnector/SinkTask
把每个记录写到一个文件。本节的其余部分将通过一些代码来演示创建一个连接器的关键步骤,但开发者也应参考完整的例子的源代码,大部分的细节都略为简单。
connector例子
我们拿SourceConnector
作为一个简单的例子。SinkConnector
的实现也非常类似。通过创建一个继承SourceConnector的类开始,增加一个字段存储解析的配置信息(文件名读取和发送数据到topic):
public class FileStreamSourceConnector extends SourceConnector {
private String filename;
private String topic;
最简单的方法是getTaskClass()
,它定义了在工作进程中实例化的实际读取数据的类:
@Override
public Class<? extends Task> getTaskClass() {
return FileStreamSourceTask.class;
}
定义FileStreamSourceTask
类,接下来,我们增加一些标准的生命周期的方法,start()
和stop()
:
@Override
public void start(Map<String, String> props) {
// The complete version includes error handling as well.
filename = props.get(FILE_CONFIG);
topic = props.get(TOPIC_CONFIG);
}
@Override
public void stop() {
// Nothing to do since no background monitoring is required.
}
最后,是实现真正核心的getTaskConfigs()
。在这种情况下,我们只处理一个文件,这样即使我们生成更多的任务(根据maxTask参数),我们返回一个列表,只有一个入口:
@Override
public List<Map<String, String>> getTaskConfigs(int maxTasks) {
ArrayList>Map<String, String>> configs = new ArrayList<>();
// Only one input stream makes sense.
Map<String, String> config = new Map<>();
if (filename != null)
config.put(FILE_CONFIG, filename);
config.put(TOPIC_CONFIG, topic);
configs.add(config);
return configs;
}
虽然在本例中未使用,SourceTask
也提供了两个API来提交源系统的offset:commit
和 commitRecord
。提供了有消息确认机制的源系统API。重写这些方法,允许source connector(源连接器)在源系统应答消息。一旦他们写入到kafka,无论消息是成批的还是单独。commit API
在源系统存储offset,由poll返回offset。这个API的实现是阻塞的,直到提交完成。commitRecord API
为在源系统中的每个写入到Kafka之后的SourceRecord保存offset,Kafka Connect自动记录offset。SourceTasks不需要实现。在connector需要确认在源系统acknowledge(应答)消息的情况下,即使有多个任务,这种方法实现通常是非常简单的,只需要一个API。它只确定输入任务的数量,这可能需要它从远程服务提取数据。然后瓜分数据。由于一些模式之间分配work(工作)非常普遍,有些实用工具提供了ConnectorUtils
来简化这些情况,注意,这个简单的例子不包括动态输入。详见在下一节讨论如何触发更新任务配置。
Task例子 - Source Task
接下来我们将介绍对应的SourceTask
的实现。我们将使用伪代码来展示大部分的实现,你可以参考完整的示例的源代码。和连接器一样,我们需要创建一个类(继承基于Task
的类)。它也有一些标准的生命周期方法:
public class FileStreamSourceTask extends SourceTask<Object, Object> {
String filename;
InputStream stream;
String topic;
public void start(Map<String, String> props) {
filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
stream = openOrThrowError(filename);
topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
}
@Override
public synchronized void stop() {
stream.close();
}
稍微简化了一下,说明这些方法是比较简单的,仅分配或释放资源。有两个点需要注意。首先,start()
方法还未处理以前offset的恢复,这将在后面的部分讨论,其次,stop()
方法是同步的。SourceTasks
提供了专门的线程,可以无限期的阻塞。所以需要从别的Worker线程来停止。接下来,我们实现任务的主要功能,poll()
方法。它从输入系统获取时间并返回一个List<SourceRecord>
。
@Override
public List<SourceRecord> poll() throws InterruptedException {
try {
ArrayList<SourceRecord> records = new ArrayList<>();
while (streamValid(stream) && records.isEmpty()) {
LineAndOffset line = readToNextLine(stream);
if (line != null) {
Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
} else {
Thread.sleep(1);
}
}
return records;
} catch (IOException e) {
// Underlying stream was killed, probably as a result of calling stop. Allow to return
// null, and driving thread will handle any shutdown if necessary.
}
return null;
}
同样,我们省略了一些细节,我们可以看到重要的步骤:poll()
方法反复的调用,并每次调用都会尝试从文件中读取记录(消息)。读取每一行,也跟踪文件的offset。它使用该信息来创建一个输出SourceRecord
和四条信息:源分区(只有1个,读取单个文件),源offset(在文件中的字节offset),输出topic的name,和输出value(行,包括一个模式,表示value始终是一个string)。SourceRecord构造函数的其他实现也包括一个指定的输出分区和key。请注意,此实现使用正常的Java InputStream
接口,如果数据不可用则可以sleep(休眠)。这个可以接受,因为Kafka Connect为每个任务提供了一个专用的线程。而任务实现必须基于poll()
接口,这样有跟多的灵活性(自己实现)。在这种情况下,基于NIO的实现会更有效,但方法简单,快速实现,并兼容老版本(Java)。
Sink Tasks
前面已经介绍了如何实现一个简单的SourceTast
。不像SourceConnector
和 SinkConnector
, SourceTask
和 SinkTask
有很多不同的接口,因为SourceTask
使用pull接口,SinkTask
使用push接口。两者都有共同的生命周期的方法,但是SinkTask接口是完全不同的:
public abstract class SinkTask implements Task {
public void initialize(SinkTaskContext context) {
this.context = context;
}
public abstract void put(Collection<SinkRecord> records);
public abstract void flush(Map<TopicPartition, Long> offsets);
SinkTask
文档有全部的细节,但接口和SourceTask
一样简单。put()方法包含大部分的实现,接收集合SinkRecords
,执行转换,并存储到目标系统。这个方法不需要确保返回之前数据完全写入到目标系统。事实上,在大部分情况下,内存缓冲是有用的,这样记录可以按一个整批次一次发送,从而减少插入事件进入downstream(下游)数据存储的开销。SinkRecord作为SourceRecords包含相同的信息:Kafka topic,partition,offset和事件key和value。flush()方法在offset提交过程期间,它允许任务从故障中恢复,并从安全点恢复(这样就没有事件会被错过)。该方法应该将任何未完成的数据push到目标系统,然后阻塞,直到写入已得到确认。通常offset参数可以忽略,但在某些情况下,想要实现存储offset信息到目标系统以提供正好一次
的语义。例如,HDFS connector(连接器)可以做到这一点,使用原子移动操作来确保flush()的原子性,确保提交数据和offset到最终的位置(HDFS)。
从之前的offset恢复(Resuming from Previous Offsets)
SourceTask包含一个流ID(输入的文件名)和每个记录的offset(文件中的位置)。框架使用了定时提交offset,所以在故障的情况下,任务恢复并减少再处理和可能重复的事件数(如果Kafka Connect正常的停止,可从最近的offset恢复,例如在独立模式或重新加载配置)。提交处理是完全自动化的,但只有connector知道如何返回到正确的位置,从该位置恢复。正确的恢复后,任务可以使用SourceContext传递其initialize()方法来访问offset数据。在initialize()中,我们会添加一些代码来读取offset(如果存在),并找到它的位置。
stream = new FileInputStream(filename);
Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
Long lastRecordedOffset = (Long) offset.get("position");
if (lastRecordedOffset != null)
seekToOffset(stream, lastRecordedOffset);
}
当然,你可能需要为每个输入流读取大量的key。OffsetStorageReader接口也允许批量读取(有效的负载所有的offset),然后找出每个输入流到合适的位置。
动态的输入/输出流 (Dynamic Input/Output Streams)
Kafka Connect的工作被定义为拷贝大量数据。如拷贝一个完整的数据库,而不是创建多个job来分别复制每一张表。这种设计的后果是,一个connector的输入或输出流集合可以随着时间的推移而变化。Source connector需要监听源系统的改变。例如:数据库表的增加/删除。当发现改变,通过ConnectorContext对象通知框架,来重新加载。例如,在SourceConnector:
if (inputsChanged())
this.context.requestTaskReconfiguration();
该框架将立即请求新配置并更新任务,在重新加载配置之前优雅的提交自己的进度。注意,SourceConnector检测目前留给connector实现,如果需要一个额外的线程执行此监控。那么connector必须分配它自己。理想的情况下,监控变更代码将会隔离Connector和任务,不需担心。然而,变更也可能影响任务,最常见的是,当其中一个输入流在输入系统销毁了。例如:如果一张表从数据库中删除。如果任务在Connector之前遇到问题,如果Connector需要poll(轮询)变更,则任务将需要处理随后的错误。这些都是常见的问题。值得庆幸的是,这可以通过简单catch和处理相应的异常。SinkConnectors通常只能处理流的增加,它可以转换输出新的entires(例如,一个新的数据库表)。该框架管理对Kafka的输入的任何变更。例如当输入的topic集变化(由一个正则表达式的订阅)。 SinkTasks等待新的输入流,它可能需要在下游系统创建新的资源。比如数据库中的新表,最棘手的情况是在这种情况产生的冲突(多个SinkTask看到一个新的输入流并同时尝试去创建新的资源)。SinkConnectors,另一方面,一般不需要特殊的代码来处理一组动态的流。
连接配置验证 (Connect Configuration Validation)
Kafka Connect允许你在提交要执行的connector之前来验证connector的配置,并可以提供故障和推荐值的反馈。利用这个优势,connector开发者需要提供一个config()的实现来暴露配置给框架。下面的代码在FileStreamSourceConnector
定义配置和暴露给框架。
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
public ConfigDef config() {
return CONFIG_DEF;
}
ConfigDef类用于指定预期的配置集,对于每个配置,你可以指定name,type,默认值,描述,group信息,group中的顺序,配置值的宽和适于在UI显示的名称。另外,你可以通过重写Validator类来指定的验证逻辑用于单个配置验证。此外,由于配置之间有可能存在依赖关系。例如,配置中的vaild和visibillty的值可能会根据其他的配置的值而变化。为了解决这个问题,ConfigDef允许你指定一个配置依赖,并提供推荐系统的实现来获取valid的值并设置visibillty得到的当前配置值。此外,Connector的validate()方法提供了一个默认验证实现,返回一个列表(返回允许配置的列表,每个配置的配置错误和推荐值)。然后,它不适用配置验证的推荐值。你可以提供一个自定义的配置验证覆盖的默认实现,这可能会使用建议的值。
Working with Schemas
FileStream connector是很好的例子,因为它很简单,但是也有很普通的结构化数据 — 每行只有一个字符串(string),实际connector都需要更复杂的数据格式模式,要创建更复杂的数据,你需要使用Kafka Connect data API。除了原始类型的影响,大多数记录的结构需要2个类:Schema和Struct。API文档有完整参考。这里是一个简单的例子,创建一个Schema和Struct:
Schema schema = SchemaBuilder.struct().name(NAME)
.field("name", Schema.STRING_SCHEMA)
.field("age", Schema.INT_SCHEMA)
.field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
.build();
Struct struct = new Struct(schema)
.put("name", "Barbara Liskov")
.put("age", 75)
.build();
如果你实现一个source connector,你需要决定何时以及如何创建schema。如果可能的话,你应该尽量避免重复计算。例如,如果你的connector保证有一个固定的schema,用静态和使用单例,然而,大部分connector有动态的schema。一个简单的例子,一个数据库connector。甚至只考虑一张表,这个schema不会预定义整个connector(因为它表到表的变化),但它也不会固定为单表生命周期中,因为用户可能会ALTER TABLE(修改表)。connector必须能够检测这些变化并作出反应,Sink connector之所以简单,是因为它们消费数据,不需要创建shema。但是,它们应该同样的去关心验证它们收到的schema的格式是预期的。当schema不匹配 — 通常表示上游的生产者生产无效的数据不能被正确的转换到目标系统 — sink connectors抛出一个异常给系统。
Kafka Connect管理 (Kafka Connect Administration)
Kafka Connect的REST层提供了一组API来管理集群。这包括查看connector的配置和任务的状态,以及改变其当前的行为(例如改变配置和重新启动任务)。
当一个connector第一次被提交到集群,worker重新平衡集群中全部的connector和它们的任务。使每个worker具有大致相同的工作量。当connector递增和减少任务数,或connector配置发生变化时,也使用了同样的重新平衡程序。你可以使用REST API查看connector当前的状态和任务,包括每个分配worker的id。例如,查询一个源文件的状态(使用 GET /connectors/file-source/status)可能会产生如下的输入:
{
"name": "file-source",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.1.208:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.1.209:8083"
}
]
}
Connector和它们的任务发布状态状态更新到共享topic(配置status.storage.topic)集群监控中的所有的worker。因为woker异步消费这个topic,在一个状态改变之前,有一个典型的(短)延迟是可见的(通过状态API)。下列的状态可能是connector或是其任务之一:
- UNASSIGNED: connector/task 还未分配给worker.
- RUNNING: connector/task 正在运行.
- PAUSED: The connector/task has been administratively paused.
- FAILED: connector/tast故障(通常是抛出一个异常,状态输出报告)。
在多数情况下,connector和任务状态将匹配,尽管他们可能短时间不同(当发生变化或任务故障)。例如,当一个connector刚启动时,connector和其任务转换到运行状态之前,可能有明显的延时。当任务故障(因为Connect不会自动重启故障的任务) 状态还会出现分歧。手动的重启connector/任务时,可以使用上面列出的重启API。注意,如果你尝试去重启任务(这个任务正在rebalance),Connect将会返回一个409(冲突)状态代码。你可以在rebalance完成之后重试,但是没有必要,因为rebalance有效地重新启动集群中的所有connector和任务。
有时可以暂时的停止connector的消息处理。例如,如果远程系统正在维护,最好source connector停止poll,而不是一直报错误的异常日志刷屏。对于这个用例,Connect提供了一个暂停/恢复(pause/resume)的API。虽然source connector暂停了,Connect将停止poll额外的记录。当sink connector被暂停时,Connect将停止向它推送新的消息。暂停状态是持久性的。所以即使你重新启动集群,connector也不会再次启动消费处理,直到任务恢复。注意,connector的任务转换到PAUSED状态时有可能会有延迟。因为它需要在暂停的期间来完成所有处理。另外,失败的任务不会转换到PAUSED状态,直到它们重新启动。
大佬你好,我原原本本把kafka down到本地,然后照着大佬的讲解run了几个简单的例子,topic之间发了几个消息,都运行正常。但尝试connector file导入/导出数据,运行命令
出现下面错误,找不到FileStreamSourceConnector,我是哪个文件放错目录了吗?劳烦请指点一下,谢谢。
没有找到
SourceConnector
实现匹配,类似以下的实现:extends SourceConnector
https://www.orchome.com/10699
大佬能不能来看看我的问题
在看了。
感谢感谢
我这边从mongo通过Kafka connector 数据迁移到solr,但是 mongo数据库那太服务器宕机了,重启后,数据就不迁移了,这种情况怎么处理呢,大佬
找到你的消费者组,看看同步消费者组的offset是否已经到了最后导致的,如果是,你就要重置offset的位置才行。
这个怎么看,麻烦说详细点,多谢🙏
参考:kafka命令
您好,我有一个问题想请教,现在我正在使用Kafka connect做数据迁移,现在在使用sink将kafka中的数据迁移到目标数据库时,怎样转化为自定义的格式存入,例如我需要排除掉某些字段,或者对某些字段做修改
put(Collection<SinkRecord> records);
该方法,已经拿到消息了,你是可以直接对该消息进行处理的呀。
我看到官方有一个transforms应该也能够实现,我这儿想无侵入的去做转化,不知道是不是用transforms更合适
都是为了过滤,哪个简单用哪个吧。
嗯,我这边已经采用transforms去实现,感谢回复
Map<string, object=""> sourcePartition = Collections.singletonMap("filename", filename); Map<string, object=""> sourceOffset = Collections.singletonMap("position", streamOffset);
大佬。有个语法问题,为啥SourceRecord的sourcePartition和sourceOffset是用Map类型,filename和postion这两个key是固定的么
key名是固定的,这种设计方式的好处是底层升级之后,对外API不会变,都是字符串的“key”。
噢噢。谢谢大佬。还有一个疑问类型。。。一直没搞懂这里
好奇为啥要kafka的SourceRecord的sourcePartition和sourceOffset参数为啥要用Map
map意味着可以配置多个源呀。
比如streamValid(stream),inputsChanged() 这些方法是你写的伪代码还是真是方法?想看下这段的代码,可以嘛
这篇文章写的真不错。代码比kafka 自带的示例还优雅。请问可以看下完整源码嘛
多个task之间会有并发冲突吗,需要考虑线程安全的问题吗
task是平分消息,不会有冲突额
"最后,是实现真正核心的getTaskConfigs()。在这种情况下,我们只处理一个文件",真就机翻呗,In this case应该翻译成“在本例中”,而不是“在这种情况下”
请问博主,要自定义开发sourceConnector和sinkConnector需要maven依赖?
写错了,是需要什么依赖?
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.0</version> </dependency>
谢谢,已经找到了。client没有source和sink
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-api</artifactId> <version>0.10.1.1</version> </dependency>