当前位置:   article > 正文

Kafka 如何实现更高效消费者-批量读取(C++客户端实现)_rdkafka

rdkafka

作为一个c++程序,我相信很多人都会使用c++的librdkafkacpp的接口,这些接口通过C++封装,使用起来方便,不需要去了解底层的c接口的封装及实现,是最理想的使用方式。

那为啥还要使用C接口呢?

这就涉及到C++的ABI二进制兼容问题。

当我们编译一个C++动态库,受限于不同的编译器版本,无论是gcc还是VC,都存在着兼容问题。对于一个生产版本,如果在统一了编译工具链的情景下,是可以使用C++版本,但如果考虑到更好的ABI兼容,所以就选择了C库。其无论在VC还是gcc,都表现得很好。

C库kafka的消费模式

 三种消费模式的异同

librdKafka 提供了三种不同的消费模式,下面将讲解这三种模式的差异及如何使用。

  1. // 一次读取一条数据
  2. rd_kafka_consumer_poll();
  3. rd_kafka_consume();
  4. // 批量读取,一次可以读取多条数据
  5. // 通过回调函数的批量读取,将废弃
  6. rd_kafka_consume_callback();
  7. rd_kafka_consume_batch();
  8. // 通过Queue路由道特定的处理函数
  9. rd_kafka_consume_callback_queue
  10. rd_kafka_consume_batch_queue

1、这种模式,直接使用rd_kafka_consumer_poll()接口,其可以实现1次读取一条,这种场景对于数据量不大,可以很好的使用,而且其客户端API也实现了相关的消息确认和提交,大多数采用这种方式,这种方式的例程,我在前面的博文中有讲到,有兴趣的可以去看看

2、这种模式是批量读取的模式,这种模式下有两种接口,一种基于api的回调

rd_kafka_consume_callback,这个接口根据官方文档是即将被废弃的接口,所以不建议使用该接口,建议使用rd_kafka_consume_batch(),后面会给出这种模式下的使用例子。

3、使用Queue的方式,该方式也提供了两种模式,一种回调,一种直接消费,回调的方式应该在性能上是最高的。这种方式与第二种方式的不同是,他能让不同topic,不同patition的数据路由到一个队列,也就是对于一个需要处理多种数据的消费者。

使用批量读取的注意:

1、消息最好程序中自己做确认提交,这样在kafka的服务,才不会看到滞后的消息消费

2、注意消息的销毁,kafka在销毁对象时会持有引用技术,消费完的消息,要destoy

3、对于topic+partition的消费模式,最好不要在运行热新增分区或主题,否则客户端处理起来比较麻烦。如果必要这么做,要做好测试。保证数据不重复不丢消息。

代码实现

KafkaConsumer.h

  1. #ifndef KAFKACONSUMER_H
  2. #define KAFKACONSUMER_H
  3. #include <string>
  4. #include <iostream>
  5. #include <vector>
  6. #include <stdio.h>
  7. #include "rdkafka.h"
  8. using namespace std;
  9. struct TKafkaCfgInfo
  10. {
  11. string m_strBrokers;
  12. string m_strGroupID;
  13. string m_strTopics;
  14. int m_nPartition;
  15. int m_nBatchReadSize;
  16. int m_nReadTimeout;
  17. bool m_bIsLogKafka;
  18. TKafkaCfgInfo()
  19. : m_nPartition(-1)
  20. , m_nBatchReadSize(-1)
  21. , m_nReadTimeout(-1)
  22. , m_bIsLogKafka(false)
  23. {}
  24. };
  25. class CKafkaConsumer
  26. {
  27. public:
  28. CKafkaConsumer();
  29. virtual ~CKafkaConsumer();
  30. // 初?始º?化¡¥Kafka配?置?
  31. int InitCfg(const TKafkaCfgInfo& tCfg);
  32. const TKafkaCfgInfo& GetCfg() { return m_tCfg; }
  33. // 拉¤-取¨?Kafka消?息¡é
  34. void pullMessage(rd_kafka_message_t** p, ssize_t& nCnt);
  35. void SetConsumerOk(rd_kafka_message_t* pMsg);
  36. protected:
  37. rd_kafka_t* m_pkafka;
  38. rd_kafka_topic_t* m_pKafkaTopic;
  39. TKafkaCfgInfo m_tCfg;
  40. };
  41. #endif // KAFKACONSUMER_H

