kafka生成消息超时报错,各位大神帮帮分析,谢谢

凤正 发表于: 2019-08-06   最后更新时间: 2019-08-06  

提问说明

java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for ddcar-2: 30025 ms has passed since last attempt plus backoff time
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:122)
    at com.kehua.platfrom.kafka.producer.KafkaProducerServer.checkProRecord(KafkaProducerServer.java:119)
    at com.kehua.platfrom.kafka.producer.KafkaProducerServer.sndMesForTemplate(KafkaProducerServer.java:86)
    at com.kehua.sys.aop.DeviceUpdateAop.realDataUpdate(DeviceUpdateAop.java:173)
    at com.kehua.sys.aop.DeviceUpdateAop.login(DeviceUpdateAop.java:80)
    at sun.reflect.GeneratedMethodAccessor456.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:620)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:602)
    at org.springframework.aop.aspectj.AspectJAfterAdvice.invoke(AspectJAfterAdvice.java:47)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.aspectj.AspectJAfterAdvice.invoke(AspectJAfterAdvice.java:44)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.aspectj.AspectJAfterAdvice.invoke(AspectJAfterAdvice.java:44)

    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
    at com.kehua.platfrom.netty.service.CorrespondServiceV2$$EnhancerBySpringCGLIB$$a6e3882a.dealReallyData(<generated>)
    at com.kehua.platfrom.netty.messagefactory.khmessagefactory4_0.khmessage4_0.UploadRealDataMsg.receive(UploadRealDataMsg.java:114)
    at com.kehua.platfrom.netty.messagefactory.khmessagefactory4_0.khmessage4_0.UploadRealDataMsg.analysis(UploadRealDataMsg.java:41)
    at com.kehua.platfrom.netty.protocolfactory.protocol.KehuaProtocol4_0.analysis(KehuaProtocol4_0.java:82)
    at com.kehua.platfrom.netty.handler.ServerHandlerV2.channelRead(ServerHandlerV2.java:91)
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
    at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
    at io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
    at io.netty.handler.timeout.ReadTimeoutHandler.channelRead(ReadTimeoutHandler.java:149)
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
    at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
    at io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:243)
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
    at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
    at io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
    at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
    at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
    at io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
    at io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
    at io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for ddcar-2: 30025 ms has passed since last attempt plus backoff time
    at org.springframework.kafka.core.KafkaTemplate$1.onCompletion(KafkaTemplate.java:330)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:745)


您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: 基于librdkafka的c++消费者封装
下一条: kafka是集群部署的,为什么创建的topic只在一台机器上存在?

  • ddcar-2你在消费者那ping一下,通吗

    • 可以ping通,但是java程序生成消息就会报错,后面发现改变kafka配置文件advertised.listeners=PLAINTEXT://内网IP:9092就好了