kafka(librdkafka) windows c++ 编程

libkafka windows编译








也就是说,在编程的时候先要通过cmd运行zkserver和.\bin\windows\kafka-server-start.bat config\server.properties,后面运行指令需要先进入到kafka的压缩目录,如上面红色框所示。




3.第三步当然是上实例啦,记得在编程的时候先要通过cmd运行zkserver和.\bin\windows\kafka-server-start.bat config\server.properties,


  1. #include <iostream>
  2. #include <string>
  3. #include <list>
  4. #include <stdint.h>
  5. #include "../src-cpp/rdkafkacpp.h"
  6. static bool run = true;
  7. static bool exit_eof = false;
  8. void dump_config(RdKafka::Conf* conf) {
  9. std::list<std::string> *dump = conf->dump();
  10. printf("config dump(%d):\n", (int32_t)dump->size());
  11. for (auto it = dump->begin(); it != dump->end();) {
  12. std::string name = *it++;
  13. std::string value = *it++;
  14. printf("%s = %s\n", name.c_str(), value.c_str());
  15. }
  16. printf("---------------------------------------------\n");
  17. }
  18. class my_event_cb : public RdKafka::EventCb {
  19. public:
  20. void event_cb(RdKafka::Event &event) override {
  21. switch (event.type())
  22. {
  23. case RdKafka::Event::EVENT_ERROR:
  24. std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
  25. event.str() << std::endl;
  26. if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
  27. run = false;
  28. break;
  29. case RdKafka::Event::EVENT_STATS:
  30. std::cerr << "\"STATS\": " << event.str() << std::endl;
  31. break;
  32. case RdKafka::Event::EVENT_LOG:
  33. fprintf(stderr, "LOG-%i-%s: %s\n",
  34. event.severity(), event.fac().c_str(), event.str().c_str());
  35. break;
  36. default:
  37. std::cerr << "EVENT " << event.type() <<
  38. " (" << RdKafka::err2str(event.err()) << "): " <<
  39. event.str() << std::endl;
  40. break;
  41. }
  42. }
  43. };
  44. class my_hash_partitioner_cb : public RdKafka::PartitionerCb {
  45. public:
  46. int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
  47. int32_t partition_cnt, void *msg_opaque) override {
  48. return djb_hash(key->c_str(), key->size()) % partition_cnt;
  49. }
  50. private:
  51. static inline unsigned int djb_hash(const char *str, size_t len) {
  52. unsigned int hash = 5381;
  53. for (size_t i = 0; i < len; i++)
  54. hash = ((hash << 5) + hash) + str[i];
  55. return hash;
  56. }
  57. };
  58. namespace producer_ts {
  59. class my_delivery_report_cb : public RdKafka::DeliveryReportCb {
  60. public:
  61. void dr_cb(RdKafka::Message& message) override {
  62. printf("message delivery %d bytes, error:%s, key: %s\n",
  63. (int32_t)message.len(), message.errstr().c_str(), message.key() ? message.key()->c_str() : "");
  64. }
  65. };
  66. void producer_test() {
  67. printf("producer test\n");
  68. int32_t partition = RdKafka::Topic::PARTITION_UA;
  69. printf("input brokers list(,,\n");
  70. std::string broker_list;
  71. //std::cin >> broker_list;
  72. broker_list = "";
  73. printf("input partition:");
  74. //std::cin >> partition;
  75. partition = 0;
  76. // config
  77. RdKafka::Conf* global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  78. RdKafka::Conf* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
  79. my_hash_partitioner_cb hash_partitioner;
  80. my_event_cb event_cb;
  81. my_delivery_report_cb delivery_cb;
  82. std::string err_string;
  83. if (topic_conf->set("partitioner_cb", &hash_partitioner, err_string) != RdKafka::Conf::CONF_OK) {
  84. printf("set partitioner_cb error: %s\n", err_string.c_str());
  85. return;
  86. }
  87. global_conf->set("metadata.broker.list", broker_list, err_string);
  88. global_conf->set("event_cb", &event_cb, err_string);
  89. global_conf->set("dr_cb", &delivery_cb, err_string);
  90. //global_conf->set("retry.backoff.ms", "10", err_string);
  91. //global_conf->set("debug", "all", err_string);
  92. //global_conf->set("debug", "topic,msg", err_string);
  93. //global_conf->set("debug", "msg,queue", err_string);
  94. dump_config(global_conf);
  95. dump_config(topic_conf);
  96. // create producer
  97. RdKafka::Producer* producer = RdKafka::Producer::create(global_conf, err_string);
  98. if (!producer) {
  99. printf("failed to create producer, %s\n", err_string.c_str());
  100. return;
  101. }
  102. printf("created producer %s\n", producer->name().c_str());
  103. std::string topic_name;
  104. while (true) {
  105. printf("input topic to create:\n");
  106. std::cin >> topic_name;
  107. // create topic
  108. RdKafka::Topic* topic =
  109. RdKafka::Topic::create(producer, topic_name, topic_conf, err_string);
  110. if (!topic) {
  111. printf("try create topic[%s] failed, %s\n",
  112. topic_name.c_str(), err_string.c_str());
  113. return;
  114. }
  115. printf(">");
  116. for (std::string line; run && std::getline(std::cin, line);) {
  117. if (line.empty()) {
  118. producer->poll(0);
  119. continue;
  120. }
  121. if (line == "quit") {
  122. run = false;
  123. break;
  124. }
  125. std::string key = "kafka_test";
  126. RdKafka::ErrorCode res = producer->produce(topic, partition,
  127. RdKafka::Producer::RK_MSG_COPY,
  128. (char*)line.c_str(), line.size(), key.c_str(), key.size(), NULL);
  129. if (res != RdKafka::ERR_NO_ERROR) {
  130. printf("produce failed, %s\n", RdKafka::err2str(res).c_str());
  131. }
  132. else {
  133. printf("produced msg, bytes %d\n", (int32_t)line.size());
  134. }
  135. // do socket io
  136. producer->poll(0);
  137. printf("outq_len: %d\n", producer->outq_len());
  138. //producer->flush(1000);
  139. //while (run && producer->outq_len()) {
  140. // printf("wait for write queue( size %d) write finish\n", producer->outq_len());
  141. // producer->poll(1000);
  142. //}
  143. printf(">");
  144. }
  145. delete topic;
  146. if (!run) {
  147. break;
  148. }
  149. }
  150. run = true;
  151. while (run && producer->outq_len()) {
  152. printf("wait for write queue( size %d) write finish\n", producer->outq_len());
  153. producer->poll(1000);
  154. }
  155. delete producer;
  156. }
  157. }

