c++ 调用kafka,rd_kafka_conf_set接口设置"auto.offset.reset"属性问题

轻骑踏红尘 发表于: 2022-06-30   最后更新时间: 2022-06-30 18:28:15   806 游览

c++ 调用kafka,rd_kafka_conf_set接口设置"auto.offset.reset"属性时遇到问题,场景如下:

  1. 用一个进程作为生产者,一直在生产发送topic的消息。

  2. 生产者消息没有被消费,当我消费者连接成功时,将会接收所有以往没有被消费过的消息,可我的使用场景是只需要接收当前时间产生的消息,之前的需要忽略掉。

我该设置的是服务器配置还是代码????

求大佬解答!!只需要接收当前时间产生的消息,之前的需要忽略掉,不管它有没有被消费过。

发表于 2022-06-30
添加评论

你想像一下订报纸,在你没有订阅之前,报社根本就不知道你,当你订阅的这一刻,新的报纸才会发给你,老的报纸是不会给你的。这时,你消费者停掉了,发报员依然会为你保留报纸,因为你已经在报社有名单了。这个名单是根据是group.id确认的,如果你想每次都是最新的,可以每次都用新的group.id

另外要设置成auto.offset.reset=latest,解释如下:

  • earliest:自动将偏移重置为最早的偏移
  • latest:自动将偏移重置为最新偏移

但是,这个latest并不代表这一时刻的消息,如果你已经在报社有名单了,那就是基于报社记录你最后一次拿取报纸时间,把这些‘报纸’都发给你。

大佬的回答很生动形象,瞬间理解了很多,我试了一下,现场和您说的是一样的。

但现在需要的是,完全不需要老的报纸,每次用新的group.id感觉不太好,是否有接口能在报社名单上消除这个id,我下次再用,它就认为是新的id。将不会给我老的报纸,或者配置中有没有设置,告诉报社,即使名单上有了这个group.id,也不用给他旧的 报纸,它来了直接给最新的。

是可以删除group.id,可参考:

另外一种方式是重置offset,分2步:

  1. 通过时间来获取到offset
  2. 消费者重置offset

c++的我没用过,java的关键的2个命令如下,可以帮助你找找对应c++的代码:

  • consumer.offsetsForTimes // 通过时间,定位offset
  • consumer.seek // 重新定位offset

找是找到了,可是不管怎么设置都没生效,难受呀,不知道问题在哪里,还需要设置其它什么,我的想法是在订阅前把这个设置完成,代码如下:

bool MessageQueueConsumer::subscribeAction(){
    rd_kafka_resp_err_t err;
    rd_kafka_topic_partition_list_t *subscription; 

    subscription = rd_kafka_topic_partition_list_new(_topocList.size());
   pthread_mutex_lock(&_mutex);
   for(int count =  0; count <  _topocList.size(); count ++){
        rd_kafka_topic_partition_list_add(subscription,_topocList[count].c_str(),
        count);
   }
   pthread_mutex_unlock(&_mutex);
   for(int count =  0; count <  subscription->cnt; count ++){
        subscription->elems[count].offset = RD_KAFKA_OFFSET_END;
   }
   rd_kafka_seek_partitions(_handler,subscription, 1000);
   err = rd_kafka_subscribe(_handler, subscription);
   if (err) {
        //错误日志记录 todo
        rd_kafka_topic_partition_list_destroy(subscription);
        return false;
    }
    rd_kafka_topic_partition_list_destroy(subscription);
    return true;
}

没看到你通过时间获取offset的逻辑。

我看源码中例子是设置了offset = RD_KAFKA_OFFSET_BEGINNING就可以了,可是我测试的时候不太行,这是源码中对RD_KAFKA_OFFSET_END字段的注解:

#define RD_KAFKA_OFFSET_BEGINNING                                              \
        -2 /**< Start consuming from beginning of                              \
            *   kafka partition queue: oldest msg */
#define RD_KAFKA_OFFSET_END                                                    \
        -1 /**< Start consuming from end of kafka                              \
            *   partition queue: next msg */
#define RD_KAFKA_OFFSET_STORED                                                 \
        -1000 /**< Start consuming from offset retrieved                       \
               *   from offset store */
#define RD_KAFKA_OFFSET_INVALID -1001 /**< Invalid offset */

我觉得你还是得通过时间获取offset的位置,因为客户端是无法得知当前offset的位置。

你的答案

查看kafka相关的其他问题或提一个您自己的问题