你提供的信息太少了,不知道你是怎么启动的,和你的关键yaml信息。
1、kubectl logs <kafka-pod>
看到的日志是bin/kafka-server-start.sh config/server.properties
的日志。
2、你运行起来kafka之后,可以命令行进入到容器内,查看/opt/bitnami/scripts/kafka-env.sh
,里面有包含日志的信息,关键字是KAFKA_LOG_DIR
。
export KAFKA_LOG_DIR="${KAFKA_BASE_DIR}/logs"
所以,你可以试试加上KAFKA_LOG_DIR
环境变量来指定kafka打印日志的位置。
同样,如果不生效,也可以换成LOG_DIR
试试,这个是kafka的原始日志目录指定。
5M的消息,就不要批处理了:
# 较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)
batch.size=0
其次,既然已经生产超时,反复重试了,说明已经到了瓶颈。降低向kafka发送速度,防止阻塞而导致的超时,进而重试,最后丢消息。
兄弟,没有消息,怎么会报接收数量大于呢?你连错kafka集群了吧
先用命令行测试你的集群吧:kafka命令大全
没用过CDH额,这个配置是默认生成的,不过文件内容很简单,只是为了方便你命令形态的东西,写到里面变成固定的,不用每次带那么多参数了,你可以下载一个官方的版本的kafka来获取。
producer.properties的默认内容如下:
cat config/producer.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=
kafka消费者是批量拉取消息的,比如一次拉取2000条。
说到这里,简单的消费者逻辑你已经理解了,问题的核心在于你什么时候提交这个offset。
这是 Lettuce 的一项安全功能,目的是防止通过集群节点(CLUSTER NODES)重定向到(尚未)已知的节点。如果拓扑结构发生变化,而客户端有不同的视图,就会出现这种状态。
Redis在扩容/缩容过程中,当实例分片数发生变化时,存在节点拓扑关系和Slot对应信息的变化,需要客户端进行拓扑关系的自动更新,否则可能造成请求路由失败或者路由位置错误等,造成客户端访问报错。
1、开启Cluster集群自动刷新拓扑配置。
spring:
redis:
cluster:
refresh:
adaptive: true
period: 30000 # 30秒自动刷新一次
2、关闭“验证集群节点成员资格开关”,关闭方式如下:
spring:
redis:
cluster:
validate-cluster-node-membership: false
kafka之间是无法互相发现对方的,每个kafka向zk注册,说我是A节点(broker.id),我是B节点,这样组成了一个kafka集群。每个人通过zk来发现彼此。
//设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
factory.getContainerProperties().setAckMode((ContainerProperties.AckMode.MANUAL_IMMEDIATE));
MANUAL_IMMEDIATE 这个提交的方式虽然是消费一次提交一次 但是kafka每一批都会拉取max.poll.records(默认500条数据) 比如说在消费第300条数据是手动提交的时候 (1-300条数据)消费的总时长超过了max.poll.interval.ms 就会被踢出消费者组
问题解决了。但是原因具体原因不能确定。如果有相同情况,可以按照这个思路排查看看。
我们服务器是arm架构,但是jdk没有使用arm架构的,替换了jdk版本后文件正常删除了。
改了之后:虽然钉钉还会在网络波动的时候报警,因为数据从生产到目标topic大概会用1分钟时间。但是再也没有重复消费情况了。感谢!