kafka KTable漏消息的问题?

KaL 发表于: 2021-03-16   最后更新时间: 2021-03-16 17:41:33   84 游览
0
StreamsBuilder sb = new StreamsBuilder();
KTable<Integer, Integer> example =    
    sb.stream(inputs, Consumed.with(Serdes.Integer(), Serdes.Integer()))    
        .groupByKey()    
        .aggregate(        
            () -> 0,        
            (key, value, currentValue) -> {          
                logger.info("During Aggregation {}", currentValue)
                return currentValue
            },        
            Materialized.with(Serdes.Integer(), Serdes.Integer())    
        );

example.toStream().peek(loggger.info("At the end {}", currentValue))

我在0.0001 秒里放出了这三个event

(1, 1), (1, 2), (1, 3)

log到了

"During Aggregation 1" 
"During Aggregation 2" 
"During Aggregation 3"

但我只有"At the end 3"
漏掉了"At the end 1", "At the end 2"

是kafka KTable有什么短时间内不update的情况吗,有什么方法能够把"At the end 1", "At the end 2" 也打印出来?



发表于 1月前
KaL

  • 找不到想要的答案?

    我要提问
    相关