Kafka Connector开发指南

原创
半兽人 发表于: 2016-08-22   最后更新时间: 2021-11-13 14:12:29  
{{totalSubscript}} 订阅, 45,406 游览

8.3 Connector开发指南

本指南介绍了开发者怎么样编写新的connector,用于kafka和其他系统之间的数据移动。简要回顾几个关键的概念,然后介绍如何创建一个简单的connector。

核心概念和API

在Kafka和其他系统之间复制数据,用户创建自定义的从系统中pull数据或push数据到系统的Connector(连接器)。Connector有两种形式:SourceConnectors从其他系统导入数据(如:JDBCSourceConnector将导入一个关系型数据库到Kafka)和SinkConnectors导出数据(如:HDFSSinkConnector将kafka主题的内容导出到HDFS文件)。connector不会执行任何复制自己的数据:它们的配置展示了要复制的数据,而Connector是负责打破这一工作变成一组可以分配worker的任务。这些任务也有两种相对应的形式:SourceTaskSinkTask。在手里的任务,每个任务必须复制其子集的数据或Kafka的。在Kafka Connect,这些任务作为一组具有一致性模式的记录(消息)组成的输出和输入流。有时,这种映射是明显的:在一组日志文件,每个文件可以被视为一个流,每个分析的行形成一个记录,使用相同的模式和offset存储在文件中的字节偏移。在其他的情况下可能需要更多的努力来映射到该模型:一个JDBC连接器可以将每张表映射到一个流,但offset是不太清楚的。一种可能的映射使用时间戳列来生成查询递增返回新的数据,上次查询时间戳可被用作offset。

流和记录(Streams and Records)

每个流都应该有一个key-value的记录序列。key和value可以具有复杂的结构 — 提供了许多原始类型,但数组、对象和嵌套的数据结构也可以。运行时,数据格式不承担任何特定的序列化格式,这种转换是由框架内部处理的。除了key和value,记录(由源和传递到sink产生的)关联的流ID和offset。这些都是使用了框架。定期提交的offset的数据(已处理的),以便在发生故障时,处理可以从最后一个提交的偏移量恢复,避免不必要的重复处理。

动态连接器(Dynamic Connectors)

并非所有的工作都是静态的,Connector(连接器)的实现还负责监控外部系统(根据外部系统的变化可能需要重新配置)。例如,在JDBCSourceConnector的例子中,Connector可分配一组表到每个任务。当创建一个新的表,它必须要发现这个新表,并更新到配置把新的表分配到任务中。当注意到一个变化,需要重新配置(或任务数量的变化),它通知框架更新相应的任务。

开发一个简单的连接器(Connector)

开发一个连接器只需要实现两个接口,ConnectorTask。在Kafka源代码里file包下有一个简单的例子。该connector是用于独立模式,SourceConnector/SourceTask实现文件每行读取,并作为记录(消息)用SinkConnector/SinkTask把每个记录写到一个文件。本节的其余部分将通过一些代码来演示创建一个连接器的关键步骤,但开发者也应参考完整的例子的源代码,大部分的细节都略为简单。

connector例子

我们拿SourceConnector作为一个简单的例子。SinkConnector的实现也非常类似。通过创建一个继承SourceConnector的类开始,增加一个字段存储解析的配置信息(文件名读取和发送数据到topic):

