Kafka消费时报错org.springframework.kafka.KafkaException: No method found for class java.lang.String

Sheldonliu 发表于: 2019-03-13   最后更新时间: 2019-03-13  

消费者报找不到方法为String类型的

消费端配置

@Profile("kafka")
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    public KafkaConsumerConfig(){
        System.out.println("kafka消费者配置加载...");
    }


    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setMessageListener(kafkaConsumerListener());
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerProperties());
    }

    @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props= new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.3.31:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,  "KafkaConsumerListener");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,  false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,  30000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    @Bean
    public KafkaConsumerListenser kafkaConsumerListener(){
        return new KafkaConsumerListenser();
    }
}

监听类

@KafkaListener(topics = "test")
public class KafkaConsumerListenser implements AcknowledgingMessageListener<Integer, String> {


    @Override
    public void onMessage(ConsumerRecord<Integer, String> integerStringConsumerRecord, Acknowledgment acknowledgment) {
        System.out.println(integerStringConsumerRecord.value()+"-----------------------------------------------------------------"+integerStringConsumerRecord.topic());
    }
}

报错信息:

org.springframework.kafka.KafkaException: No method found for class java.lang.String
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:92)
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:146)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:60)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:166)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:981)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)


您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容





发表于: 5月前   最后更新时间: 5月前   游览量:864
上一条: 请教各位大佬一个问题
下一条: kafka 副本和分区改怎么设置,4个kafka结点,topic有4个副本

  • 看这个错,是缺少一个String参数的方法。你把Integer的先去掉。

    • public class KafkaConsumerListenser {
      @KafkaListener(group="KafkaConsumerListener" ,topics = "test")
      void listener(ConsumerRecord consumerRecords, Acknowledgment ack){
      System.out.println("-----------------------------------------------------------------"+consumerRecords.value());
      ack.acknowledge();
      }

      刚才试了下去掉Integer也不行,换成这种方式就可以。继承接口的话就不行