赞
踩
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
git checkout v1.7.0
./configure
make
sudo make install
sudo ldconfig
在librdkafka的examples目录下会有示例程序。比如consumer的启动需要下列参数
./consumer <broker> <group.id> <topic1> <topic2>..
指定broker、group id、topic(可以订阅多个)。示例:
./consumer localhost:9092 0 test
缩略语介绍:
启动zookeeper:可以通过下面的脚本来启动zookeeper服务。当然,也可以自己独立搭建zookeeper集群;这里我们直接使用kafka自带的zookeeper。
- cd bin/
- # 前台运行:
- sh zookeeper-server-start.sh ../config/zookeeper.properties
-
- # 后台运行:
- sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties
可以通过命令lsof -i:2181 查看zookeeper是否启动成功。
- $ lsof -i:2181
- COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
- java 74930 fly 96u IPv6 734467 0t0 TCP *:2181 (LISTEN)
启动kafka(kafka安装路径的bin目录下执行),默认启动端口9092。
sh kafka-server-start.sh -daemon ../config/server.properties
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
参数说明:
--create 是创建主题的动作指令。
--zookeeper 指定kafka所连接的zookeeper服务地址。
--replication-factor 指定了副本因子(即副本数量),表示该topic的每个分区需要在不同的broker中保存几份;这里设置成1,表示每个分区只在一个broker中保存一份。
--partitions 指定分区个数;多通道,类似车道。
--topic 指定所要创建主题的名称,比如test。
在librdkafka/examples目录下有consumer.c文件,该文件是一个用C语言操作kafka(消费者)的代码范例,内容如下。
- /**
- * Simple high-level balanced Apache Kafka consumer
- * using the Kafka driver from librdkafka
- * (https://github.com/edenhill/librdkafka)
- */
-
- #include <stdio.h>
- #include <signal.h>
- #include <string.h>
- #include <ctype.h>
-
-
- /* Typical include path would be <librdkafka/rdkafka.h>, but this program
- * is builtin from within the librdkafka source tree and thus differs. */
- //#include <librdkafka/rdkafka.h>
- #include "rdkafka.h"
-
-
/* Main-loop flag: remains 1 while the consumer should keep polling. */
static volatile sig_atomic_t run = 1;

/**
 * @brief Signal handler that requests program termination.
 *
 * Clears the `run` flag; the signal number itself is not inspected.
 *
 * @param sig Signal number delivered to the handler (unused).
 */
static void stop(int sig)
{
        (void)sig;
        run = 0;
}
-
-
-
/**
 * @brief Check whether a buffer consists solely of printable characters.
 *
 * @param buf  Bytes to inspect; does not need to be NUL-terminated.
 * @param size Number of bytes of \p buf to examine.
 *
 * @returns 1 if all bytes are printable according to isprint(), else 0.
 *          An empty buffer (size == 0) yields 1.
 */
