为什么kafka消费消息不完整?

無名 发表于: 2016-07-01   最后更新时间: 2016-07-01 22:40:27   4,321 游览

为什么kafka消费消息不完整?


我在代码里produce,在Shell里consumer,为什么consumer不完整?只到767,不到999?

代码如下:


package com.lzm.demo.demo_kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * 
 * @author       
 * @date 2016  1  11       4:18:56
 */

public class kafkaConsumer extends Thread {

	private String[] topics;

	public kafkaConsumer(String[] topics) {
		super();
		this.topics = topics;
	}

	@Override
	public void run() {

		Consumer consumer = createConsumer();

		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records) {
				System.out.printf("key = %s, value = %s, offset = %d\n", record.key(), record.value(), record.offset());
			}
		}
	}

	private Consumer createConsumer() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "192.168.14.237:9091,192.168.14.237:9092,192.168.14.237:9093");
		props.put("group.id", "d");
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		props.put("session.timeout.ms", "30000");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList("test", "foo"));
		return consumer;
	}

	public static void main(String[] args) {
		new kafkaConsumer(new String[] { "testt", "bar" }).start();
	}

}

就是拿不到消息

发表于 2016-07-01
添加评论

在程序main方法中,增加休眠, 


Thread.sleep(2000); 


 因为kafka发送的消息,还在缓存中,还没发送。
而主进程跑完结束了,导致线程终结,消息丢失。

楼主集群中kafka的版本是多少?我的代码跟你一样,但是老报错

你的答案

查看kafka相关的其他问题或提一个您自己的问题