public class FileStreamSourceConnector extends SourceConnector {
    private String filename;
    private String topic;

最简单的方法是getTaskClass(),它定义了在工作进程中实例化的实际读取数据的类:

@Override
public Class<? extends Task> getTaskClass() {
    return FileStreamSourceTask.class;
}

定义FileStreamSourceTask类,接下来,我们增加一些标准的生命周期的方法,start()stop()

@Override
public void start(Map<String, String> props) {
    // The complete version includes error handling as well.
    filename = props.get(FILE_CONFIG);
    topic = props.get(TOPIC_CONFIG);
}

@Override
public void stop() {
    // Nothing to do since no background monitoring is required.
}

最后,是实现真正核心的getTaskConfigs()。在这种情况下,我们只处理一个文件,这样即使我们生成更多的任务(根据maxTask参数),我们返回一个列表,只有一个入口:

@Override
public List<Map<String, String>> getTaskConfigs(int maxTasks) {
    ArrayList>Map<String, String>> configs = new ArrayList<>();
    // Only one input stream makes sense.
    Map<String, String> config = new Map<>();
    if (filename != null)
        config.put(FILE_CONFIG, filename);
    config.put(TOPIC_CONFIG, topic);
    configs.add(config);
    return configs;
}

虽然在本例中未使用,SourceTask也提供了两个API来提交源系统的offset:commitcommitRecord。提供了有消息确认机制的源系统API。重写这些方法,允许source connector(源连接器)在源系统应答消息。一旦他们写入到kafka,无论消息是成批的还是单独。commit API在源系统存储offset,由poll返回offset。这个API的实现是阻塞的,直到提交完成。commitRecord API为在源系统中的每个写入到Kafka之后的SourceRecord保存offset,Kafka Connect自动记录offset。SourceTasks不需要实现。在connector需要确认在源系统acknowledge(应答)消息的情况下,即使有多个任务,这种方法实现通常是非常简单的,只需要一个API。它只确定输入任务的数量,这可能需要它从远程服务提取数据。然后瓜分数据。由于一些模式之间分配work(工作)非常普遍,有些实用工具提供了ConnectorUtils来简化这些情况,注意,这个简单的例子不包括动态输入。详见在下一节讨论如何触发更新任务配置。

Task例子 - Source Task

接下来我们将介绍对应的SourceTask的实现。我们将使用伪代码来展示大部分的实现,你可以参考完整的示例的源代码。和连接器一样,我们需要创建一个类(继承基于Task的类)。它也有一些标准的生命周期方法:

public class FileStreamSourceTask extends SourceTask<Object, Object> {
    String filename;
    InputStream stream;
    String topic;

    public void start(Map<String, String> props) {
        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
        stream = openOrThrowError(filename);
        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
    }

    @Override
    public synchronized void stop() {
        stream.close();
    }

稍微简化了一下,说明这些方法是比较简单的,仅分配或释放资源。有两个点需要注意。首先,start()方法还未处理以前offset的恢复,这将在后面的部分讨论,其次,stop()方法是同步的。SourceTasks提供了专门的线程,可以无限期的阻塞。所以需要从别的Worker线程来停止。接下来,我们实现任务的主要功能,poll()方法。它从输入系统获取时间并返回一个List<SourceRecord>

@Override
public List<SourceRecord> poll() throws InterruptedException {
    try {
        ArrayList<SourceRecord> records = new ArrayList<>();
        while (streamValid(stream) && records.isEmpty()) {
            LineAndOffset line = readToNextLine(stream);
            if (line != null) {
                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
                Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
                records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
            } else {
                Thread.sleep(1);
            }
        }
        return records;
    } catch (IOException e) {
        // Underlying stream was killed, probably as a result of calling stop. Allow to return
        // null, and driving thread will handle any shutdown if necessary.
    }
    return null;
}

同样,我们省略了一些细节,我们可以看到重要的步骤:poll()方法反复的调用,并每次调用都会尝试从文件中读取记录(消息)。读取每一行,也跟踪文件的offset。它使用该信息来创建一个输出SourceRecord和四条信息:源分区(只有1个,读取单个文件),源offset(在文件中的字节offset),输出topic的name,和输出value(行,包括一个模式,表示value始终是一个string)。SourceRecord构造函数的其他实现也包括一个指定的输出分区和key。请注意,此实现使用正常的Java InputStream接口,如果数据不可用则可以sleep(休眠)。这个可以接受,因为Kafka Connect为每个任务提供了一个专用的线程。而任务实现必须基于poll()接口,这样有跟多的灵活性(自己实现)。在这种情况下,基于NIO的实现会更有效,但方法简单,快速实现,并兼容老版本(Java)。

Sink Tasks

前面已经介绍了如何实现一个简单的SourceTast。不像SourceConnectorSinkConnector, SourceTaskSinkTask有很多不同的接口,因为SourceTask使用pull接口,SinkTask使用push接口。两者都有共同的生命周期的方法,但是SinkTask接口是完全不同的:

public abstract class SinkTask implements Task {
    public void initialize(SinkTaskContext context) {
        this.context = context;
    }

    public abstract void put(Collection<SinkRecord> records);

