多线程消费 KafkaConsumer is not safe for multi-threaded access

√锋²º¹8ヾ/❤ 发表于: 2018-09-05   最后更新时间: 2018-09-05 09:05:17   12,178 游览

一个副本、一个分区生产信息。

消费的时候,开启了3个线程,并发消费同一主题,同一个组。

启动后瞬间就出现:

Exception in thread "Thread-10" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1674)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1031)
at com.oristartech.kafka.core.consumer.ConsumerHandler$1.run(ConsumerHandler.java:100)
at java.lang.Thread.run(Thread.java:748)

翻看了一下其他问题发问,好像是说消费者是非线程安全的,而且线程 和 分区也有着限定的关系(线程 ≤ 分区)。

如果要满足多线程消费 同主题+同分组,该如何处理才好

发表于 2018-09-05
添加评论

一个消费者对应一个分区,所以你多个消费者,其实永远只有一个消费者拿到消息.
kafka拉取消息是一批一批的,大概一次拉取2000-4000条。你应该在拉取后进行多线程处理.

好的。
但是,poll的时候,测试发现大概每轮只有500条,就停顿0.5-1秒。并不像您说的“大概一次拉取2000-4000条”。
消费者拉取主函数:
public void consume(int threadNumber) {

  if (ConsumerThreadTools.mapListeners.isEmpty()) {
   logger.error("消息处理对象为空,不进行消费.........返回");
   return;
  }

  if(executors == null) {
   
   executors = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
     new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
  }else {
   
  }
  
  try {
   while (true) {
    ConsumerRecords<String, String> records = consumer.poll(500);
    System.out.println(JSONObject.toJSONString(records));
    if (!records.isEmpty()) {
     executors.submit(new ConsumerThreadWorker(records, offsets));
    }
    commitOffsets();
   }
  } catch (WakeupException e) {
   // swallow this exception
  } finally {
   commitOffsets();
   consumer.close();
  }
 }


拉取的条数跟你消息大小有关,你把poll时间调成100,提交改成自动试试

你的答案

查看kafka相关的其他问题或提一个您自己的问题