How to use the Kafka consumer to consume from a specified offset

Marseille   Posted: 2016-06-13   Last updated: 2021-08-23 23:34:12   20,388 views

Has anyone used consumer.seek(topicPartition, offset)? Calling it gives me this exception:

java.lang.IllegalStateException: No current assignment for partition HighAvailabilityTest-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:256)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1134)
    at com.masai.kafka.ConsumerOnce.run(ConsumerOnce.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The partition definitely exists. What is this "assignment", and does it need to be configured somehow? Any help is appreciated.

Posted on 2016-06-13

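Cause: when you specify the consumption position manually, the partition is not handed to the consumer by automatic load balancing, so you have to assign the partition to the consumer yourself before you can seek and consume: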

consumer.assign(Arrays.asList(topicPartition));
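
To make the "assignment" in the exception concrete, here is a toy model in plain Java. It is not Kafka's actual SubscriptionState code; the ToyConsumer class and its methods are hypothetical names that only mimic the check behind the error: seek() is rejected until the partition is in the consumer's own assignment. In the real client, subscribe() only hands out partitions during poll(), as part of the group rebalance, whereas assign() takes effect immediately, which is why calling assign() before seek() avoids the exception.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Kafka's real implementation, just the same idea.
class ToyConsumer {
    // partition name -> next offset to read; a key exists only once the partition is assigned
    private final Map<String, Long> positions = new HashMap<>();

    // Take ownership of a partition manually, in the spirit of KafkaConsumer.assign(...)
    void assign(String partition) {
        positions.putIfAbsent(partition, 0L);
    }

    // Move the read position; only legal for a partition this consumer currently owns
    void seek(String partition, long offset) {
        requireAssigned(partition);
        positions.put(partition, offset);
    }

    // Return the next offset that would be read from the partition
    long position(String partition) {
        requireAssigned(partition);
        return positions.get(partition);
    }

    private void requireAssigned(String partition) {
        if (!positions.containsKey(partition)) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
    }
}

class ToyConsumerDemo {
    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        try {
            // Seeking before any assignment fails, like in the stack trace above
            consumer.seek("HighAvailabilityTest-1", 42L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // After a manual assignment the same seek succeeds
        consumer.assign("HighAvailabilityTest-1");
        consumer.seek("HighAvailabilityTest-1", 42L);
        System.out.println(consumer.position("HighAvailabilityTest-1"));
    }
}
```

Running ToyConsumerDemo prints the same message as the stack trace for the first seek(), and then 42 once the partition has been assigned.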
ighack -> Marseille, 7 years ago
// Consumer configuration
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", false);
props.put("auto.commit.interval.ms", "1000"); // has no effect while auto commit is disabled
props.put("session.timeout.ms", "30000");
props.put("max.poll.records", maxPollRecords);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//consumer.subscribe(Arrays.asList(topic));

// Assign partition 2 manually, then move its position to offset 488430
TopicPartition p = new TopicPartition(topic, 2);
consumer.assign(Arrays.asList(p));
consumer.seek(p, 488430);
// The loop is commented out, so poll() is called only once
//while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        String V = record.value();
    }
//}

No error is thrown, but I still don't get any data back.
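
Two things commonly produce an empty result here, though neither can be confirmed from the post. First, a single poll(100) can return before the first fetch has completed, so polling in a loop (the one commented out above) is the usual pattern. Second, offset 488430 may lie outside the offsets currently held by partition 2, in which case the consumer falls back to its auto.offset.reset policy instead of reading from the requested position. The sketch below, in plain Java, classifies a requested offset against a partition's bounds. The OffsetCheck class and the bounds used in main are made up for illustration; with newer client versions the real bounds would come from consumer.beginningOffsets(...) and consumer.endOffsets(...).

```java
// Hypothetical helper for diagnosing a seek target; not part of the Kafka client.
class OffsetCheck {
    enum Verdict { BEFORE_LOG_START, READABLE, AT_LOG_END, PAST_LOG_END }

    // beginning: first offset still retained in the partition
    // end: offset that the next produced record will receive
    static Verdict classify(long requested, long beginning, long end) {
        if (requested < beginning) {
            return Verdict.BEFORE_LOG_START; // out of range: auto.offset.reset applies
        }
        if (requested < end) {
            return Verdict.READABLE;         // a record exists at this offset
        }
        if (requested == end) {
            return Verdict.AT_LOG_END;       // valid, but nothing to read until new records arrive
        }
        return Verdict.PAST_LOG_END;         // out of range: auto.offset.reset applies
    }

    public static void main(String[] args) {
        // Made-up bounds: pretend partition 2 currently holds offsets 0..1199
        System.out.println(classify(488430L, 0L, 1200L));
    }
}
```

With the made-up bounds above the verdict is PAST_LOG_END, which would explain an empty poll; with real bounds the result may of course differ.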

"No current assignment for partition": could it be that the partition is already taken by another consumer, so it cannot be assigned to you?