// KafkaConsumer.cpp

  1. #include "KafkaConsumer.h"
  2. #include <iostream>
  3. using namespace std;
  4. static void RebalanceCb(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque)
  5. {
  6. 测a试º?代䨲码?
  7. #if 0
  8. if (partitions)
  9. {
  10. for (int i = 0; i < partitions->cnt; i++)
  11. {
  12. partitions->elems[i].offset = 0;
  13. }
  14. }
  15. printf("RebalanceCb \r\n");
  16. #endif
  17. printf("RebalanceCb \n");
  18. rd_kafka_assign(rk, partitions);
  19. }
  20. static void EventErrorCb(rd_kafka_t* rk, int err,
  21. const char* reason,
  22. void* opaque)
  23. {
  24. printf("Kafka EventErrorCb(%d): %s\n", err, reason);
  25. }
  26. CKafkaConsumer::CKafkaConsumer()
  27. : m_pkafka(nullptr)
  28. , m_pKafkaTopic(nullptr)
  29. {
  30. }
  31. int CKafkaConsumer::InitCfg(const TKafkaCfgInfo& tCfg)
  32. {
  33. m_tCfg = tCfg;
  34. rd_kafka_conf_t* pConf = rd_kafka_conf_new();
  35. if (!pConf)
  36. {
  37. return -1;
  38. }
  39. char szErr[512] = { 0 };
  40. if (rd_kafka_conf_set(pConf, "bootstrap.servers", m_tCfg.m_strBrokers.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
  41. {
  42. rd_kafka_conf_destroy(pConf);
  43. return -1;
  44. }
  45. if (rd_kafka_conf_set(pConf, "group.id", m_tCfg.m_strGroupID.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
  46. {
  47. rd_kafka_conf_destroy(pConf);
  48. return -1;
  49. }
  50. // rd_kafka_conf_set_rebalance_cb(pConf, &RebalanceCb);
  51. rd_kafka_conf_set_error_cb(pConf, EventErrorCb);
  52. if (rd_kafka_conf_set(pConf, "enable.auto.commit", "false", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
  53. {
  54. rd_kafka_conf_destroy(pConf);
  55. return -1;
  56. }
  57. if (rd_kafka_conf_set(pConf, "enable.auto.offset.store", "false", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
  58. {
  59. rd_kafka_conf_destroy(pConf);
  60. return -1;
  61. }
  62. // topic配?置?
  63. /*
  64. rd_kafka_topic_conf_t* pTopicConf = rd_kafka_topic_conf_new();
  65. /*if (rd_kafka_topic_conf_set(pTopicConf, "auto.offset.reset", "latest", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
  66. {
  67. rd_kafka_topic_conf_destroy(pTopicConf);
  68. return -1;
  69. }*/
  70. // 创ä¡ä建¡§kafka实º¦Ì例¤y
  71. m_pkafka = rd_kafka_new(RD_KAFKA_CONSUMER, pConf, szErr, sizeof(szErr));
  72. if (!m_pkafka)
  73. {
  74. return -1;
  75. }
  76. m_pKafkaTopic = rd_kafka_topic_new(m_pkafka, m_tCfg.m_strTopics.c_str(), nullptr);
  77. if (!m_pKafkaTopic)
  78. {
  79. return -1;
  80. }
  81. rd_kafka_consume_start(m_pKafkaTopic, m_tCfg.m_nPartition, RD_KAFKA_OFFSET_STORED);
  82. return 0;
  83. }
  84. void CKafkaConsumer::pullMessage(rd_kafka_message_t** pRet, ssize_t& nCnt)
  85. {
  86. nCnt = rd_kafka_consume_batch(m_pKafkaTopic, m_tCfg.m_nPartition, m_tCfg.m_nReadTimeout, pRet, m_tCfg.m_nBatchReadSize);
  87. // rd_kafka_poll(m_pkafka, 0);
  88. }
  89. void CKafkaConsumer::SetConsumerOk(rd_kafka_message_t* pMsg)
  90. {
  91. rd_kafka_resp_err_t errCode = rd_kafka_offset_store(m_pKafkaTopic, m_tCfg.m_nPartition, pMsg->offset);
  92. printf("rd_kafka_offset_store offset = %d\n", pMsg->offset);
  93. if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR)
  94. {
  95. return;
  96. }
  97. errCode = rd_kafka_commit_message(m_pkafka, pMsg, 0);
  98. if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR)
  99. {
  100. return;
  101. }
  102. }
  103. CKafkaConsumer::~CKafkaConsumer()
  104. {
  105. if (m_pKafkaTopic)
  106. {
  107. rd_kafka_consume_stop(m_pKafkaTopic, m_tCfg.m_nPartition);
  108. rd_kafka_topic_destroy(m_pKafkaTopic);
  109. m_pKafkaTopic = nullptr;
  110. }
  111. if (m_pkafka)
  112. {
  113. // rd_kafka_consumer_close(m_pkafka); // rd_kafka_destroy调Ì¡Â用®?会¨¢调Ì¡Â用®?rd_kafka_consumer_close
  114. rd_kafka_destroy(m_pkafka);
  115. m_pkafka = nullptr;
  116. }
  117. }

// Test.cpp

  1. #include "stdafx.h"
  2. #include "KafkaConsumer.h"
  3. #include <assert.h>
  4. int _tmain(int argc, _TCHAR* argv[])
  5. {
  6. string brokers = "192.168.254.129:9092";
  7. string strTopic = "test";
  8. string group = "test1";
  9. int nPartition = 0;
  10. CKafkaConsumer consumer;
  11. TKafkaCfgInfo t;
  12. t.m_strBrokers = brokers;
  13. t.m_strGroupID = group;
  14. t.m_strTopics = strTopic;
  15. t.m_nPartition = nPartition;
  16. int nRet = consumer.InitCfg(t);
  17. if (nRet != 0)
  18. {
  19. assert(0);
  20. return 0;
  21. }
  22. int nBatchSize = 10;
  23. rd_kafka_message_t** pRet = new rd_kafka_message_t * [nBatchSize];
  24. while (1)
  25. {
  26. int nTimeout = 1000;
  27. ssize_t nCnt = 0;
  28. consumer.pullMessage(pRet, nCnt);
  29. if (nCnt < 0)
  30. {
  31. fprintf(stderr, "%% Error: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
  32. continue;
  33. }
  34. if (nCnt == 0)
  35. {
  36. // donothing
  37. continue;
  38. }
  39. for (int nIndex = 0; nIndex < nCnt; nIndex++)
  40. {
  41. rd_kafka_message_t* pMsg = pRet[nIndex];
  42. printf("%s\n", pMsg->payload);
  43. consumer.SetConsumerOk(pMsg);
  44. rd_kafka_message_destroy(pMsg);
  45. }
  46. }
  47. delete[] pRet;
  48. return 0;
  49. }
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号