在数据库插入数据后,kafka的consumer不能获得插入信息

U MAKE ME 发表于: 2024-05-09   最后更新时间: 2024-05-09 14:08:03   93 游览

我在128上
第一步:安装了MySQL,然后创建了一张表为user
第二步:执行了

 curl -X POST -H 'Content-Type: application/json' -i 'http://192.168.245.128:8085/connectors' \
--data \
'{"name":"songshu-upload-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://192.168.245.128:3306/kafka?user=root&password=Yhf_1018",
"table.whitelist":"user",
"incrementing.column.name": "id",
"mode":"incrementing",
"topic.prefix": "songshu-mysql-"}}'

第三步:向user插入数据
第四步:执行 bin/kafka-console-consumer.sh --bootstrap-server 192.168.245.128:9092 --topic songshu-mysql-user --from-beginning
但是却没有显示刚刚插入的数据,这是那边出错了呢?(master的connect.log)

[2024-05-09 11:11:58,609] WARN [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Received unknown topic or partition error in fetch for partition songshu-mysql-user-0 (org.apache.kafka.clients.consumer.internals.Fetcher:1246)
[2024-05-09 11:11:58,619] WARN [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Error while fetching metadata with correlation id 10483 : {songshu-mysql-user=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:1063)
[2024-05-09 11:11:58,621] INFO [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Revoke previously assigned partitions songshu-mysql-user-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:286)
[2024-05-09 11:11:58,623] INFO [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:533)
[2024-05-09 11:11:58,736] WARN [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Error while fetching metadata with correlation id 10485 : {songshu-mysql-user=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:1063)
[2024-05-09 11:11:58,736] WARN [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] The following subscribed topics are not assigned to any members: [songshu-mysql-user]  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:570)
[2024-05-09 11:11:58,736] INFO [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Finished assignment for group at generation 3: {connector-consumer-songshu-download-mysql-0-7cf1041e-7f20-4698-be82-4e11750d5cee=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@4d9b6847} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:585)
[2024-05-09 11:11:58,742] INFO [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:484)
[2024-05-09 11:11:58,742] INFO [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Adding newly assigned partitions:  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:267)
[2024-05-09 11:11:58,838] INFO [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Revoke previously assigned partitions  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:286)
[2024-05-09 11:11:58,838] INFO [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:533)
[2024-05-09 11:11:58,840] INFO [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Finished assignment for group at generation 4: {connector-consumer-songshu-download-mysql-0-7cf1041e-7f20-4698-be82-4e11750d5cee=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@4892c475} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:585)
[2024-05-09 11:11:58,841] INFO [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Successfully joined group with generation 4 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:484)
[2024-05-09 11:11:58,842] INFO [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Adding newly assigned partitions: songshu-mysql-user-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:267)
[2024-05-09 11:11:58,847] INFO [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Found no committed offset for partition songshu-mysql-user-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1241)
[2024-05-09 11:11:59,682] INFO [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Resetting offset for partition songshu-mysql-user-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState:385)
[2024-05-09 11:11:59,843] WARN [Consumer clientId=connector-consumer-songshu-download-mysql-0, groupId=connect-songshu-download-mysql] Received unknown topic or partition error in fetch for partition songshu-mysql-user-0 (org.apache.kafka.clients.consumer.internals.Fetcher:1246)
[2024-05-09 11:12:39,984] INFO AbstractConfig values: 
 (org.apache.kafka.common.config.AbstractConfig:347)
[2024-05-09 11:13:42,142] INFO AbstractConfig values: 
 (org.apache.kafka.common.config.AbstractConfig:347)
[2024-05-09 11:20:10,070] INFO AbstractConfig values: 
 (org.apache.kafka.common.config.AbstractConfig:347)

1、我在三台机器(master,slave1,slave2)分别启动后,发现slave1和slave2的_consumer_offset-xxx并不是很全,master是0-49,但是slave1其中丢失了10个,slave2只有0-25,所以在启动后的(slave1)日志会一直出现

25 partitions have leader brokers without a matching listener, including [__consumer_offsets-9, __consumer_offsets-39, __consumer_offsets-11, __consumer_offsets-31, __consumer_offsets-13, __consumer_offsets-43, __consumer_offsets-29, __consumer_offsets-1, __consumer_offsets-41, __consumer_offsets-27]

2、还有一个是consumer.properties的auto.offset.reset没有设置不知道是否会有影响?

添加评论
你的答案

查看kafka相关的其他问题或提一个您自己的问题