运行时在main函数的cpp加上该代码的头文件,运行函数为void producer_test(),这代码别人的,写的有点乱,将就看吧,能运行。


  1. #include <iostream>
  2. #include <string>
  3. #include <list>
  4. #include <stdint.h>
  5. #include "../src-cpp/rdkafkacpp.h"
  6. class kafka_consumer_client
  7. {
  8. public:
  9. kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset);
  10. //kafka_consumer_client();
  11. virtual ~kafka_consumer_client();
  12. bool initClient();
  13. bool consume(int timeout_ms);
  14. void finalize();
  15. private:
  16. void consumer(RdKafka::Message *msg, void *opt);
  17. std::string brokers_;
  18. std::string topics_;
  19. std::string groupid_;
  20. int64_t last_offset_ = 0;
  21. RdKafka::Consumer *kafka_consumer_ = nullptr;
  22. RdKafka::Topic *topic_ = nullptr;
  23. int64_t offset_ = RdKafka::Topic::OFFSET_BEGINNING;
  24. int32_t partition_ = 0;
  25. };
  1. #include "kafka_consumer_client.h"
  2. bool run_ = true;
  3. kafka_consumer_client::kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset)
  4. :brokers_(brokers),
  5. topics_(topics),
  6. groupid_(groupid),
  7. offset_(offset){
  8. }
  9. //kafka_consumer_client::kafka_consumer_client(){}
  10. kafka_consumer_client::~kafka_consumer_client(){}
  11. bool kafka_consumer_client::initClient(){
  12. RdKafka::Conf *conf = nullptr;
  13. conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  14. if (!conf){
  15. fprintf(stderr, "RdKafka create global conf failed\n");
  16. return false;
  17. }
  18. std::string errstr;
  19. /*设置broker list*/
  20. if (conf->set("bootstrap.servers", brokers_, errstr) != RdKafka::Conf::CONF_OK){
  21. fprintf(stderr, "RdKafka conf set brokerlist failed : %s\n", errstr.c_str());
  22. }
  23. /*设置consumer group*/
  24. if (conf->set("group.id", groupid_, errstr) != RdKafka::Conf::CONF_OK){
  25. fprintf(stderr, "RdKafka conf set group.id failed : %s\n", errstr.c_str());
  26. }
  27. std::string strfetch_num = "10240000";
  28. /*每次从单个分区中拉取消息的最大尺寸*/
  29. if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){
  30. fprintf(stderr, "RdKafka conf set max.partition failed : %s\n", errstr.c_str());
  31. }
  32. /*创建kafka consumer实例*/
  33. kafka_consumer_ = RdKafka::Consumer::create(conf, errstr);
  34. if (!kafka_consumer_){
  35. fprintf(stderr, "failed to ceate consumer\n");
  36. }
  37. delete conf;
  38. RdKafka::Conf *tconf = nullptr;
  39. /*创建kafka topic的配置*/
  40. tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
  41. if (!tconf){
  42. fprintf(stderr, "RdKafka create topic conf failed\n");
  43. return false;
  44. }
  45. /*kafka + zookeeper,当消息被消费时,会想zk提交当前groupId的consumer消费的offset信息,
  46. 当consumer再次启动将会从此offset开始继续消费.在consumter端配置文件中(或者是
  47. ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),
  48. 2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,
  49. 在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始
  50. 消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的
  51. 开始位置消费所有消息.*/
  52. if (tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK){
  53. fprintf(stderr, "RdKafka conf set auto.offset.reset failed : %s\n", errstr.c_str());
  54. }
  55. topic_ = RdKafka::Topic::create(kafka_consumer_, topics_, tconf, errstr);
  56. if (!topic_){
  57. fprintf(stderr, "RdKafka create topic failed : %s\n", errstr.c_str());
  58. }
  59. delete tconf;
  60. RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_);
  61. if (resp != RdKafka::ERR_NO_ERROR){
  62. fprintf(stderr, "failed to start consumer : %s\n", RdKafka::err2str(resp).c_str());
  63. }
  64. return true;
  65. }
  66. void kafka_consumer_client::consumer(RdKafka::Message *message, void *opt){
  67. switch (message->err()){
  68. case RdKafka::ERR__TIMED_OUT:
  69. break;
  70. case RdKafka::ERR_NO_ERROR:
  71. printf("%.*s\n",
  72. static_cast<int>(message->len()),
  73. static_cast<const char*>(message->payload()));
  74. last_offset_ = message->offset();
  75. break;
  76. case RdKafka::ERR__PARTITION_EOF:
  77. std::cerr << "%% Reached the end of the queue, offset: " << last_offset_ << std::endl;
  78. break;
  79. case RdKafka::ERR__UNKNOWN_TOPIC:
  80. case RdKafka::ERR__UNKNOWN_PARTITION:
  81. std::cerr << "Consume failed: " << message->errstr() << std::endl;
  82. run_ = false;
  83. break;
  84. default:
  85. std::cerr << "Consume failed: " << message->errstr() << std::endl;
  86. run_ = false;
  87. break;
  88. }
  89. }
  90. bool kafka_consumer_client::consume(int timeout_ms){
  91. RdKafka::Message *msg = nullptr;
  92. while (run_){
  93. msg = kafka_consumer_->consume(topic_, partition_, timeout_ms);
  94. consumer(msg, nullptr);
  95. kafka_consumer_->poll(0);
  96. delete msg;
  97. }
  98. kafka_consumer_->stop(topic_, partition_);
  99. if (topic_){
  100. delete topic_;
  101. topic_ = nullptr;
  102. }
  103. if (kafka_consumer_){
  104. delete kafka_consumer_;
  105. kafka_consumer_ = nullptr;
  106. }
  107. /*销毁kafka实例*/
  108. RdKafka::wait_destroyed(5000);
  109. return true;
  110. }
  1. int main()
  2. {
  3. /*consumer_ts::consumer_test();*/
  4. std::string topics = "linlin";
  5. std::string brokers = "";
  6. std::string group = "1";
  7. std::shared_ptr<kafka_consumer_client> kafka_consumer_client_ = std::make_shared<kafka_consumer_client>(brokers, topics, group, 0);
  8. //std::shared_ptr<kafka_consumer_client> kafka_consumer_client_ = std::make_shared<kafka_consumer_client>();
  9. if (!kafka_consumer_client_->initClient()){
  10. fprintf(stderr, "kafka server initialize error\n");
  11. }
  12. else{
  13. printf("start kafka consumer\n");
  14. kafka_consumer_client_->consume(1000);
  15. }
  16. fprintf(stderr, "kafka consume exit! \n");
  17. return 0;
  18. }



