kafka [Consumer clientId=consumer-1, groupId=order] Connection with /49.235.220.50 disconnected

回忆、另洊爲 发表于: 2019-12-10   最后更新时间: 2019-12-10  

腾讯云服务器上搭建了单节点kafka,之前还可以本地通过公网ip访问,今天启动consumer时就报错disconnected,在服务器通过命令是可以正常发送消息和消费消息的。

kafka 版本是0.10.1.1,腾讯云是centos7,本地是java客户端

Properties props = new Properties();
//kafka地址
props.put("bootstrap.servers", host);
//超时时间
 props.put("session.timeout.ms", "30000");
//一次最大拉取条数
props.put("max.poll.records", 1000);
//消费规则
props.put("auto.offset.reset", "earliest");
//key序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "order");
//是否自动提交
props.put("enable.auto.commit", false);
consumer = new KafkaConsumer<String, String>(props);

info的日志:

2019-12-10 17:27:33.206 INFO 13368 --- [      Thread-27] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-1, groupId=order] Cluster ID: IUwcE7XnQ_SogaejZ3iD9A
2019-12-10 17:27:33.207  INFO 13368 --- [      Thread-27] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=order] Discovered group coordinator 49.235.220.50:9092 (id: 2147483647 rack: null)
2019-12-10 17:27:33.209  INFO 13368 --- [      Thread-27] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=order] Revoking previously assigned partitions []
2019-12-10 17:27:33.209  INFO 13368 --- [      Thread-27] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=order] (Re-)joining group
2019-12-10 17:27:33.384  INFO 13368 --- [      Thread-27] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=order] Successfully joined group with generation 5
2019-12-10 17:27:33.388  INFO 13368 --- [      Thread-27] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=order] Setting newly assigned partitions: hello-0
2019-12-10 17:27:33.429  INFO 13368 --- [      Thread-27] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=order] Group coordinator 49.235.220.50:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2019-12-10 17:27:33.615  INFO 13368 --- [      Thread-27] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=order] Discovered group coordinator 49.235.220.50:9092 (id: 2147483647 rack: null)
2019-12-10 17:27:33.748  INFO 13368 --- [      Thread-27] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=order] Group coordinator 49.235.220.50:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery

debug的日志:

o.apache.kafka.common.network.Selector   : [Consumer clientId=consumer-1, groupId=order] Connection with /49.235.220.50 disconnected
java.io.IOException: 远程主机强迫关闭了一个现有的连接。

尝试过其他的公司内网kafka,无问题,可正常使用,思考可能是云服务器问题,本地也可以正常的telnet 访问到公网地址,提交工单给了腾讯云,那边工程师说服务器并无问题。从日志中也可以看到consumer已成功加入到group中,按理说应该服务器也是可正常连接的,不知为何会一直断开连接,还麻烦能指点下。



您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: kafka服务节点挂掉 current leader's latest offset 8260512 is less than replica's latest offset 8314390 (kafka.server.ReplicaFetcherThread)
下一条: kafka 没有提交偏移量为什么还消费不到消息

  • 如果按照的描述,今天突然不行了,可参考下:https://www.orchome.com/1903
    看看是否有额外的变动。
    另外,针对错误:java.io.IOException: 远程主机强迫关闭了一个现有的连接。

    解释如下:

    若客户端强制关闭,服务器会报“java.io.IOException: 远程主机强迫关闭了一个现有的连接。”,并且服务器会在报错后停止运行,错误的意思就是客户端关闭了,但是服务器还在从这个套接字通道读取数据,便抛出IOException,导致这种情况出现的原因就是,客户端异常关闭后,服务器的选择器会获取到与客户端套接字对应的套接字通道SelectionKey,并且这个key的兴趣是OP_READ,执行从这个通道读取数据时,客户端已套接字已关闭,所以会出现“java.io.IOException: 远程主机强迫关闭了一个现有的连接”的错误。解决这种问题也很简单,就是服务器在读取数据时,若发生异常,则取消当前key并关闭通道。

    那客户端是否版本变更过,先进行简单的排查。