基于librdkafka的c++消费者封装

剑枫寒 发表于: 2019-08-06   最后更新时间: 2019-08-06 23:30:18   4,499 游览

有两个问题:

1. 本例中使用的是高级消费者api : KafkaConsumer。源码rdkafkacpp.h里面说调用consume消费消息时,会自动调用reblance事件回调,为什么我设置reblance消息回调没有被调用到:

  /**
   class RD_EXPORT KafkaConsumer : public virtual Handle {
   ...
   * @brief Consume message or get error event, triggers callbacks.
   * Will automatically call registered callbacks for any such queued events,
   * including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
   * etc.
   * ....
   virtual Message *consume (int timeout_ms) = 0;
   ...
   }
   */


注:继承RdKafka::RebalanceCb就可以实现回调

namespace EDU_SP_KAFKA {
class KafkaRebalanceCb : public RdKafka::RebalanceCb {
public:
        KafkaRebalanceCb() = default;
        ~KafkaRebalanceCb() = default;
        virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                                  RdKafka::ErrorCode err,
                                  std::vector<RdKafka::TopicPartition*> &partitions) override;
}

int KafkaConsumeImp::initConsumer() {
    std::string errstr;
    // create KafkaRebalanceCb & KafkaEventCb callback object
    kafka_event_cb_ = std::make_shared<EDU_SP_KAFKA::KafkaEventCb>();
    kafka_reblance_cb_ = std::shared_ptr<EDU_SP_KAFKA::KafkaRebalanceCb>();

    //TODO:好像设置的回调没起作用
    kafka_conf_->set("event_cb", kafka_event_cb_.get(), errstr);
    kafka_conf_->set("rebalance_cb", kafka_reblance_cb_.get(), errstr)
    ...};

2. 我是基于librdkafka c++库的基础上再封装了一层sdk提供给上层业务来使用 ,开单独的线程去消费消息,并且模拟了一下业务demo调用,消息消费正常。想问一下进程启动之后各个线程是做什么的,线程启动如下:

PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND 
42545 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.00 kafka-consumer-
42546 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.01 rdk:broker-1
42547 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.78 rdk:main
42548 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.00 rdk:broker-1
42549 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.30 rdk:broker1
42550 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.28 rdk:broker2
42551 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.61 rdk:broker3
42552 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.00 kafka-consumer-

我的理解是:
(1) broke1、broker2、broker3:是分别用来和三个kafka集群进行交互的
(2)42545(kafka-consumer-):是消息事件回调:RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
(3)42552(kafka-consumer-):是开的单个线程去消费消息
(4) 还有两个broker-1不知道做什么用的

线程消费消息代码如下:

bool KafkaConsumeImp::start_consumer() {
    // begin consume thread
    thread_ = std::make_unique<std::thread>([this] {
        this->start();
    });}

bool KafkaConsumeImp::start() {
    /*Consume messages*/
    if(started_) {
        return -1;
    }
    if(!kafka_consumer_) {
        return -2;
    }
    try {
        while (EDU_SP_KAFKA::run) {
            RdKafka::Message *msg = kafka_consumer_->consume(6000);
            consumer_cb(msg, nullptr);
    } catch (std::exception &e) {
        std::cout << "Execption: consuemer_ start failure !!!" << std::endl;
        return -4;
    }
    started_ = true;
    return true;
}
发表于 2019-08-06
添加评论

1、本例中使用的是高级消费者api : KafkaConsumer。源码rdkafkacpp.h里面说调用consume消费消息时,会自动调用reblance事件回调,为什么我设置reblance消息回调没有被调用到:

  • 因为你用的HighLevelApi,所以reblance只有在被消费的topic的多个partition进行重新分配时才会调用,比如消费第1个partition的消息者断开后才会触发这个回调。

2、线程用途不太清楚,我觉得主要有两种:

  1. 心跳线程:与broker之间的心跳消息、断链、重新均衡的通知;
  2. 消费线程:取决于你的消费集群还有你的topic分区数,一个分区同时只能被一个消费者线程消费,所以如果分区设置为32个且消费程序只有1个,就会有32个线程;

其他就不了解了。。

嗯嗯 多谢回答

第一: kafka rebalance触发条件:
  1. 消费组成员变更: consumer加入或者退出
  2. topic创建,组订阅关系变更
  3. partition分区改变(修改分区或者broker变更)
第二:kafka线程问题:
  1. 心跳线程:与broker保持心跳
  2. 消费线程:从leader拉消息
  3. 生产者线程:在创建Producer实例时会创建并启动Sender线程实例。只有Sender线程开始运行时(即生产者本地队列有消息发到broker)才会创建与broker的TCP连接
  4. 至于上图中的两个broker-1的线程: 我认为是专门用来拉取集群metadata的线程

您好,大神可以可以解答一下我的疑问,谢谢!!

半兽人 -> 剑枫寒 4年前

我不会c额,所以不敢回答你额。我凭着粗浅的经验,给你分析一下吧,我在群里帮你吆喝一下看有c语言的高手在不。

reblance这个回调通知,像是kafka的topic选举leader的时候,或者有新的消费者加入,才会通知你,而不是消息的通知。

剑枫寒 -> 半兽人 4年前
./kafka-consumer-sdk
kafkasdk_set_msg_callback success :1
% Created consumer name is:rdkafka#consumer-1

main thread is running !!!
RebalanceCb: Local: Assign partitions: test[0], test[1], test1[0], test2[0], test2[1]

对的 ,消费者新增就会调用,但是问题1的方式把rebalance_cb设置为智能指针的方式不会触发回调,我后面又试了一下,把这两个回调重新封装成两个单例,然后就可以触发回调,代码如下:

class KafkaRebalanceCb : public RdKafka::RebalanceCb,
public Singleton {

public:
KafkaRebalanceCb() = default;
~KafkaRebalanceCb() = default;

    virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                              RdKafka::ErrorCode err,
                              std::vector<RdKafka::TopicPartition*> &partitions) override;
};

//设置回调
kafkaconf->set("eventcb", KafkaEventCb::Instance(), errstr);
kafka_conf->set("rebalance_cb", KafkaRebalanceCb::Instance(), errstr);
半兽人 -> 剑枫寒 4年前

以后有c的问题,可以找你了!

剑枫寒 -> 半兽人 4年前

您好,大佬 这两个问题有没有答案哦

你的答案

查看kafka相关的其他问题或提一个您自己的问题