基于librdkafka的c++消费者封装

剑枫寒 发表于: 2019-08-06   最后更新时间: 2019-08-06  

有两个问题:

1. 本例中使用的是高级消费者api : KafkaConsumer。源码rdkafkacpp.h里面说调用consume消费消息时,会自动调用reblance事件回调,为什么我设置reblance消息回调没有被调用到:

  /**
   class RD_EXPORT KafkaConsumer : public virtual Handle {
   ...
   * @brief Consume message or get error event, triggers callbacks.
   * Will automatically call registered callbacks for any such queued events,
   * including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
   * etc.
   * ....
   virtual Message *consume (int timeout_ms) = 0;
   ...
   }
   */


注:继承RdKafka::RebalanceCb就可以实现回调

namespace EDU_SP_KAFKA {
class KafkaRebalanceCb : public RdKafka::RebalanceCb {
public:
        KafkaRebalanceCb() = default;
        ~KafkaRebalanceCb() = default;
        virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                                  RdKafka::ErrorCode err,
                                  std::vector<RdKafka::TopicPartition*> &partitions) override;
}

int KafkaConsumeImp::initConsumer() {
    std::string errstr;
    // create KafkaRebalanceCb & KafkaEventCb callback object
    kafka_event_cb_ = std::make_shared<EDU_SP_KAFKA::KafkaEventCb>();
    kafka_reblance_cb_ = std::shared_ptr<EDU_SP_KAFKA::KafkaRebalanceCb>();

    //TODO:好像设置的回调没起作用
    kafka_conf_->set("event_cb", kafka_event_cb_.get(), errstr);
    kafka_conf_->set("rebalance_cb", kafka_reblance_cb_.get(), errstr)
    ...};

2. 我是基于librdkafka c++库的基础上再封装了一层sdk提供给上层业务来使用 ,开单独的线程去消费消息,并且模拟了一下业务demo调用,消息消费正常。想问一下进程启动之后各个线程是做什么的,线程启动如下:

PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND 
42545 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.00 kafka-consumer-
42546 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.01 rdk:broker-1
42547 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.78 rdk:main
42548 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.00 rdk:broker-1
42549 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.30 rdk:broker1
42550 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.28 rdk:broker2
42551 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.61 rdk:broker3
42552 lvfuchao  20   0  473888   4496   4136 S  0.0  0.0   0:00.00 kafka-consumer-

我的理解是:
(1) broke1、broker2、broker3:是分别用来和三个kafka集群进行交互的
(2)42545(kafka-consumer-):是消息事件回调:RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
(3)42552(kafka-consumer-):是开的单个线程去消费消息
(4) 还有两个broker-1不知道做什么用的

线程消费消息代码如下:

bool KafkaConsumeImp::start_consumer() {
    // begin consume thread
    thread_ = std::make_unique<std::thread>([this] {
        this->start();
    });}

bool KafkaConsumeImp::start() {
    /*Consume messages*/
    if(started_) {
        return -1;
    }
    if(!kafka_consumer_) {
        return -2;
    }
    try {
        while (EDU_SP_KAFKA::run) {
            RdKafka::Message *msg = kafka_consumer_->consume(6000);
            consumer_cb(msg, nullptr);
    } catch (std::exception &e) {
        std::cout << "Execption: consuemer_ start failure !!!" << std::endl;
        return -4;
    }
    started_ = true;
    return true;
}


您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容





发表于: 17天前   最后更新时间: 17天前   游览量:163
上一条: kafka集群某topic无消息报错handling request
下一条: kafka生成消息超时报错,各位大神帮帮分析,谢谢

  • 您好,大神可以可以解答一下我的疑问,谢谢!!

    • 我不会c额,所以不敢回答你额。我凭着粗浅的经验,给你分析一下吧,我在群里帮你吆喝一下看有c语言的高手在不。 reblance这个回调通知,像是kafka的topic选举leader的时候,或者有新的消费者加入,才会通知你,而不是消息的通知。
        • ./kafka-consumer-sdk kafkasdk_set_msg_callback success :1 % Created consumer name is:rdkafka#consumer-1 main thread is running !!! RebalanceCb: Local: Assign partitions: test[0], test[1], test1[0], test2[0], test2[1] 对的 ,消费者新增就会调用,但是问题1的方式把rebalance_cb设置为智能指针的方式不会触发回调,我后面又试了一下,把这两个回调重新封装成两个单例,然后就可以触发回调,代码如下: class KafkaRebalanceCb : public RdKafka::RebalanceCb, public Singleton { public: KafkaRebalanceCb() = default; ~KafkaRebalanceCb() = default; virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector &partitions) override; }; //设置回调 kafka_conf_->set("event_cb", KafkaEventCb::Instance(), errstr); kafka_conf_->set("rebalance_cb", KafkaRebalanceCb::Instance(), errstr);