static int is_printable (const char *buf, size_t size) {
        size_t i;

        for (i = 0 ; i < size ; i++)
                /* <ctype.h> functions require an argument representable
                 * as unsigned char (or EOF). Plain char may be signed, so
                 * bytes >= 0x80 must be converted to unsigned char first
                 * to avoid undefined behavior. */
                if (!isprint((unsigned char)buf[i]))
                        return 0;

        return 1;
}
-
-
- int main (int argc, char **argv) {
- rd_kafka_t *rk; /* Consumer instance handle */
- rd_kafka_conf_t *conf; /* Temporary configuration object */
- rd_kafka_resp_err_t err; /* librdkafka API error code */
- char errstr[512]; /* librdkafka API error reporting buffer */
- const char *brokers; /* Argument: broker list */
- const char *groupid; /* Argument: Consumer group id */
- char **topics; /* Argument: list of topics to subscribe to */
- int topic_cnt; /* Number of topics to subscribe to */
- rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
- int i;
-
- /*
- * Argument validation
- */
- if (argc < 4) {
- fprintf(stderr,
- "%% Usage: "
- "%s <broker> <group.id> <topic1> <topic2>..\n",
- argv[0]);
- return 1;
- }
-
- brokers = argv[1];
- groupid = argv[2];
- topics = &argv[3];
- topic_cnt = argc - 3;
-
-
- /*
- * Create Kafka client configuration place-holder
- */
- conf = rd_kafka_conf_new(); // 创建配置文件
-
- /* Set bootstrap broker(s) as a comma-separated list of
- * host or host:port (default port 9092).
- * librdkafka will use the bootstrap brokers to acquire the full
- * set of brokers from the cluster. */
- if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
- errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%s\n", errstr);
- rd_kafka_conf_destroy(conf);
- return 1;
- }
-
- /* Set the consumer group id.
- * All consumers sharing the same group id will join the same
- * group, and the subscribed topic' partitions will be assigned
- * according to the partition.assignment.strategy
- * (consumer config property) to the consumers in the group. */
- if (rd_kafka_conf_set(conf, "group.id", groupid,
- errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%s\n", errstr);
- rd_kafka_conf_destroy(conf);
- return 1;
- }
-
- /* If there is no previously committed offset for a partition
- * the auto.offset.reset strategy will be used to decide where
- * in the partition to start fetching messages.
- * By setting this to earliest the consumer will read all messages
- * in the partition if there was no previously committed offset. */
- if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
- errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%s\n", errstr);
- rd_kafka_conf_destroy(conf);
- return 1;
- }
-
- /*
- * Create consumer instance.
- *
- * NOTE: rd_kafka_new() takes ownership of the conf object
- * and the application must not reference it again after
- * this call.
- */
- // 创建一个kafka消费者
- rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
- if (!rk) {
- fprintf(stderr,
- "%% Failed to create new consumer: %s\n", errstr);
- return 1;
- }
-
- conf = NULL; /* Configuration object is now owned, and freed,
- * by the rd_kafka_t instance. */
-
-
- /* Redirect all messages from per-partition queues to
- * the main queue so that messages can be consumed with one
- * call from all assigned partitions.
- *
- * The alternative is to poll the main queue (for events)
- * and each partition queue separately, which requires setting
- * up a rebalance callback and keeping track of the assignment:
- * but that is more complex and typically not recommended. */
- rd_kafka_poll_set_consumer(rk);// poll机制,设置消费者实例到poll中
-
-
- /* Convert the list of topics to a format suitable for librdkafka */
- // 创建主题分区列表
- subscription = rd_kafka_topic_partition_list_new(topic_cnt);
- for (i = 0 ; i < topic_cnt ; i++)
- rd_kafka_topic_partition_list_add(subscription,
- topics[i],
- /* the partition is ignored
- * by subscribe() */
- RD_KAFKA_PARTITION_UA);
-
- /* Subscribe to the list of topics */
- err = rd_kafka_subscribe(rk, subscription);
- if (err) {
- fprintf(stderr,
- "%% Failed to subscribe to %d topics: %s\n",
- subscription->cnt, rd_kafka_err2str(err));
- rd_kafka_topic_partition_list_destroy(subscription);
- rd_kafka_destroy(rk);
- return 1;
- }
-
- fprintf(stderr,
- "%% Subscribed to %d topic(s), "
- "waiting for rebalance and messages...\n",
- subscription->cnt);
-
- rd_kafka_topic_partition_list_destroy(subscription);
-
-
- /* Signal handler for clean shutdown */
- signal(SIGINT, stop);
-
- /* Subscribing to topics will trigger a group rebalance
- * which may take some time to finish, but there is no need
- * for the application to handle this idle period in a special way
- * since a rebalance may happen at any time.
- * Start polling for messages. */
-
- while (run) {
- rd_kafka_message_t *rkm;
-
- rkm = rd_kafka_consumer_poll(rk, 100);
- if (!rkm)
- continue; /* Timeout: no message within 100ms,
- * try again. This short timeout allows
- * checking for `run` at frequent intervals.
- */
-
- /* consumer_poll() will return either a proper message
- * or a consumer error (rkm->err is set). */
- if (rkm->err) {
- /* Consumer errors are generally to be considered
- * informational as the consumer will automatically
- * try to recover from all types of errors. */
- fprintf(stderr,
- "%% Consumer error: %s\n",
- rd_kafka_message_errstr(rkm));
- rd_kafka_message_destroy(rkm);
- continue;
- }
-
- /* Proper message. */
- printf("Message on %s [%"PRId32"] at offset %"PRId64":\n",
- rd_kafka_topic_name(rkm->rkt), rkm->partition,
- rkm->offset);
-
- /* Print the message key. */
- if (rkm->key && is_printable(rkm->key, rkm->key_len))
- printf(" Key: %.*s\n",
- (int)rkm->key_len, (const char *)rkm->key);
- else if (rkm->key)
- printf(" Key: (%d bytes)\n", (int)rkm->key_len);
-
- /* Print the message value/payload. */
- if (rkm->payload && is_printable(rkm->payload, rkm->len))
- printf(" Value: %.*s\n",
- (int)rkm->len, (const char *)rkm->payload);
- else if (rkm->payload)
- printf(" Value: (%d bytes)\n", (int)rkm->len);
-
- rd_kafka_message_destroy(rkm);
- }
-
-
- /* Close the consumer: commit final offsets and leave the group. */
- fprintf(stderr, "%% Closing consumer\n");
- rd_kafka_consumer_close(rk);
-
-
- /* Destroy the consumer */
- rd_kafka_destroy(rk);
-
- return 0;
- }
在librdkafka/examples目录下有producer.c文件,该文件是一个用C语言操作kafka(生产者)的代码范例,内容如下。
- /**
- * Simple Apache Kafka producer
- * using the Kafka driver from librdkafka
- * (https://github.com/edenhill/librdkafka)
- */
-
- #include <stdio.h>
- #include <signal.h>
- #include <string.h>
-
-
- /* Typical include path would be <librdkafka/rdkafka.h>, but this program
- * is builtin from within the librdkafka source tree and thus differs. */
- #include "rdkafka.h"
-
-
/* Main-loop flag: remains 1 while the producer should keep reading input. */
static volatile sig_atomic_t run = 1;

