kafka生产者等待5分钟发送消息服务器关闭

陈阿伦 发表于: 2022-04-28   最后更新时间: 2022-04-28 15:32:38   1,386 游览

1、kafka生产者等待5分钟发送消息,远程主机会关闭连接
2、kafka版本3.1.0,部署环境用的阿里云服务器

 public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip地址");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-DD-mm dd:HH:ss");

        long current = System.currentTimeMillis();
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 3; i++) {
            producer.send(new ProducerRecord<String, String>("alg-task-results", "0416", sdf.format(new Date()))).get(30, TimeUnit.SECONDS);
            logger.info("发送第" + i + "个消息,花费" + (System.currentTimeMillis() - current) + "ms");
            current = System.currentTimeMillis();
        }

        for (int i = 0; i < 10; i++) {
            logger.info("等待第" + i + "次睡眠30s");
            Thread.sleep(30 * 1000L);
        }
        current = System.currentTimeMillis();
        logger.info("开始第二次发送");
        for (int i = 0; i < 10; i++) {
            long start = 0;
            if(i == 0){
                start = System.currentTimeMillis();
            }
            // 如果时间设置10秒,会报错超时,设置30秒会有报错,远程主机关闭
            producer.send(new ProducerRecord<String, String>("alg-task-results", "0416", sdf.format(new Date()))).get(10, TimeUnit.SECONDS);
            logger.info("发送第" + i + "个消息,花费" + (System.currentTimeMillis() - current) + "ms");
            current = System.currentTimeMillis();
            long end = 0;
            if(i == 0){
                end = System.currentTimeMillis();
                System.err.println(end-start);
            }

        }

        producer.close();
    }

4、如果等待5分钟,再次发送消息,报错如下:

14:24:15.372 kafkaTest [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=producer-1, correlationId=6) and timeout 30000 to node 0: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='alg-task-results')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
14:24:15.593 kafkaTest [main] INFO  Main - 开始第二次发送
14:24:15.594 kafkaTest [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=7) and timeout 30000 to node 0: {acks=-1,timeout=30000,partitionSizes=[alg-task-results-2=92]}
Exception in thread "main" java.util.concurrent.TimeoutException: Timeout after waiting for 10000 ms.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
    at Main.main(Main.java:53)

5、如果等待5分钟,修改超时时间为30秒,报错如下:

java.io.IOException: 远程主机强迫关闭了一个现有的连接。
    at java.base/sun.nio.ch.SocketDispatcher.read0(Native Method)
    at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
    at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
    at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:245)
    at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
    at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:358)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:95)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
    at java.base/java.lang.Thread.run(Thread.java:834)
发表于 2022-04-28
添加评论

怀疑是阿里云的防火墙,主动断开的连接。
可以看看kafka的日志,看看断开的相关信息。

max.poll.interval.ms:单次poll()操作后可以执行的最长时间,或者poll()调用之间的最大延迟,如果在这个延迟时间之内未收到下一次poll()操作,将认为客户端已失败,从而触发Rebalance操作,默认时间为5分钟。

你的答案

查看kafka相关的其他问题或提一个您自己的问题