SpringBoot整合Kafka(开启SSL状态报错不开启SSL可以正常使用) 项目启动报错Java heap space

提问说明

SpringBoot项目整合了Kafka,Kafka Broker都开启了SSL(没有开启验证主机名),并且在Linux下用producer以及consumer的命令行可以在SSL状态下使用,但是在SpringBoot中启动就会报错.导致无法启动.证书也都有

 spring:
  kafka:
    ssl:
      key-store-location: file:C:/Users/eatheryu/Desktop/ca/server/server.keystore.jks
      key-store-password: 1111111
      key-password: 1111111
      trust-store-location: file:C:/Users/eatheryu/Desktop/ca/client/client.truststore.jks
      trust-store-password: 1111111
      key-store-type: JKS

    bootstrap-servers: 111.111.111.111:8089
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 30
      ssl:
        protocol: SSL

    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      group-id: test3
      auto-offset-reset: latest
      max-poll-records: 20
      ssl:
        protocol: SSL

报错信息如下:


Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-11 16:25:41.095 ERROR 18668 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.OutOfMemoryError: Java heap space
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at com.example.kafka.KafkaApplication.main(KafkaApplication.java:10) [classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-2.1.0.RELEASE.jar:2.1.0.RELEASE]
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[na:1.8.0_191]
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[na:1.8.0_191]
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:425) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:274) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1774) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1742) ~[kafka-clients-2.0.0.jar:na]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:275) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:135) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:257) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:289) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:238) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]

纠结很久了但还不知道错误在哪里






发表于: 2月前   最后更新时间: 1月前   游览量:268
上一条: 到头了!
下一条: 已经是最后了!

  • 看的人不少就是没人回答.....目前问题还未解决..一点思路没有
    • jvm初始值我从128调到了512...还是报错....有一点.我把连接broker的ip地址换成hosts配置过的服务器名称,然后开启ssl通过服务器名称验证,这样子报的是Timeout expired while fetching topic metadata,好像都提示Broker may not be available. 都连不上... 用ip是能连上.但是会报jvm堆溢出....烦
        • Producer代码
          @RequestMapping("/send")
          public String sendMessage(){
          String msg="producer";
          ListenableFuture test = kafkaTemplate.send(topic,msg);
          test.addCallback(new ListenableFutureCallback() {
          @Override
          public void onFailure(Throwable throwable) {
          System.out.println("发送失败");
          }

          @Override
          public void onSuccess(Object o) {
          System.out.println("发送成功");
          }
          });
          return "success";
          }

          Consumer代码
          @KafkaListener(topics = "${pro.topic2}", groupId = "${spring.kafka.consumer.group-id}", containerFactory = "kafkaListenerContainerFactory", errorHandler = "consumerAwareErrorHandler")
          public void listen2(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) {
          System.out.println("testInQT====key======" + consumerRecord.key() + "=====Value===" + consumerRecord.value() + "=====offset=====" + consumerRecord.offset() + "consumerRecord.partition();" + consumerRecord.partition());
          acknowledgment.acknowledge();
          }