当前位置:   article > 正文

kafka基本原理及C/C++ API 实现_kafka c接口

kafka c接口

引入消息队列的作用:

  1. 解耦,不同服务组件通过消息队列实现接口
  2. 缓冲,利于消费者和生产者速度匹配,也利于流量削峰
  3. 异步,消息队列可以将非关键流程异步化,缩短请求的响应时间

Partition:为了实现扩展性、提高并发能力,一个 Topic 可以分布到多个 Broker(即服务器)上,一个 Topic 可以分为多个 Partition。同一 topic 在不同分区的数据是不重复的,每个 Partition 是一个有序的队列,其表现形式就是一个一个的文件夹
 

Replication:每个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为 Leader。在 kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量;follower 和 leader 绝对是在不同的机器上,同一机器对同一个分区也只可能存放一个副本(包括自己)

Leader:每个分区多个副本中的"主"副本,生产者发送数据的对象,以及消费者消费数据的对象,都是 Leader


Follower:每个分区多个副本中的"从"副本,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 故障时,某个 Follower 会成为新的 Leader

以下,展示kafka中,一个主题TopicA中,三个patition在不同的broker中的分布,以及生产者和消费者是如何分配的

C/C++接口实现

librdkafka C 接口(C++ 封装)实现:

kafkaConsumer.h

  1. #ifndef KAFKACONSUMER_H
  2. #define KAFKACONSUMER_H
  3. #include <string>
  4. #include <iostream>
  5. #include <vector>
  6. #include <stdio.h>
  7. #include "rdkafka.h"
  8. using namespace std;
  9. class KafkaConsumer
  10. {
  11. public:/**
  12. * @brief KafkaConsumer
  13. * @param brokers
  14. * @param groupID
  15. * @param topics
  16. * @param partition
  17. */
  18. KafkaConsumer();
  19. virtual ~KafkaConsumer();
  20. int InitCfg(const string& strBrokers, const string& strGroupID, const vector<string>& vTopics);
  21. void pullMessage();
  22. static void RebalanceCb(rd_kafka_t *rk,
  23. rd_kafka_resp_err_t err,
  24. rd_kafka_topic_partition_list_t *partitions,
  25. void *opaque);
  26. static void log_cb(const rd_kafka_t *rk, int level,
  27. const char *fac, const char *buf);
  28. static void EventErrorCb(rd_kafka_t *rk, int err,
  29. const char *reason,
  30. void *opaque);
  31. static int EventStatsCb(rd_kafka_t *rk,
  32. char *json,
  33. size_t json_len,
  34. void *opaque);
  35. protected:
  36. string m_strBrokers;
  37. string m_groupID;
  38. vector<string> m_topicVector;
  39. int m_partition;
  40. rd_kafka_t *m_pkafka;
  41. };
  42. #endif // KAFKACONSUMER_H