    public abstract void flush(Map<TopicPartition, Long> offsets);

SinkTask文档有全部的细节,但接口和SourceTask一样简单。put()方法包含大部分的实现,接收集合SinkRecords,执行转换,并存储到目标系统。这个方法不需要确保返回之前数据完全写入到目标系统。事实上,在大部分情况下,内存缓冲是有用的,这样记录可以按一个整批次一次发送,从而减少插入事件进入downstream(下游)数据存储的开销。SinkRecord作为SourceRecords包含相同的信息:Kafka topic,partition,offset和事件key和value。flush()方法在offset提交过程期间,它允许任务从故障中恢复,并从安全点恢复(这样就没有事件会被错过)。该方法应该将任何未完成的数据push到目标系统,然后阻塞,直到写入已得到确认。通常offset参数可以忽略,但在某些情况下,想要实现存储offset信息到目标系统以提供正好一次的语义。例如,HDFS connector(连接器)可以做到这一点,使用原子移动操作来确保flush()的原子性,确保提交数据和offset到最终的位置(HDFS)。

从之前的offset恢复(Resuming from Previous Offsets)

SourceTask包含一个流ID(输入的文件名)和每个记录的offset(文件中的位置)。框架使用了定时提交offset,所以在故障的情况下,任务恢复并减少再处理和可能重复的事件数(如果Kafka Connect正常的停止,可从最近的offset恢复,例如在独立模式或重新加载配置)。提交处理是完全自动化的,但只有connector知道如何返回到正确的位置,从该位置恢复。正确的恢复后,任务可以使用SourceContext传递其initialize()方法来访问offset数据。在initialize()中,我们会添加一些代码来读取offset(如果存在),并找到它的位置。

  stream = new FileInputStream(filename);
    Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
    if (offset != null) {
        Long lastRecordedOffset = (Long) offset.get("position");
        if (lastRecordedOffset != null)
            seekToOffset(stream, lastRecordedOffset);
    }

当然,你可能需要为每个输入流读取大量的key。OffsetStorageReader接口也允许批量读取(有效的负载所有的offset),然后找出每个输入流到合适的位置。

动态的输入/输出流 (Dynamic Input/Output Streams)

Kafka Connect的工作被定义为拷贝大量数据。如拷贝一个完整的数据库,而不是创建多个job来分别复制每一张表。这种设计的后果是,一个connector的输入或输出流集合可以随着时间的推移而变化。Source connector需要监听源系统的改变。例如:数据库表的增加/删除。当发现改变,通过ConnectorContext对象通知框架,来重新加载。例如,在SourceConnector:

    if (inputsChanged())
        this.context.requestTaskReconfiguration();

该框架将立即请求新配置并更新任务,在重新加载配置之前优雅的提交自己的进度。注意,SourceConnector检测目前留给connector实现,如果需要一个额外的线程执行此监控。那么connector必须分配它自己。理想的情况下,监控变更代码将会隔离Connector和任务,不需担心。然而,变更也可能影响任务,最常见的是,当其中一个输入流在输入系统销毁了。例如:如果一张表从数据库中删除。如果任务在Connector之前遇到问题,如果Connector需要poll(轮询)变更,则任务将需要处理随后的错误。这些都是常见的问题。值得庆幸的是,这可以通过简单catch和处理相应的异常。SinkConnectors通常只能处理流的增加,它可以转换输出新的entires(例如,一个新的数据库表)。该框架管理对Kafka的输入的任何变更。例如当输入的topic集变化(由一个正则表达式的订阅)。 SinkTasks等待新的输入流,它可能需要在下游系统创建新的资源。比如数据库中的新表,最棘手的情况是在这种情况产生的冲突(多个SinkTask看到一个新的输入流并同时尝试去创建新的资源)。SinkConnectors,另一方面,一般不需要特殊的代码来处理一组动态的流。

连接配置验证 (Connect Configuration Validation)

Kafka Connect允许你在提交要执行的connector之前来验证connector的配置,并可以提供故障和推荐值的反馈。利用这个优势,connector开发者需要提供一个config()的实现来暴露配置给框架。下面的代码在FileStreamSourceConnector定义配置和暴露给框架。

 private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
        .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");

