赞
踩
作为一个c++程序,我相信很多人都会使用c++的librdkafkacpp的接口,这些接口通过C++封装,使用起来方便,不需要去了解底层的c接口的封装及实现,是最理想的使用方式。
这就涉及到C++的ABI二进制兼容问题。
当我们编译一个C++动态库,受限于不同的编译器版本,无论是gcc还是VC,都存在着兼容问题。对于一个生产版本,如果在统一了编译工具链的情景下,是可以使用C++版本,但如果考虑到更好的ABI兼容,所以就选择了C库。其无论在VC还是gcc,都表现得很好。
librdKafka 提供了三种不同的消费模式,下面将讲解这三种模式的差异及如何使用。
- // 一次读取一条数据
- rd_kafka_consumer_poll();
- rd_kafka_consume();
-
- // 批量读取,一次可以读取多条数据
- // 通过回调函数的批量读取,将废弃
- rd_kafka_consume_callback();
- rd_kafka_consume_batch();
-
- // 通过Queue路由道特定的处理函数
- rd_kafka_consume_callback_queue
- rd_kafka_consume_batch_queue
1、这种模式,直接使用rd_kafka_consumer_poll()接口,其可以实现1次读取一条,这种场景对于数据量不大,可以很好的使用,而且其客户端API也实现了相关的消息确认和提交,大多数采用这种方式,这种方式的例程,我在前面的博文中有讲到,有兴趣的可以去看看
2、这种模式是批量读取的模式,这种模式下有两种接口,一种基于api的回调
rd_kafka_consume_callback,这个接口根据官方文档是即将被废弃的接口,所以不建议使用该接口,建议使用rd_kafka_consume_batch(),后面会给出这种模式下的使用例子。
3、使用Queue的方式,该方式也提供了两种模式,一种回调,一种直接消费,回调的方式应该在性能上是最高的。这种方式与第二种方式的不同是,他能让不同topic,不同patition的数据路由到一个队列,也就是对于一个需要处理多种数据的消费者。
1、消息最好程序中自己做确认提交,这样在kafka的服务,才不会看到滞后的消息消费
2、注意消息的销毁,kafka在销毁对象时会持有引用技术,消费完的消息,要destoy
3、对于topic+partition的消费模式,最好不要在运行热新增分区或主题,否则客户端处理起来比较麻烦。如果必要这么做,要做好测试。保证数据不重复不丢消息。
KafkaConsumer.h
- #ifndef KAFKACONSUMER_H
- #define KAFKACONSUMER_H
-
- #include <string>
- #include <iostream>
- #include <vector>
- #include <stdio.h>
-
- #include "rdkafka.h"
-
- using namespace std;
-
- struct TKafkaCfgInfo
- {
- string m_strBrokers;
- string m_strGroupID;
- string m_strTopics;
- int m_nPartition;
-
- int m_nBatchReadSize;
- int m_nReadTimeout;
- bool m_bIsLogKafka;
-
- TKafkaCfgInfo()
- : m_nPartition(-1)
- , m_nBatchReadSize(-1)
- , m_nReadTimeout(-1)
- , m_bIsLogKafka(false)
- {}
- };
-
- class CKafkaConsumer
- {
- public:
- CKafkaConsumer();
- virtual ~CKafkaConsumer();
-
- // 初?始º?化¡¥Kafka配?置?
- int InitCfg(const TKafkaCfgInfo& tCfg);
- const TKafkaCfgInfo& GetCfg() { return m_tCfg; }
-
- // 拉¤-取¨?Kafka消?息¡é
- void pullMessage(rd_kafka_message_t** p, ssize_t& nCnt);
-
- void SetConsumerOk(rd_kafka_message_t* pMsg);
-
- protected:
- rd_kafka_t* m_pkafka;
- rd_kafka_topic_t* m_pKafkaTopic;
-
- TKafkaCfgInfo m_tCfg;
- };
-
- #endif // KAFKACONSUMER_H
// KafkaConsumer.cpp
- #include "KafkaConsumer.h"
-
- #include <iostream>
-
- using namespace std;
-
- static void RebalanceCb(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque)
- {
- 测a试º?代䨲码?
- #if 0
- if (partitions)
- {
- for (int i = 0; i < partitions->cnt; i++)
- {
- partitions->elems[i].offset = 0;
- }
- }
- printf("RebalanceCb \r\n");
- #endif
- printf("RebalanceCb \n");
- rd_kafka_assign(rk, partitions);
- }
-
- static void EventErrorCb(rd_kafka_t* rk, int err,
- const char* reason,
- void* opaque)
- {
- printf("Kafka EventErrorCb(%d): %s\n", err, reason);
- }
-
- CKafkaConsumer::CKafkaConsumer()
- : m_pkafka(nullptr)
- , m_pKafkaTopic(nullptr)
- {
- }
-
- int CKafkaConsumer::InitCfg(const TKafkaCfgInfo& tCfg)
- {
- m_tCfg = tCfg;
-
- rd_kafka_conf_t* pConf = rd_kafka_conf_new();
- if (!pConf)
- {
-
- return -1;
- }
-
- char szErr[512] = { 0 };
- if (rd_kafka_conf_set(pConf, "bootstrap.servers", m_tCfg.m_strBrokers.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
- {
- rd_kafka_conf_destroy(pConf);
- return -1;
- }
-
- if (rd_kafka_conf_set(pConf, "group.id", m_tCfg.m_strGroupID.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
- {
-
- rd_kafka_conf_destroy(pConf);
- return -1;
- }
-
- // rd_kafka_conf_set_rebalance_cb(pConf, &RebalanceCb);
- rd_kafka_conf_set_error_cb(pConf, EventErrorCb);
-
- if (rd_kafka_conf_set(pConf, "enable.auto.commit", "false", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
- {
- rd_kafka_conf_destroy(pConf);
- return -1;
- }
-
- if (rd_kafka_conf_set(pConf, "enable.auto.offset.store", "false", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
- {
- rd_kafka_conf_destroy(pConf);
- return -1;
- }
-
- // topic配?置?
- /*
- rd_kafka_topic_conf_t* pTopicConf = rd_kafka_topic_conf_new();
- /*if (rd_kafka_topic_conf_set(pTopicConf, "auto.offset.reset", "latest", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
- {
- rd_kafka_topic_conf_destroy(pTopicConf);
- return -1;
- }*/
-
- // 创ä¡ä建¡§kafka实º¦Ì例¤y
- m_pkafka = rd_kafka_new(RD_KAFKA_CONSUMER, pConf, szErr, sizeof(szErr));
- if (!m_pkafka)
- {
- return -1;
- }
-
- m_pKafkaTopic = rd_kafka_topic_new(m_pkafka, m_tCfg.m_strTopics.c_str(), nullptr);
- if (!m_pKafkaTopic)
- {
- return -1;
- }
-
- rd_kafka_consume_start(m_pKafkaTopic, m_tCfg.m_nPartition, RD_KAFKA_OFFSET_STORED);
-
- return 0;
- }
-
- void CKafkaConsumer::pullMessage(rd_kafka_message_t** pRet, ssize_t& nCnt)
- {
- nCnt = rd_kafka_consume_batch(m_pKafkaTopic, m_tCfg.m_nPartition, m_tCfg.m_nReadTimeout, pRet, m_tCfg.m_nBatchReadSize);
- // rd_kafka_poll(m_pkafka, 0);
- }
-
- void CKafkaConsumer::SetConsumerOk(rd_kafka_message_t* pMsg)
- {
- rd_kafka_resp_err_t errCode = rd_kafka_offset_store(m_pKafkaTopic, m_tCfg.m_nPartition, pMsg->offset);
- printf("rd_kafka_offset_store offset = %d\n", pMsg->offset);
- if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR)
- {
- return;
- }
-
- errCode = rd_kafka_commit_message(m_pkafka, pMsg, 0);
- if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR)
- {
- return;
- }
- }
-
- CKafkaConsumer::~CKafkaConsumer()
- {
- if (m_pKafkaTopic)
- {
- rd_kafka_consume_stop(m_pKafkaTopic, m_tCfg.m_nPartition);
- rd_kafka_topic_destroy(m_pKafkaTopic);
- m_pKafkaTopic = nullptr;
- }
-
- if (m_pkafka)
- {
- // rd_kafka_consumer_close(m_pkafka); // rd_kafka_destroy调Ì¡Â用®?会¨¢调Ì¡Â用®?rd_kafka_consumer_close
- rd_kafka_destroy(m_pkafka);
- m_pkafka = nullptr;
- }
- }
// Test.cpp
-
- #include "stdafx.h"
- #include "KafkaConsumer.h"
- #include <assert.h>
-
- int _tmain(int argc, _TCHAR* argv[])
- {
- string brokers = "192.168.254.129:9092";
- string strTopic = "test";
- string group = "test1";
- int nPartition = 0;
- CKafkaConsumer consumer;
- TKafkaCfgInfo t;
- t.m_strBrokers = brokers;
- t.m_strGroupID = group;
- t.m_strTopics = strTopic;
- t.m_nPartition = nPartition;
- int nRet = consumer.InitCfg(t);
- if (nRet != 0)
- {
- assert(0);
- return 0;
- }
-
- int nBatchSize = 10;
- rd_kafka_message_t** pRet = new rd_kafka_message_t * [nBatchSize];
-
- while (1)
- {
- int nTimeout = 1000;
- ssize_t nCnt = 0;
- consumer.pullMessage(pRet, nCnt);
-
- if (nCnt < 0)
- {
- fprintf(stderr, "%% Error: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
- continue;
- }
-
- if (nCnt == 0)
- {
- // donothing
- continue;
- }
-
- for (int nIndex = 0; nIndex < nCnt; nIndex++)
- {
- rd_kafka_message_t* pMsg = pRet[nIndex];
- printf("%s\n", pMsg->payload);
- consumer.SetConsumerOk(pMsg);
- rd_kafka_message_destroy(pMsg);
- }
- }
-
- delete[] pRet;
- return 0;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。