Kafka API design

Original post by 半兽人, published 2015-03-10, last updated 2016-10-27 10:44:29

5.1 API Design

Producer API

The Producer API wraps the two low-level producers, kafka.producer.SyncProducer and kafka.producer.async.AsyncProducer.

class Producer {
  /* Sends the data, partitioned by key to the topic using either the */
  /* synchronous or the asynchronous producer */
  public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);

  /* Sends a list of data, partitioned by key to the topic using either */
  /* the synchronous or the asynchronous producer */
  public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);

  /* Closes the producer and cleans up */
  public void close();

}
The goal is to expose all the producer functionality through a single API to the client. The new producer -
  • can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data -

    kafka.producer.Producer provides the ability to batch multiple produce requests (producer.type=async), before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter a queue, they are buffered in a queue, until either queue.time or batch.size is reached. A background thread (kafka.producer.async.ProducerSendThread) dequeues the batch of data and lets the kafka.producer.EventHandler serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the event.handler config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the kafka.producer.async.CallbackHandler interface and setting the callback.handler config parameter to that class.
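
    As a rough illustration (not taken from the docs), configuring such an async producer might look like the sketch below. The property names are the ones mentioned above; the handler class names are hypothetical placeholders, and exact property names vary between Kafka versions.

    import java.util.Properties;

    // Sketch: properties enabling the async/batching path with custom handlers plugged in.
    public class AsyncProducerProps {
      public static Properties build() {
        Properties props = new Properties();
        props.put("producer.type", "async");    // queue and batch produce requests
        props.put("queue.time", "5000");        // flush the queue after 5 seconds...
        props.put("batch.size", "200");         // ...or once 200 events are buffered
        props.put("event.handler", "com.example.MyEventHandler");       // hypothetical class
        props.put("callback.handler", "com.example.MyCallbackHandler"); // hypothetical class
        return props;
      }
    }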

  • handles the serialization of data through a user-specified Encoder -
    interface Encoder<T> {
      public Message toMessage(T data);
    }

    The default is the no-op kafka.serializer.DefaultEncoder.
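
    For illustration, a custom String Encoder might look like the sketch below; it assumes Message can be built directly from a byte[], which may differ between Kafka versions.

    import java.nio.charset.StandardCharsets;

    // Sketch: encode a String payload into a Kafka Message.
    public class StringEncoder implements Encoder<String> {
      public Message toMessage(String data) {
        return new Message(data.getBytes(StandardCharsets.UTF_8));
      }
    }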

  • provides software load balancing through an optionally user-specified Partitioner -

    The routing decision is influenced by the kafka.producer.Partitioner.

    interface Partitioner<T> {
       int partition(T key, int numPartitions);
    }


    The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is hash(key) % numPartitions. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the partitioner.class config parameter.
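
    For illustration, a custom Partitioner reproducing the default strategy could look like the sketch below (the hash's sign bit is masked so the result is never negative); it would be enabled through the partitioner.class config parameter.

    // Sketch: hash-based partitioner matching the interface above. Null keys are
    // handled by the producer itself (a random partition is picked), so they
    // normally never reach this method.
    public class HashPartitioner<T> implements Partitioner<T> {
      public int partition(T key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
      }
    }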

Consumer API

We have 2 levels of consumer APIs. The low-level "simple" API maintains a connection to a single broker and has a close correspondence to the network requests sent to the server. This API is completely stateless, with the offset being passed in on every request, allowing the user to maintain this metadata however they choose.


The high-level API hides the details of brokers from the consumer and allows consuming off the cluster of machines without concern for the underlying topology. It also maintains the state of what has been consumed. The high-level API also provides the ability to subscribe to topics that match a filter expression (i.e., either a whitelist or a blacklist regular expression).


Low-level API
class SimpleConsumer {
	
  /* Send fetch request to a broker and get back a set of messages. */ 
  public ByteBufferMessageSet fetch(FetchRequest request);

  /* Send a list of fetch requests to a broker and get back a response set. */ 
  public MultiFetchResponse multifetch(List<FetchRequest> fetches);

  /**
   * Get a list of valid offsets (up to maxSize) before the given time.
   * The result is a list of offsets, in descending order.
   * @param time: time in millisecs,
   *              if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available.
   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
   */
  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}


The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers (such as the hadoop consumer) which have particular requirements around maintaining state.
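
A rough usage sketch of the low-level API follows, for orientation only: the SimpleConsumer constructor and FetchRequest signatures are assumptions based on the 0.7-era javaapi and differ in later versions, and the broker host and topic name are placeholders.

// Sketch: fetch messages from one partition with the simple consumer.
SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 30000, 64 * 1024);

// Ask for the earliest available offset of partition 0 of "my-topic".
long[] offsets = consumer.getOffsetsBefore(
    "my-topic", 0, OffsetRequest$.MODULE$.EARLIEST_TIME(), 1);
long offset = offsets[0];

// Fetch up to 1 MB of messages starting at that offset and iterate over them.
ByteBufferMessageSet messages =
    consumer.fetch(new FetchRequest("my-topic", 0, offset, 1024 * 1024));
for (MessageAndOffset mo : messages) {
  // process mo.message(); track offsets here to use as the start of the next fetch
}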


High-level API
/* create a connection to the cluster */ 
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {
	
  /**
   * This method is used to get a list of KafkaStreams, which are iterators over
   * MessageAndMetadata objects from which you can obtain messages and their
   * associated metadata (currently only topic).
   *  Input: a map of <topic, #streams>
   *  Output: a map of <topic, list of message streams>
   */
  public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Integer> topicCountMap);

  /**
   * You can also obtain a list of KafkaStreams, that iterate over messages
   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
   * whitelist or a blacklist which is a standard Java regex.)
   */
  public List<KafkaStream> createMessageStreamsByFilter(
      TopicFilter topicFilter, int numStreams);

  /* Commit the offsets of all messages consumed so far. */
  public void commitOffsets();
  
  /* Shut down the connector */
  public void shutdown();
}

This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream represents the stream of messages from one or more partitions on one or more servers. Each stream is used for single threaded processing, so the client can provide the number of desired streams in the create call. Thus a stream may represent the merging of multiple server partitions (to correspond to the number of processing threads), but each partition only goes to one stream.


The createMessageStreams call registers the consumer for the topic, which results in rebalancing the consumer/broker assignment. The API encourages creating many topic streams in a single call in order to minimize this rebalancing. The createMessageStreamsByFilter call (additionally) registers watchers to discover new topics that match its filter. Note that each stream that createMessageStreamsByFilter returns may iterate over messages from multiple topics (i.e., if multiple topics are allowed by the filter).
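
As a rough sketch of using this API: request two streams for a topic and hand each one to its own thread, since each stream is meant for single-threaded processing. The connector comes from the Consumer.create() call above; the topic name is a placeholder and KafkaStream is assumed to be iterable over MessageAndMetadata objects, as described.

// Sketch: create two streams for "my-topic" and consume each on its own thread.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("my-topic", 2);

Map<String, List<KafkaStream>> streams = connector.createMessageStreams(topicCountMap);

for (final KafkaStream stream : streams.get("my-topic")) {
  new Thread(new Runnable() {
    public void run() {
      for (Object messageAndMetadata : stream) {
        // each element is a MessageAndMetadata: message() and topic() are available
      }
    }
  }).start();
}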


Updated 2016-10-27

Hi, can Kafka code written in Java be run directly from the browser? I tried opening it with PHP in the browser and it failed with a class-not-found error, but it works fine from the Linux terminal. If it can't be accessed directly, how should the code be changed to run it?

That means a dependency is missing from what you are referencing.

Is the dependency librdkafka, or the PHP extension? And why does executing the file directly in the terminal work fine?

Which dependency does it report as missing?

You're already using JSP, so why not just let JSP handle the presentation?

Thanks! In Java, can I use the API to find the partitions in each group whose LAG is empty, so I can manage my processes? If it's empty I'd kill the process, and restart it once there is a value.

Java keeps a long-lived connection to Kafka and actively pulls messages via long polling, so there shouldn't be any need to repeatedly stop and restart, right?

In my case I develop in PHP and use swoole to manage the consumer threads; when a partition has no data I kill the process to save system memory.

Jun -> 像我这样的人 5 years ago

I've also been working on a project using swoole with Kafka recently and have run into some problems I'd like to ask you about. My QQ is 1610462603, would you mind adding me? Many thanks!!!

绽放 6 years ago

Hi, does kafka_2.11-0.10.1.0 support the producer sending a custom message class? Everything I found on Baidu says to implement the Encoder interface, but in the jars pulled in by this dependency:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
    </dependency>
there is no Encoder interface. How did you handle this?

半兽人 -> 绽放 6 years ago

Of course custom messages are supported: key.serializer, value.serializer.
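
(For reference, a minimal sketch of the kind of value serializer this reply refers to, for the kafka-clients producer; the MyMessage class and its toBytes() method are hypothetical.)

import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// Sketch: serialize a hypothetical MyMessage class for the 0.10.x producer.
// Registered via props.put("value.serializer", MyMessageSerializer.class.getName()).
public class MyMessageSerializer implements Serializer<MyMessage> {
  public void configure(Map<String, ?> configs, boolean isKey) { }

  public byte[] serialize(String topic, MyMessage data) {
    return data == null ? null : data.toBytes();  // toBytes() is hypothetical
  }

  public void close() { }
}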

绽放 -> 半兽人 6 years ago

Hello! What confuses me is that a custom message needs to implement the Encoder interface, but the jar from the dependency I listed above doesn't contain Encoder. If I need the jar with that interface, where should I download it?

半兽人 -> 绽放 6 years ago

The problem is in your code: the object can't be found.