kafkaConsumer.c

  1. #include "KafkaConsumer.h"
  2. KafkaConsumer::KafkaConsumer()
  3. : m_pkafka(nullptr)
  4. {
  5. }
  6. int KafkaConsumer::InitCfg(const string& strBrokers, const string& strGroupID, const vector<string>& vTopics)
  7. {
  8. m_strBrokers = strBrokers;
  9. m_groupID = strGroupID;
  10. m_topicVector = vTopics;
  11. rd_kafka_conf_t *pConf = rd_kafka_conf_new();
  12. if(!pConf)
  13. {
  14. return -1;
  15. }
  16. char szErr[512] = { 0};
  17. if (rd_kafka_conf_set(pConf, "bootstrap.servers", m_strBrokers.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK) {
  18. fprintf(stderr, "%s\n", szErr);
  19. rd_kafka_conf_destroy(pConf);
  20. return -1;
  21. }
  22. if (rd_kafka_conf_set(pConf, "group.id", m_groupID.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK) {
  23. fprintf(stderr, "%s\n", szErr);
  24. rd_kafka_conf_destroy(pConf);
  25. return -1;
  26. }
  27. if (rd_kafka_conf_set(pConf, "statistics.interval.ms", "10000", szErr, sizeof(szErr))!= RD_KAFKA_CONF_OK) {
  28. fprintf(stderr, "%s\n", szErr);
  29. rd_kafka_conf_destroy(pConf);
  30. return -1;
  31. }
  32. // rd_kafka_conf_set_stats_cb(pConf, &KafkaConsumer::EventStatsCb);
  33. //
  34. // rd_kafka_conf_set_error_cb(pConf, &KafkaConsumer::EventErrorCb);
  35. // rd_kafka_conf_set_log_cb(pConf, &KafkaConsumer::log_cb);
  36. // rd_kafka_conf_set_rebalance_cb(pConf, &KafkaConsumer::RebalanceCb);
  37. // topic配置
  38. rd_kafka_topic_conf_t* pTopicConf = rd_kafka_topic_conf_new();
  39. if (rd_kafka_topic_conf_set(pTopicConf, "auto.offset.reset", "earliest", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK) {
  40. fprintf(stderr, "%s\n", szErr);
  41. rd_kafka_topic_conf_destroy(pTopicConf);
  42. return -1;
  43. }
  44. // rd_kafka_conf_set_default_topic_conf(pConf, pTopicConf);
  45. m_pkafka = rd_kafka_new(RD_KAFKA_CONSUMER, pConf, szErr, sizeof(szErr));
  46. if (!m_pkafka) {
  47. fprintf(stderr, "%% Failed to create new consumer: %s\n", szErr);
  48. return -1;
  49. }
  50. rd_kafka_poll_set_consumer(m_pkafka);
  51. /* Convert the list of topics to a format suitable for librdkafka */
  52. rd_kafka_topic_partition_list_t *subscription = rd_kafka_topic_partition_list_new(m_topicVector.size());
  53. for (int i = 0 ; i < m_topicVector.size() ; i++)
  54. {
  55. rd_kafka_topic_partition_list_add(subscription, m_topicVector[i].c_str(), RD_KAFKA_PARTITION_UA);
  56. }
  57. /* Subscribe to the list of topics */
  58. rd_kafka_resp_err_t err = rd_kafka_subscribe(m_pkafka, subscription);
  59. if (err) {
  60. fprintf(stderr, "%% Failed to subscribe to %d topics: %s\n", subscription->cnt, rd_kafka_err2str(err));
  61. rd_kafka_topic_partition_list_destroy(subscription);
  62. rd_kafka_destroy(m_pkafka);
  63. return -1;
  64. }
  65. fprintf(stderr, "%% Subscribed to %d topic(s), " "waiting for rebalance and messages...\n", subscription->cnt);
  66. rd_kafka_topic_partition_list_destroy(subscription);
  67. return 0;
  68. }
  69. void KafkaConsumer::pullMessage()
  70. {
  71. rd_kafka_message_t *rkm;
  72. rkm = rd_kafka_consumer_poll(m_pkafka, 2000);
  73. if (!rkm)
  74. {
  75. return ; /* Timeout: no message within 100ms,
  76. * try again. This short timeout allows
  77. * checking for `run` at frequent intervals.
  78. */
  79. }
  80. /* consumer_poll() will return either a proper message
  81. * or a consumer error (rkm->err is set). */
  82. if (rkm->err) {
  83. /* Consumer errors are generally to be considered
  84. * informational as the consumer will automatically
  85. * try to recover from all types of errors. */
  86. fprintf(stderr,
  87. "%% Consumer error: %s\n",
  88. rd_kafka_message_errstr(rkm));
  89. rd_kafka_message_destroy(rkm);
  90. return;
  91. }
  92. /* Print the message value/payload. */
  93. if (rkm->payload)
  94. {
  95. printf(" Value: %.*s\n", (int)rkm->len, (const char *)rkm->payload);
  96. }
  97. rd_kafka_message_destroy(rkm);
  98. }
  99. void KafkaConsumer::RebalanceCb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque)
  100. {
  101. if(partitions)
  102. {
  103. for (int i=0; i<partitions->cnt; i++)
  104. {
  105. partitions->elems[i].offset = 0;
  106. }
  107. }
  108. rd_kafka_assign(rk, partitions);
  109. }
  110. #include <iostream>
  111. #include <stdio.h>
  112. void KafkaConsumer::log_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
  113. {
  114. cout<< "< EventErrorCb >" << level << " " << fac << " " << buf <<endl;
  115. }
  116. void KafkaConsumer::EventErrorCb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
  117. {
  118. cout<< "< EventErrorCb >" << err << " " << reason <<endl;
  119. }
  120. int KafkaConsumer::EventStatsCb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
  121. {
  122. cout<< "< EventStatsCb >" << json <<endl;
  123. return 0;
  124. }
  125. KafkaConsumer::~KafkaConsumer()
  126. {
  127. rd_kafka_consumer_close(m_pkafka);
  128. /* Destroy the consumer */
  129. rd_kafka_destroy(m_pkafka);
  130. m_pkafka = nullptr;
  131. }

libkafkacpp C++消费者接口实现:

  1. #include "KafkaConsumer.h"
  2. KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupID,
  3. const std::vector<std::string>& topics, int partition)
  4. {
  5. m_brokers = brokers;
  6. m_groupID = groupID;
  7. m_topicVector = topics;
  8. m_partition = partition;
  9. std::string errorStr;
  10. RdKafka::Conf::ConfResult errorCode;
  11. m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  12. m_event_cb = new ConsumerEventCb;
  13. errorCode = m_config->set("event_cb", m_event_cb, errorStr);
  14. if(errorCode != RdKafka::Conf::CONF_OK)
  15. {
  16. std::cout << "Conf set failed: " << errorStr << std::endl;
  17. }
  18. m_rebalance_cb = new ConsumerRebalanceCb;
  19. errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
  20. if(errorCode != RdKafka::Conf::CONF_OK)
  21. {
  22. std::cout << "Conf set failed: " << errorStr << std::endl;
  23. }
  24. errorCode = m_config->set("enable.partition.eof", "false", errorStr);
  25. if(errorCode != RdKafka::Conf::CONF_OK)
  26. {
  27. std::cout << "Conf set failed: " << errorStr << std::endl;
  28. }
  29. errorCode = m_config->set("group.id", m_groupID, errorStr);
  30. if(errorCode != RdKafka::Conf::CONF_OK)
  31. {
  32. std::cout << "Conf set failed: " << errorStr << std::endl;
  33. }
  34. errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
  35. if(errorCode != RdKafka::Conf::CONF_OK)
  36. {
  37. std::cout << "Conf set failed: " << errorStr << std::endl;
  38. }
  39. errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
  40. if(errorCode != RdKafka::Conf::CONF_OK)
  41. {
  42. std::cout << "Conf set failed: " << errorStr << std::endl;
  43. }
  44. // partition.assignment.strategy range,roundrobin
  45. m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
  46. // 获取最新的消息数据
  47. errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
  48. if(errorCode != RdKafka::Conf::CONF_OK)
  49. {
  50. std::cout << "Topic Conf set failed: " << errorStr << std::endl;
  51. }
  52. errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
  53. if(errorCode != RdKafka::Conf::CONF_OK)
  54. {
  55. std::cout << "Conf set failed: " << errorStr << std::endl;
  56. }
  57. m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
  58. if(m_consumer == NULL)
  59. {
  60. std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
  61. }
  62. std::cout << "Created consumer " << m_consumer->name() << std::endl;
  63. }
  64. void msg_consume(RdKafka::Message* msg, void* opaque)
  65. {
  66. switch (msg->err())
  67. {
  68. case RdKafka::ERR__TIMED_OUT:
  69. std::cerr << "Consumer error: " << msg->errstr() << std::endl;
  70. break;
  71. case RdKafka::ERR_NO_ERROR: // 有消息进来
  72. std::cout << " Message in " << msg->topic_name() << " ["
  73. << msg->partition() << "] at offset " << msg->offset()
  74. << " key: " << msg->key() << " payload: "
  75. << (char*)msg->payload() << std::endl;
  76. break;
  77. default:
  78. std::cerr << "Consumer error: " << msg->errstr() << std::endl;
  79. break;
  80. }
  81. }
  82. void KafkaConsumer::pullMessage()
  83. {
  84. // 订阅Topic
  85. RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
  86. if (errorCode != RdKafka::ERR_NO_ERROR)
  87. {
  88. std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
  89. }
  90. // 消费消息
  91. while(true)
  92. {
  93. RdKafka::Message *msg = m_consumer->consume(1000);
  94. msg_consume(msg, NULL);
  95. delete msg;
  96. }
  97. }
  98. KafkaConsumer::~KafkaConsumer()
  99. {
  100. m_consumer->close();
  101. delete m_config;
  102. delete m_topicConfig;
  103. delete m_consumer;
  104. delete m_event_cb;
  105. delete m_rebalance_cb;
  106. }

libkafkacpp C++生产者接口实现:

  1. #include "KafkaProducer.h"
  2. KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& topic, int partition)
  3. {
  4. m_brokers = brokers;
  5. m_topicStr = topic;
  6. m_partition = partition;
  7. // 创建Kafka Conf对象
  8. m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  9. if(m_config == NULL)
  10. {
  11. std::cout << "Create RdKafka Conf failed." << std::endl;
  12. }
  13. // 创建Topic Conf对象
  14. m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
  15. if(m_topicConfig == NULL)
  16. {
  17. std::cout << "Create RdKafka Topic Conf failed." << std::endl;
  18. }
  19. // 设置Broker属性
  20. RdKafka::Conf::ConfResult errCode;
  21. m_dr_cb = new ProducerDeliveryReportCb;
  22. std::string errorStr;
  23. errCode = m_config->set("dr_cb", m_dr_cb, errorStr);
  24. if(errCode != RdKafka::Conf::CONF_OK)
  25. {
  26. std::cout << "Conf set failed:" << errorStr << std::endl;
  27. }
  28. m_event_cb = new ProducerEventCb;
  29. errCode = m_config->set("event_cb", m_event_cb, errorStr);
  30. if(errCode != RdKafka::Conf::CONF_OK)
  31. {
  32. std::cout << "Conf set failed:" << errorStr << std::endl;
  33. }
  34. m_partitioner_cb = new HashPartitionerCb;
  35. errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
  36. if(errCode != RdKafka::Conf::CONF_OK)
  37. {
  38. std::cout << "Conf set failed:" << errorStr << std::endl;
  39. }
  40. errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
  41. if(errCode != RdKafka::Conf::CONF_OK)
  42. {
  43. std::cout << "Conf set failed:" << errorStr << std::endl;
  44. }
  45. errCode = m_config->set("message.max.bytes", "10240000", errorStr);
  46. if(errCode != RdKafka::Conf::CONF_OK)
  47. {
  48. std::cout << "Conf set failed:" << errorStr << std::endl;
  49. }
  50. errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
  51. if(errCode != RdKafka::Conf::CONF_OK)
  52. {
  53. std::cout << "Conf set failed:" << errorStr << std::endl;
  54. }
  55. // request.required.acks 0=Broker does not send any response/ack to client,
  56. // -1 or all=Broker will block until message is committed by all in sync replicas (ISRs)
  57. // 创建Producer
  58. m_producer = RdKafka::Producer::create(m_config, errorStr);
  59. if(m_producer == NULL)
  60. {
  61. std::cout << "Create Producer failed:" << errorStr << std::endl;
  62. }
  63. // 创建Topic对象
  64. m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
  65. if(m_topic == NULL)
  66. {
  67. std::cout << "Create Topic failed:" << errorStr << std::endl;
  68. }
  69. }
  70. void KafkaProducer::pushMessage(const std::string& str, const std::string& key)
  71. {
  72. int32_t len = str.length();
  73. void* payload = const_cast<void*>(static_cast<const void*>(str.data()));
  74. RdKafka::ErrorCode errorCode = m_producer->produce(m_topic, RdKafka::Topic::PARTITION_UA,
  75. RdKafka::Producer::RK_MSG_COPY,
  76. payload, len, &key, NULL);
  77. m_producer->poll(0);
  78. if (errorCode != RdKafka::ERR_NO_ERROR)
  79. {
  80. std::cerr << "Produce failed: " << RdKafka::err2str(errorCode) << std::endl;
  81. if(errorCode == RdKafka::ERR__QUEUE_FULL)
  82. {
  83. m_producer->poll(100);
  84. }
  85. }
  86. }
  87. KafkaProducer::~KafkaProducer()
  88. {
  89. while (m_producer->outq_len() > 0)
  90. {
  91. std::cerr << "Waiting for " << m_producer->outq_len() << std::endl;
  92. m_producer->flush(5000);
  93. }
  94. delete m_config;
  95. delete m_topicConfig;
  96. delete m_topic;
  97. delete m_producer;
  98. delete m_dr_cb;
  99. delete m_event_cb;
  100. delete m_partitioner_cb;
  101. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/894046
推荐阅读
相关标签
  

闽ICP备14008679号