    public ConfigDef config() {
        return CONFIG_DEF;
    }

ConfigDef类用于指定预期的配置集,对于每个配置,你可以指定name,type,默认值,描述,group信息,group中的顺序,配置值的宽和适于在UI显示的名称。另外,你可以通过重写Validator类来指定的验证逻辑用于单个配置验证。此外,由于配置之间有可能存在依赖关系。例如,配置中的vaild和visibillty的值可能会根据其他的配置的值而变化。为了解决这个问题,ConfigDef允许你指定一个配置依赖,并提供推荐系统的实现来获取valid的值并设置visibillty得到的当前配置值。此外,Connector的validate()方法提供了一个默认验证实现,返回一个列表(返回允许配置的列表,每个配置的配置错误和推荐值)。然后,它不适用配置验证的推荐值。你可以提供一个自定义的配置验证覆盖的默认实现,这可能会使用建议的值。

Working with Schemas

FileStream connector是很好的例子,因为它很简单,但是也有很普通的结构化数据 — 每行只有一个字符串(string),实际connector都需要更复杂的数据格式模式,要创建更复杂的数据,你需要使用Kafka Connect data API。除了原始类型的影响,大多数记录的结构需要2个类:Schema和Struct。API文档有完整参考。这里是一个简单的例子,创建一个Schema和Struct:

Schema schema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT_SCHEMA)
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
    .build();

Struct struct = new Struct(schema)
    .put("name", "Barbara Liskov")
    .put("age", 75)
    .build();

如果你实现一个source connector,你需要决定何时以及如何创建schema。如果可能的话,你应该尽量避免重复计算。例如,如果你的connector保证有一个固定的schema,用静态和使用单例,然而,大部分connector有动态的schema。一个简单的例子,一个数据库connector。甚至只考虑一张表,这个schema不会预定义整个connector(因为它表到表的变化),但它也不会固定为单表生命周期中,因为用户可能会ALTER TABLE(修改表)。connector必须能够检测这些变化并作出反应,Sink connector之所以简单,是因为它们消费数据,不需要创建shema。但是,它们应该同样的去关心验证它们收到的schema的格式是预期的。当schema不匹配 — 通常表示上游的生产者生产无效的数据不能被正确的转换到目标系统 — sink connectors抛出一个异常给系统。

Kafka Connect管理 (Kafka Connect Administration)

Kafka Connect的REST层提供了一组API来管理集群。这包括查看connector的配置和任务的状态,以及改变其当前的行为(例如改变配置和重新启动任务)。

当一个connector第一次被提交到集群,worker重新平衡集群中全部的connector和它们的任务。使每个worker具有大致相同的工作量。当connector递增和减少任务数,或connector配置发生变化时,也使用了同样的重新平衡程序。你可以使用REST API查看connector当前的状态和任务,包括每个分配worker的id。例如,查询一个源文件的状态(使用 GET /connectors/file-source/status)可能会产生如下的输入:

{
  "name": "file-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.1.208:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "192.168.1.209:8083"
    }
  ]
}

Connector和它们的任务发布状态状态更新到共享topic(配置status.storage.topic)集群监控中的所有的worker。因为woker异步消费这个topic,在一个状态改变之前,有一个典型的(短)延迟是可见的(通过状态API)。下列的状态可能是connector或是其任务之一:

  • UNASSIGNED: connector/task 还未分配给worker.
  • RUNNING: connector/task 正在运行.
  • PAUSED: The connector/task has been administratively paused.
  • FAILED: connector/tast故障(通常是抛出一个异常,状态输出报告)。

在多数情况下,connector和任务状态将匹配,尽管他们可能短时间不同(当发生变化或任务故障)。例如,当一个connector刚启动时,connector和其任务转换到运行状态之前,可能有明显的延时。当任务故障(因为Connect不会自动重启故障的任务) 状态还会出现分歧。手动的重启connector/任务时,可以使用上面列出的重启API。注意,如果你尝试去重启任务(这个任务正在rebalance),Connect将会返回一个409(冲突)状态代码。你可以在rebalance完成之后重试,但是没有必要,因为rebalance有效地重新启动集群中的所有connector和任务。

有时可以暂时的停止connector的消息处理。例如,如果远程系统正在维护,最好source connector停止poll,而不是一直报错误的异常日志刷屏。对于这个用例,Connect提供了一个暂停/恢复(pause/resume)的API。虽然source connector暂停了,Connect将停止poll额外的记录。当sink connector被暂停时,Connect将停止向它推送新的消息。暂停状态是持久性的。所以即使你重新启动集群,connector也不会再次启动消费处理,直到任务恢复。注意,connector的任务转换到PAUSED状态时有可能会有延迟。因为它需要在暂停的期间来完成所有处理。另外,失败的任务不会转换到PAUSED状态,直到它们重新启动。

