
Using Kafka from C with librdkafka


1 Installing librdkafka

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
git checkout v1.7.0
./configure
make
sudo make install
sudo ldconfig
 

The examples directory of librdkafka contains sample programs. For example, the consumer takes the following arguments:

./consumer <broker> <group.id> <topic1> <topic2>..

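Pass the broker, the group id, and the topic(s) to subscribe to (several topics may be given). For example:

./consumer localhost:9092 0 test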



 

2 Starting the Kafka services

2.1 Starting ZooKeeper

ZooKeeper can be started with the script below. You could also set up a separate ZooKeeper cluster; here we simply use the ZooKeeper bundled with Kafka.

 

cd bin/    # the bin directory of the Kafka installation
# Run in the foreground:
sh zookeeper-server-start.sh ../config/zookeeper.properties
# Or run in the background:
sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties

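Run lsof -i:2181 to check whether ZooKeeper started successfully. A java process in the LISTEN state on port 2181 (ZooKeeper's default client port) means it is up: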

$ lsof -i:2181
COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    74930 fly    96u  IPv6 734467      0t0  TCP *:2181 (LISTEN)
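If you would rather run the same check from C, for instance before starting the consumer, here is a minimal sketch using plain POSIX sockets. It assumes ZooKeeper accepts connections on the IPv4 loopback address; port_is_open is a hypothetical helper, not part of librdkafka or Kafka.

```c
#define _POSIX_C_SOURCE 200112L  /* expose POSIX socket APIs under strict C */
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: tries a TCP connect to ipv4:port.
 * Returns 1 if a listener accepted the connection, 0 if the
 * connect failed (e.g. refused), -1 on bad input or socket error. */
static int port_is_open(const char *ipv4, unsigned short port)
{
    struct sockaddr_in addr;
    int fd, rc;

    assert(ipv4 != NULL);
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    if (inet_pton(AF_INET, ipv4, &addr.sin_addr) != 1)
        return -1;                  /* not a dotted-quad IPv4 address */

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;

    /* Blocking connect: quick for a local service, but it may wait
     * a long time against an unreachable remote host. */
    rc = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
    close(fd);
    return rc == 0 ? 1 : 0;
}
```

A result of 1 from port_is_open("127.0.0.1", 2181) tells you roughly what the (LISTEN) line from lsof does; the same call with port 9092 can be used once Kafka is started.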

2.2 Starting Kafka

Start Kafka by running the following in the bin directory of the Kafka installation. By default the broker listens on port 9092.

sh kafka-server-start.sh -daemon ../config/server.properties
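Clients use the same default. In the librdkafka examples below, bootstrap.servers is a comma-separated list of host or host:port entries, and 9092 is assumed when the port is omitted. The sketch below shows how one such entry could be split; parse_broker is a hypothetical helper (librdkafka parses the list itself), and it does not handle bracketed IPv6 literals.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define DEFAULT_KAFKA_PORT 9092  /* used when an entry has no ":port" */

/* Hypothetical helper: splits one bootstrap.servers entry ("host" or
 * "host:port") into host and port. Returns 0 on success, -1 if the
 * host is empty or too long, or the port is not a number in 1..65535.
 * Bracketed IPv6 literals such as "[::1]:9092" are not handled. */
static int parse_broker(const char *entry, char *host, size_t host_sz,
                        int *port)
{
    const char *colon;
    size_t host_len;
    char *end;
    long p;

    assert(entry != NULL && host != NULL && host_sz > 0 && port != NULL);
    colon = strrchr(entry, ':');
    host_len = colon ? (size_t)(colon - entry) : strlen(entry);
    if (host_len == 0 || host_len >= host_sz)
        return -1;
    memcpy(host, entry, host_len);
    host[host_len] = '\0';

    if (colon == NULL) {
        *port = DEFAULT_KAFKA_PORT;   /* no port given: assume 9092 */
        return 0;
    }
    p = strtol(colon + 1, &end, 10);
    if (end == colon + 1 || *end != '\0' || p < 1 || p > 65535)
        return -1;
    *port = (int)p;
    return 0;
}
```

So "localhost" yields localhost and 9092, while "kafka1:19092" yields kafka1 and 19092.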

2.3 Creating a topic

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

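Parameters:

--create: the action, i.e. create a topic.
--zookeeper: the address of the ZooKeeper service Kafka is connected to. Kafka 2.2 added --bootstrap-server (e.g. --bootstrap-server localhost:9092) as its replacement, and Kafka 3.0 removed --zookeeper from kafka-topics.sh, so use --bootstrap-server on newer versions.
--replication-factor: the replication factor, i.e. how many copies of each partition are kept on different brokers. It is set to 1 here, so each partition is stored on a single broker with no extra copy; it cannot exceed the number of available brokers.
--partitions: the number of partitions. Partitions act as parallel channels, somewhat like lanes on a road: messages are spread across them and each can be consumed independently.
--topic: the name of the topic to create, e.g. test.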
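The partition count also matters to producers: a message with a key is routed to a partition by hashing the key. librdkafka's default partitioner, consistent_random, takes the CRC32 of the key modulo the partition count and places messages with an empty or NULL key randomly. The sketch below models only the keyed case; crc32_ieee and pick_partition are hypothetical names, not librdkafka functions. The Java client hashes keys with murmur2 by default, which is why librdkafka also offers partitioner=murmur2_random when both clients must agree.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Bitwise CRC-32 (reflected polynomial 0xEDB88320, init and final XOR
 * 0xFFFFFFFF): the common "CRC-32" that zlib's crc32() also computes. */
static uint32_t crc32_ieee(const void *data, size_t len)
{
    const unsigned char *p = data;
    uint32_t crc = 0xFFFFFFFFu;

    while (len--) {
        crc ^= *p++;
        for (int bit = 0; bit < 8; bit++)
            /* shift right; XOR in the polynomial if the low bit was 1 */
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
    }
    return crc ^ 0xFFFFFFFFu;
}

/* Hypothetical keyed partitioner: the same key always maps to the same
 * partition, as long as the topic's partition count does not change. */
static int32_t pick_partition(const void *key, size_t key_len,
                              int32_t partition_cnt)
{
    assert(key != NULL && partition_cnt > 0);
    return (int32_t)(crc32_ieee(key, key_len) % (uint32_t)partition_cnt);
}
```

Because the mapping depends on the partition count, adding partitions to an existing topic moves keys to different partitions.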
 

3 Examples of using Kafka from C

3.1 Consumer

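The file consumer.c in librdkafka/examples is a C example of consuming from Kafka; its content is shown below. To build it outside the librdkafka source tree, include <librdkafka/rdkafka.h> instead of "rdkafka.h" and link with -lrdkafka (for example: gcc consumer.c -o consumer -lrdkafka).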

/**
 * Simple high-level balanced Apache Kafka consumer
 * using the Kafka driver from librdkafka
 * (https://github.com/edenhill/librdkafka)
 */
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>
#include <inttypes.h> /* PRId32, PRId64 */

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is builtin from within the librdkafka source tree and thus differs. */
//#include <librdkafka/rdkafka.h>
#include "rdkafka.h"

static volatile sig_atomic_t run = 1;

/**
 * @brief Signal termination of program
 */
static void stop (int sig) {
    run = 0;
}

/**
 * @returns 1 if all bytes are printable, else 0.
 */
static int is_printable (const char *buf, size_t size) {
    size_t i;

    for (i = 0 ; i < size ; i++)
        if (!isprint((unsigned char)buf[i])) /* avoid UB for negative chars */
            return 0;

    return 1;
}

int main (int argc, char **argv) {
    rd_kafka_t *rk;          /* Consumer instance handle */
    rd_kafka_conf_t *conf;   /* Temporary configuration object */
    rd_kafka_resp_err_t err; /* librdkafka API error code */
    char errstr[512];        /* librdkafka API error reporting buffer */
    const char *brokers;     /* Argument: broker list */
    const char *groupid;     /* Argument: Consumer group id */
    char **topics;           /* Argument: list of topics to subscribe to */
    int topic_cnt;           /* Number of topics to subscribe to */
    rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
    int i;

    /*
     * Argument validation
     */
    if (argc < 4) {
        fprintf(stderr,
                "%% Usage: "
                "%s <broker> <group.id> <topic1> <topic2>..\n",
                argv[0]);
        return 1;
    }

    brokers   = argv[1];
    groupid   = argv[2];
    topics    = &argv[3];
    topic_cnt = argc - 3;

    /*
     * Create Kafka client configuration place-holder
     */
    conf = rd_kafka_conf_new(); /* create the configuration object */

    /* Set bootstrap broker(s) as a comma-separated list of
     * host or host:port (default port 9092).
     * librdkafka will use the bootstrap brokers to acquire the full
     * set of brokers from the cluster. */
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    /* Set the consumer group id.
     * All consumers sharing the same group id will join the same
     * group, and the subscribed topics' partitions will be assigned
     * according to the partition.assignment.strategy
     * (consumer config property) to the consumers in the group. */
    if (rd_kafka_conf_set(conf, "group.id", groupid,
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    /* If there is no previously committed offset for a partition
     * the auto.offset.reset strategy will be used to decide where
     * in the partition to start fetching messages.
     * By setting this to earliest the consumer will read all messages
     * in the partition if there was no previously committed offset. */
    if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    /*
     * Create consumer instance.
     *
     * NOTE: rd_kafka_new() takes ownership of the conf object
     * and the application must not reference it again after
     * this call.
     */
    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr,
                "%% Failed to create new consumer: %s\n", errstr);
        return 1;
    }

    conf = NULL; /* Configuration object is now owned, and freed,
                  * by the rd_kafka_t instance. */

    /* Redirect all messages from per-partition queues to
     * the main queue so that messages can be consumed with one
     * call from all assigned partitions.
     *
     * The alternative is to poll the main queue (for events)
     * and each partition queue separately, which requires setting
     * up a rebalance callback and keeping track of the assignment:
     * but that is more complex and typically not recommended. */
    rd_kafka_poll_set_consumer(rk); /* serve everything via rd_kafka_consumer_poll() */

    /* Convert the list of topics to a format suitable for librdkafka:
     * a topic-partition list. */
    subscription = rd_kafka_topic_partition_list_new(topic_cnt);
    for (i = 0 ; i < topic_cnt ; i++)
        rd_kafka_topic_partition_list_add(subscription,
                                          topics[i],
                                          /* the partition is ignored
                                           * by subscribe() */
                                          RD_KAFKA_PARTITION_UA);

    /* Subscribe to the list of topics */
    err = rd_kafka_subscribe(rk, subscription);
    if (err) {
        fprintf(stderr,
                "%% Failed to subscribe to %d topics: %s\n",
                subscription->cnt, rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(subscription);
        rd_kafka_destroy(rk);
        return 1;
    }

    fprintf(stderr,
            "%% Subscribed to %d topic(s), "
            "waiting for rebalance and messages...\n",
            subscription->cnt);

    rd_kafka_topic_partition_list_destroy(subscription);

    /* Signal handler for clean shutdown */
    signal(SIGINT, stop);

    /* Subscribing to topics will trigger a group rebalance
     * which may take some time to finish, but there is no need
     * for the application to handle this idle period in a special way
     * since a rebalance may happen at any time.
     * Start polling for messages. */
    while (run) {
        rd_kafka_message_t *rkm;

        rkm = rd_kafka_consumer_poll(rk, 100);
        if (!rkm)
            continue; /* Timeout: no message within 100ms,
                       * try again. This short timeout allows
                       * checking for `run` at frequent intervals.
                       */

        /* consumer_poll() will return either a proper message
         * or a consumer error (rkm->err is set). */
        if (rkm->err) {
            /* Consumer errors are generally to be considered
             * informational as the consumer will automatically
             * try to recover from all types of errors. */
            fprintf(stderr,
                    "%% Consumer error: %s\n",
                    rd_kafka_message_errstr(rkm));
            rd_kafka_message_destroy(rkm);
            continue;
        }

        /* Proper message. */
        printf("Message on %s [%"PRId32"] at offset %"PRId64":\n",
               rd_kafka_topic_name(rkm->rkt), rkm->partition,
               rkm->offset);

        /* Print the message key. */
        if (rkm->key && is_printable(rkm->key, rkm->key_len))
            printf(" Key: %.*s\n",
                   (int)rkm->key_len, (const char *)rkm->key);
        else if (rkm->key)
            printf(" Key: (%d bytes)\n", (int)rkm->key_len);

        /* Print the message value/payload. */
        if (rkm->payload && is_printable(rkm->payload, rkm->len))
            printf(" Value: %.*s\n",
                   (int)rkm->len, (const char *)rkm->payload);
        else if (rkm->payload)
            printf(" Value: (%d bytes)\n", (int)rkm->len);

        rd_kafka_message_destroy(rkm);
    }

    /* Close the consumer: commit final offsets and leave the group. */
    fprintf(stderr, "%% Closing consumer\n");
    rd_kafka_consumer_close(rk);

    /* Destroy the consumer */
    rd_kafka_destroy(rk);

    return 0;
}
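Both examples install their Ctrl-C handler with signal(), whose exact semantics (for example whether the handler stays installed after it fires) vary between systems; POSIX recommends sigaction() instead. The sketch below is one way to do it, assuming a POSIX system. It keeps the examples' run flag and stop() handler, and additionally catches SIGTERM (an assumption beyond the original) so that a plain kill also reaches rd_kafka_consumer_close() and commits final offsets. install_stop_handler is a hypothetical name.

```c
#define _POSIX_C_SOURCE 200809L  /* expose sigaction() under strict C */
#include <signal.h>
#include <string.h>

static volatile sig_atomic_t run = 1;

/* Async-signal-safe: only flips the flag the poll loop checks. */
static void stop(int sig)
{
    (void)sig;
    run = 0;
}

/* Hypothetical helper: installs stop() for SIGINT (Ctrl-C) and
 * SIGTERM (plain `kill`). Returns 0 on success, -1 on failure. */
static int install_stop_handler(void)
{
    struct sigaction sa;

    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = stop;
    sigemptyset(&sa.sa_mask);   /* block no extra signals in the handler */
    sa.sa_flags = 0;            /* no SA_RESTART: blocking calls see EINTR */
    if (sigaction(SIGINT, &sa, NULL) != 0 ||
        sigaction(SIGTERM, &sa, NULL) != 0)
        return -1;
    return 0;
}
```

In consumer.c this would replace the signal(SIGINT, stop) call; the while (run) poll loop stays unchanged.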

3.2 Producer

Likewise, producer.c in librdkafka/examples is a C example of producing to Kafka; its content is shown below.

/**
 * Simple Apache Kafka producer
 * using the Kafka driver from librdkafka
 * (https://github.com/edenhill/librdkafka)
 */
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <inttypes.h> /* PRId32 */

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is builtin from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"

static volatile sig_atomic_t run = 1;

/**
 * @brief Signal termination of program
 */
static void stop (int sig) {
    run = 0;
    fclose(stdin); /* abort fgets() */
}

/**
 * @brief Message delivery report callback.
 *
 * This callback is called exactly once per message, indicating if
 * the message was successfully delivered
 * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently
 * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
 *
 * The callback is triggered from rd_kafka_poll() and executes on
 * the application's thread.
 */
static void dr_msg_cb (rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err)
        fprintf(stderr, "%% Message delivery failed: %s\n",
                rd_kafka_err2str(rkmessage->err));
    else
        fprintf(stderr,
                "%% Message delivered (%zu bytes, "
                "partition %"PRId32")\n",
                rkmessage->len, rkmessage->partition);

    /* The rkmessage is destroyed automatically by librdkafka */
}

int main (int argc, char **argv) {
    rd_kafka_t *rk;        /* Producer instance handle */
    rd_kafka_conf_t *conf; /* Temporary configuration object */
    char errstr[512];      /* librdkafka API error reporting buffer */
    char buf[512];         /* Message value temporary buffer */
    const char *brokers;   /* Argument: broker list */
    const char *topic;     /* Argument: topic to produce to */

    /*
     * Argument validation
     */
    if (argc != 3) {
        fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
        return 1;
    }

    brokers = argv[1];
    topic   = argv[2];

    /*
     * Create Kafka client configuration place-holder
     */
    conf = rd_kafka_conf_new();

    /* Set bootstrap broker(s) as a comma-separated list of
     * host or host:port (default port 9092).
     * librdkafka will use the bootstrap brokers to acquire the full
     * set of brokers from the cluster. */
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        rd_kafka_conf_destroy(conf); /* not yet owned by rd_kafka_new() */
        return 1;
    }

    /* Set the delivery report callback.
     * This callback will be called once per message to inform
     * the application if delivery succeeded or failed.
     * See dr_msg_cb() above.
     * The callback is only triggered from rd_kafka_poll() and
     * rd_kafka_flush(). */
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    /*
     * Create producer instance.
     *
     * NOTE: rd_kafka_new() takes ownership of the conf object
     * and the application must not reference it again after
     * this call.
     */
    rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr,
                "%% Failed to create new producer: %s\n", errstr);
        return 1;
    }

    /* Signal handler for clean shutdown */
    signal(SIGINT, stop);

    fprintf(stderr,
            "%% Type some text and hit enter to produce message\n"
            "%% Or just hit enter to only serve delivery reports\n"
            "%% Press Ctrl-C or Ctrl-D to exit\n");

    while (run && fgets(buf, sizeof(buf), stdin)) {
        size_t len = strlen(buf);
        rd_kafka_resp_err_t err;

        if (len > 0 && buf[len-1] == '\n') /* Remove newline */
            buf[--len] = '\0';

        if (len == 0) {
            /* Empty line: only serve delivery reports */
            rd_kafka_poll(rk, 0/*non-blocking */);
            continue;
        }

        /*
         * Send/Produce message.
         * This is an asynchronous call, on success it will only
         * enqueue the message on the internal producer queue.
         * The actual delivery attempts to the broker are handled
         * by background threads.
         * The previously registered delivery report callback
         * (dr_msg_cb) is used to signal back to the application
         * when the message has been delivered (or failed).
         */
retry:
        err = rd_kafka_producev(
                /* Producer handle */
                rk,
                /* Topic name */
                RD_KAFKA_V_TOPIC(topic),
                /* Make a copy of the payload. */
                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                /* Message value and length */
                RD_KAFKA_V_VALUE(buf, len),
                /* Per-Message opaque, provided in
                 * delivery report callback as
                 * msg_opaque. */
                RD_KAFKA_V_OPAQUE(NULL),
                /* End sentinel */
                RD_KAFKA_V_END);

        if (err) {
            /*
             * Failed to *enqueue* message for producing.
             */
            fprintf(stderr,
                    "%% Failed to produce to topic %s: %s\n",
                    topic, rd_kafka_err2str(err));

            if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                /* If the internal queue is full, wait for
                 * messages to be delivered and then retry.
                 * The internal queue represents both
                 * messages to be sent and messages that have
                 * been sent or failed, awaiting their
                 * delivery report callback to be called.
                 *
                 * The internal queue is limited by the
                 * configuration property
                 * queue.buffering.max.messages */
                rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
                goto retry;
            }
        } else {
            fprintf(stderr, "%% Enqueued message (%zu bytes) "
                    "for topic %s\n",
                    len, topic);
        }

        /* A producer application should continually serve
         * the delivery report queue by calling rd_kafka_poll()
         * at frequent intervals.
         * Either put the poll call in your main loop, or in a
         * dedicated thread, or call it after every
         * rd_kafka_produce() call.
         * Just make sure that rd_kafka_poll() is still called
         * during periods where you are not producing any messages
         * to make sure previously produced messages have their
         * delivery report callback served (and any other callbacks
         * you register). */
        rd_kafka_poll(rk, 0/*non-blocking*/);
    }

    /* Wait for final messages to be delivered or fail.
     * rd_kafka_flush() is an abstraction over rd_kafka_poll() which
     * waits for all messages to be delivered. */
    fprintf(stderr, "%% Flushing final messages..\n");
    rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);

    /* If the output queue is still not empty there is an issue
     * with producing messages to the clusters. */
    if (rd_kafka_outq_len(rk) > 0)
        fprintf(stderr, "%% %d message(s) were not delivered\n",
                rd_kafka_outq_len(rk));

    /* Destroy the producer instance */
    rd_kafka_destroy(rk);

    return 0;
}

 

3.3 Producer and consumer interaction


(1) Start the consumer.

./consumer localhost:9092 0 test
Output:

% Subscribed to 1 topic(s), waiting for rebalance and messages...
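
(2) Start the producer, then type a line of text and press Enter; the consumer prints it as a message on topic test.

./producer localhost:9092 test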

Summary

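  1. Within one consumer group, each partition is read by at most one consumer. If a topic has only one partition and several consumers in the same group read it, only one of them receives data, so starting several consumers of the same group on a single partition is pointless. (Consumers in different groups each receive all messages.)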
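The rule above follows from how a group's partitions are divided among its members; librdkafka's default partition.assignment.strategy is "range,roundrobin". The sketch below is a simplified, single-topic version of the range strategy, assuming the consumers are already sorted (the real assignor sorts members by id); range_assign is a hypothetical name.

```c
#include <assert.h>

/* Hypothetical, simplified range assignment for one topic: partitions
 * 0..partition_cnt-1 are split into contiguous ranges over consumers
 * 0..consumer_cnt-1. Writes the first partition and the number of
 * partitions owned by consumer `idx`. */
static void range_assign(int partition_cnt, int consumer_cnt, int idx,
                         int *first, int *count)
{
    assert(partition_cnt >= 0 && consumer_cnt > 0);
    assert(idx >= 0 && idx < consumer_cnt);

    int per = partition_cnt / consumer_cnt;    /* base share */
    int extra = partition_cnt % consumer_cnt;  /* first `extra` get one more */

    *count = per + (idx < extra ? 1 : 0);
    *first = idx * per + (idx < extra ? idx : extra);
}
```

With 1 partition and 3 consumers, consumer 0 gets partition 0 and consumers 1 and 2 get nothing; with 5 partitions and 2 consumers, the split is partitions 0 to 2 and partitions 3 to 4.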

 

Source: https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/894068 (contributed by a user; copyright belongs to the original author).