/**
 * @brief Signal handler that requests program termination.
 *
 * Clears the `run` flag and closes stdin so that a pending fgets()
 * call in the main loop is aborted.
 *
 * NOTE(review): fclose() is not on the POSIX list of async-signal-safe
 * functions; calling it from a handler is kept as in the original
 * example -- confirm this is acceptable for the target platform.
 *
 * @param sig Signal number delivered to the handler (unused).
 */
static void stop(int sig)
{
        (void)sig;
        run = 0;
        fclose(stdin); /* abort fgets() */
}
-
-
- /**
- * @brief Message delivery report callback.
- *
- * This callback is called exactly once per message, indicating if
- * the message was succesfully delivered
- * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently
- * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
- *
- * The callback is triggered from rd_kafka_poll() and executes on
- * the application's thread.
- */
- static void dr_msg_cb (rd_kafka_t *rk,
- const rd_kafka_message_t *rkmessage, void *opaque) {
- if (rkmessage->err)
- fprintf(stderr, "%% Message delivery failed: %s\n",
- rd_kafka_err2str(rkmessage->err));
- else
- fprintf(stderr,
- "%% Message delivered (%zd bytes, "
- "partition %"PRId32")\n",
- rkmessage->len, rkmessage->partition);
-
- /* The rkmessage is destroyed automatically by librdkafka */
- }
-
-
-
- int main (int argc, char **argv) {
- rd_kafka_t *rk; /* Producer instance handle */
- rd_kafka_conf_t *conf; /* Temporary configuration object */
- char errstr[512]; /* librdkafka API error reporting buffer */
- char buf[512]; /* Message value temporary buffer */
- const char *brokers; /* Argument: broker list */
- const char *topic; /* Argument: topic to produce to */
-
- /*
- * Argument validation
- */
- if (argc != 3) {
- fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
- return 1;
- }
-
- brokers = argv[1];
- topic = argv[2];
-
-
- /*
- * Create Kafka client configuration place-holder
- */
- conf = rd_kafka_conf_new();
-
- /* Set bootstrap broker(s) as a comma-separated list of
- * host or host:port (default port 9092).
- * librdkafka will use the bootstrap brokers to acquire the full
- * set of brokers from the cluster. */
- if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
- errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%s\n", errstr);
- return 1;
- }
-
- /* Set the delivery report callback.
- * This callback will be called once per message to inform
- * the application if delivery succeeded or failed.
- * See dr_msg_cb() above.
- * The callback is only triggered from rd_kafka_poll() and
- * rd_kafka_flush(). */
- rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
-
- /*
- * Create producer instance.
- *
- * NOTE: rd_kafka_new() takes ownership of the conf object
- * and the application must not reference it again after
- * this call.
- */
- rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
- if (!rk) {
- fprintf(stderr,
- "%% Failed to create new producer: %s\n", errstr);
- return 1;
- }
-
- /* Signal handler for clean shutdown */
- signal(SIGINT, stop);
-
- fprintf(stderr,
- "%% Type some text and hit enter to produce message\n"
- "%% Or just hit enter to only serve delivery reports\n"
- "%% Press Ctrl-C or Ctrl-D to exit\n");
-
- while (run && fgets(buf, sizeof(buf), stdin)) {
- size_t len = strlen(buf);
- rd_kafka_resp_err_t err;
-
- if (buf[len-1] == '\n') /* Remove newline */
- buf[--len] = '\0';
-
- if (len == 0) {
- /* Empty line: only serve delivery reports */
- rd_kafka_poll(rk, 0/*non-blocking */);
- continue;
- }
-
- /*
- * Send/Produce message.
- * This is an asynchronous call, on success it will only
- * enqueue the message on the internal producer queue.
- * The actual delivery attempts to the broker are handled
- * by background threads.
- * The previously registered delivery report callback
- * (dr_msg_cb) is used to signal back to the application
- * when the message has been delivered (or failed).
- */
- retry:
- err = rd_kafka_producev(
- /* Producer handle */
- rk,
- /* Topic name */
- RD_KAFKA_V_TOPIC(topic),
- /* Make a copy of the payload. */
- RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
- /* Message value and length */
- RD_KAFKA_V_VALUE(buf, len),
- /* Per-Message opaque, provided in
- * delivery report callback as
- * msg_opaque. */
- RD_KAFKA_V_OPAQUE(NULL),
- /* End sentinel */
- RD_KAFKA_V_END);
-
- if (err) {
- /*
- * Failed to *enqueue* message for producing.
- */
- fprintf(stderr,
- "%% Failed to produce to topic %s: %s\n",
- topic, rd_kafka_err2str(err));
-
- if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
- /* If the internal queue is full, wait for
- * messages to be delivered and then retry.
- * The internal queue represents both
- * messages to be sent and messages that have
- * been sent or failed, awaiting their
- * delivery report callback to be called.
- *
- * The internal queue is limited by the
- * configuration property
- * queue.buffering.max.messages */
- rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
- goto retry;
- }
- } else {
- fprintf(stderr, "%% Enqueued message (%zd bytes) "
- "for topic %s\n",
- len, topic);
- }
-
-
- /* A producer application should continually serve
- * the delivery report queue by calling rd_kafka_poll()
- * at frequent intervals.
- * Either put the poll call in your main loop, or in a
- * dedicated thread, or call it after every
- * rd_kafka_produce() call.
- * Just make sure that rd_kafka_poll() is still called
- * during periods where you are not producing any messages
- * to make sure previously produced messages have their
- * delivery report callback served (and any other callbacks
- * you register). */
- rd_kafka_poll(rk, 0/*non-blocking*/);
- }
-
-
- /* Wait for final messages to be delivered or fail.
- * rd_kafka_flush() is an abstraction over rd_kafka_poll() which
- * waits for all messages to be delivered. */
- fprintf(stderr, "%% Flushing final messages..\n");
- rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);
-
- /* If the output queue is still not empty there is an issue
- * with producing messages to the clusters. */
- if (rd_kafka_outq_len(rk) > 0)
- fprintf(stderr, "%% %d message(s) were not delivered\n",
- rd_kafka_outq_len(rk));
-
- /* Destroy the producer instance */
- rd_kafka_destroy(rk);
-
- return 0;
- }
(1)启动消费者。
./consumer localhost:9092 0 test
1
显示:
% Subscribed to 1 topic(s), waiting for rebalance and messages...
1
(2)启动生产者。
./producer localhost:9092 test
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。