更新于 2021-11-13

SOLO 2年前

大佬你好,我原原本本把kafka down到本地,然后照着大佬的讲解run了几个简单的例子,topic之间发了几个消息,都运行正常。但尝试connector file导入/导出数据,运行命令

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

出现下面错误,找不到FileStreamSourceConnector,我是哪个文件放错目录了吗?劳烦请指点一下,谢谢。

ERROR Failed to create job for config/connect-file-source.properties (org.apache.kafka.connect.cli.ConnectStandalone:107)
ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:117)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches FileStreamSourceConnector

半兽人 -> SOLO 2年前

没有找到SourceConnector实现匹配,类似以下的实现:

extends SourceConnector
cYrus 2年前

https://www.orchome.com/10699
大佬能不能来看看我的问题

半兽人 -> cYrus 2年前

在看了。

cYrus -> 半兽人 2年前

感谢感谢

^O^~ 2年前

我这边从mongo通过Kafka connector 数据迁移到solr,但是 mongo数据库那太服务器宕机了,重启后,数据就不迁移了,这种情况怎么处理呢,大佬

半兽人 -> ^O^~ 2年前

找到你的消费者组,看看同步消费者组的offset是否已经到了最后导致的,如果是,你就要重置offset的位置才行。

^O^~ -> 半兽人 2年前

这个怎么看,麻烦说详细点,多谢🙏

半兽人 -> ^O^~ 2年前

参考:kafka命令

  1. 通过「消费者列表查询」找到对应的消费者组
  2. 描述消费者组,查看lag,到了什么位置。
  3. 重置对应消费者组每个分区的lag。
知觉 2年前

您好,我有一个问题想请教,现在我正在使用Kafka connect做数据迁移,现在在使用sink将kafka中的数据迁移到目标数据库时,怎样转化为自定义的格式存入,例如我需要排除掉某些字段,或者对某些字段做修改

半兽人 -> 知觉 2年前
put(Collection<SinkRecord> records);

该方法,已经拿到消息了,你是可以直接对该消息进行处理的呀。

知觉 -> 半兽人 2年前

我看到官方有一个transforms应该也能够实现,我这儿想无侵入的去做转化,不知道是不是用transforms更合适

半兽人 -> 知觉 2年前

都是为了过滤,哪个简单用哪个吧。

知觉 -> 半兽人 2年前

嗯,我这边已经采用transforms去实现,感谢回复

4年前
Map<string, object=""> sourcePartition = Collections.singletonMap("filename", filename);
Map<string, object=""> sourceOffset = Collections.singletonMap("position", streamOffset);

大佬。有个语法问题,为啥SourceRecord的sourcePartition和sourceOffset是用Map类型,filename和postion这两个key是固定的么

半兽人 -> 4年前

key名是固定的,这种设计方式的好处是底层升级之后,对外API不会变,都是字符串的“key”。

-> 半兽人 4年前

噢噢。谢谢大佬。还有一个疑问
好奇为啥要kafka的SourceRecord的sourcePartition和sourceOffset参数为啥要用Map类型。。。一直没搞懂这里

半兽人 -> 4年前

map意味着可以配置多个源呀。

皓君同學 4年前

比如streamValid(stream),inputsChanged() 这些方法是你写的伪代码还是真是方法?想看下这段的代码,可以嘛

皓君同學 4年前

这篇文章写的真不错。代码比kafka 自带的示例还优雅。请问可以看下完整源码嘛

爱歹老干妈 5年前

多个task之间会有并发冲突吗,需要考虑线程安全的问题吗

八荒 -> 爱歹老干妈 5年前

task是平分消息,不会有冲突额

薛定谔的鸽 5年前

"最后,是实现真正核心的getTaskConfigs()。在这种情况下,我们只处理一个文件",真就机翻呗,In this case应该翻译成“在本例中”,而不是“在这种情况下”

Leo 6年前

请问博主,要自定义开发sourceConnector和sinkConnector需要maven依赖?

Leo -> Leo 6年前

写错了,是需要什么依赖?

半兽人 -> Leo 6年前
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
Leo -> 半兽人 6年前

谢谢,已经找到了。client没有source和sink

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>0.10.1.1</version>
 </dependency>
查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章