kafka命令行消费者无法消费?

提问说明

我在windows运行下面的producer,在linux虚拟机上部署了kafka,但是在linux用命令行没法消费消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.93.129:9092 --topic my-tc4 --from-beginning

producer代码:


package com.cetccq.platform.logserver;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class XTKafkaProducer {
    private static Properties kafkaProps;

    private static void initKafka() {
        kafkaProps = new Properties();
        // broker url
        kafkaProps.put("bootstrap.servers", "192.168.93.129:9092"); 
        // request need to validate
        kafkaProps.put("acks", "all");
        // request failed to try
        kafkaProps.put("retries", 0);
        // memory cache size
        kafkaProps.put("batch.size", 16384);
        //
        kafkaProps.put("linger.ms", 1);
        kafkaProps.put("buffer.memory", 33554432);
        // define the way of key and value serializer
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    }

    public static void main(String[] args) {
        initKafka();
        Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String,String>("my-tc4", Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("Message sent successfully!");
        producer.close();
    }
}

servers.properties:

broker.id=0
############################# Socket Server Settings #############################
listeners=PLAINTEXT://192.168.93.129:9092
port = 9092
num.network.threads=3
num.io.threads=8
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0

我的操作步骤是这样的:

  1. 启动zookeeper
  2. 启动kafka broker
  3. 命令行创建topic:my-tc4
  4. 运行producer程序
  5. 命令行启动consumer

麻烦您看看哪里有问题?感激不尽!






发表于: 9天前   最后更新时间: 9天前   游览量:79
上一条: 到头了!
下一条: 已经是最后了!

  • producer.send(new ProducerRecord("my-tc4", Integer.toString(i), Integer.toString(i)));
    这个在后面加个.get(),变成同步,先观察消息是否发送成功,是否有报错,如:
    producer.send(new ProducerRecord("my-tc4", Integer.toString(i), Integer.toString(i))).get();
    • 报错了:
      Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic my-tc4 not present in metadata after 60000 ms.
      at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.(KafkaProducer.java:1255)
      at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:917)
      at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:840)
      at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:727)
      at com.cetccq.platform.logserver.XTKafkaProducer.main(XTKafkaProducer.java:36)
      Caused by: org.apache.kafka.common.errors.TimeoutException: Topic my-tc4 not present in metadata after 60000 ms.

      不太明白metadata是个什么意思T T