git clone https://github.com/edenhill/librdkafka
# 进入目录
cd librdkafka/
# 配置
# 编译
# 安装
make install
# 头文件
# 库
visual studio 2019
参数 | 描述 |
bootstrap.servers | 生产者连接集群所需的broker地址清单。 |
key.serializer和value.serializer | 指定发送消息的key和value的序列化类型。 |
buffer.memory | RecordAccumulator缓冲区总大小,默认32m。 |
batch.size | 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。 默认值是-1,-1和all是等价的。 |
max.in.flight.requests.per.connection | 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是100ms。 |
enable.idempotence | 是否开启幂等性,默认true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。 |
#ifndef _KAFKA_PRODUCER_H_ #define _KAFKA_PRODUCER_H_ #include "rdkafkacpp.h" #include <memory> // 生产者投递报告回调 class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb { public: void dr_cb(RdKafka::Message& message) { if (message.err()) // err { printf("Message delivery failed:%s\n",message.errstr().c_str()); } else { printf("Message delivered to topic,topicName:%s,partition:%d\n", message.topic_name().c_str(), message.partition()); } } }; // 生产者事件回调函数 class ProducerEventCb : public RdKafka::EventCb { public: void event_cb(RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: printf("RdKafka::Event::EVENT_ERROR: %s\n", RdKafka::err2str(event.err()).c_str()); break; case RdKafka::Event::EVENT_STATS: printf("RdKafka::Event::EVENT_STATS, event:%s\n", event.str().c_str()); break; case RdKafka::Event::EVENT_LOG: printf("RdKafka::Event::EVENT_LOG, fac:%s\n", event.fac().c_str()); break; case RdKafka::Event::EVENT_THROTTLE: printf("RdKafka::Event::EVENT_THROTTLE, broker_name:%s\n", event.broker_name().c_str()); break; } } }; // 生产者自定义分区策略回调:partitioner_cb class HashPartitionerCb : public RdKafka::PartitionerCb { public: // @brief 返回 topic 中使用 key 的分区,msg_opaque 置 NULL // @return 返回分区,(0, partition_cnt) int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key, int32_t partition_cnt, void *msg_opaque) { char msg[128] = {0}; // 用于自定义分区策略:这里用 hash。例:轮询方式:p_id++ % partition_cnt int32_t partition_id = generate_hash(key->c_str(), key->size()) % partition_cnt; // 输出:[topic][key][partition_cnt][partition_id],例 [test][6419][2][1] sprintf(msg, "HashPartitionerCb:topic:[%s], key:[%s], partition_cnt:[%d], partition_id:[%d]", topic->name().c_str(), key->c_str(), partition_cnt, partition_id); printf("msg: %s\n", msg); return partition_id; } private: // 自定义哈希函数 static inline unsigned int generate_hash(const char *str, size_t len) { unsigned int hash = 5381; for (size_t i = 0; i < len; i++) hash = ((hash << 5) + hash) + str[i]; return hash; } }; class CKafkaProducer { public: /** * @brief CKafkaProducer * @param brokers * @param topic * @param partition:默认分区数 */ explicit CKafkaProducer(const std::string &brokers, const std::string &topic, int partition); ~CKafkaProducer(); int Create(); void Destroy(); /** * @brief push Message to Kafka * @param str, message data */ void PushMessage(const std::string &str, const std::string &key); private: std::string m_brokers; // Broker 列表,多个使用逗号分隔 std::string m_topicStr; // Topic 名称 int m_partition; // 分区 RdKafka::Conf* m_config; // Kafka Conf对象 RdKafka::Conf* m_topicConfig; // Topic Conf对象 RdKafka::Topic* m_topic; // Topic对象 RdKafka::Producer* m_producer; // Producer对象 RdKafka::DeliveryReportCb* m_dr_cb; // 设置传递回调 RdKafka::EventCb* m_event_cb; // 设置事件回调 RdKafka::PartitionerCb* m_partitioner_cb; // 设置自定义分区回调 }; #endif // _KAFKA_PRODUCER_H_
#include "KafkaProducer.h" CKafkaProducer::CKafkaProducer(const std::string &brokers, const std::string &topic, int partition) : m_brokers(brokers) , m_topicStr(topic) , m_partition(partition) , m_config(nullptr) , m_topicConfig(nullptr) , m_topic(nullptr) , m_producer(nullptr) , m_dr_cb(nullptr) , m_event_cb(nullptr) , m_partitioner_cb(nullptr) { } MyKafkaProducer::~MyKafkaProducer() { Destroy(); } int MyKafkaProducer::Create() { RdKafka::Conf::ConfResult errCode; // 创建错误码 std::string errorStr = ""; // 返回错误信息 do { // 创建配置对象 // 1.1、创建 Kafka Conf 对象 m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); if (NULL == m_config) { printf("Create RdKafka Conf failed.\n"); break; } // 设置 Broker 属性 // (必要参数)指定 broker 地址列表。格式:host1:port1,host2:port2,... errCode = m_config->set("bootstrap.servers", m_brokers, errorStr); if (RdKafka::Conf::CONF_OK != errCode) { printf("Conf set(bootstrap.servers) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 设置生产者投递报告回调 m_dr_cb = new ProducerDeliveryReportCb; // 创建投递报告回调 errCode = m_config->set("dr_cb", m_dr_cb, errorStr); // 异步方式发送数据 if (RdKafka::Conf::CONF_OK != errCode) { printf("Conf set(dr_cb) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 设置生产者事件回调 m_event_cb = new ProducerEventCb; // 创建生产者事件回调 errCode = m_config->set("event_cb", m_event_cb, errorStr); if (RdKafka::Conf::CONF_OK != errCode) { printf("Conf set(event_cb) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 设置数据统计间隔 errCode = m_config->set("statistics.interval.ms", "10000", errorStr); if (RdKafka::Conf::CONF_OK != errCode) { printf("Conf set(statistics.interval.ms) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 设置最大发送消息大小 errCode = m_config->set("message.max.bytes", "10240000", errorStr); if (RdKafka::Conf::CONF_OK != errCode) { printf("Conf set(message.max.bytes) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 2、创建 Topic Conf 对象 m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); if (NULL == m_topicConfig) { printf("Create RdKafka Topic Conf failed.\n"); break; } // 设置生产者自定义分区策略回调 m_partitioner_cb = new HashPartitionerCb; // 创建自定义分区投递回调 errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr); if (RdKafka::Conf::CONF_OK != errCode) { printf("Conf set(partitioner_cb) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 2、创建对象 // 2.1、创建 Producer 对象,可以发布不同的主题 m_producer = RdKafka::Producer::create(m_config, errorStr); if (NULL == m_producer) { printf("Create Producer failed, errorStr:%s.\n", errorStr.c_str()); break; } // 2.2、创建 Topic 对象,可以创建多个不同的 topic 对象 m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr); if (NULL == m_topic) { printf("Create Topic failed, errorStr:%s.\n", errorStr.c_str()); break; } printf("Created producer success.\n"); return 0; }while(0); Destroy(); return -1; } void MyKafkaProducer::Destroy() { while (nullptr !=m_producer && m_producer->outq_len() > 0) { m_producer->flush(5000); } if(nullptr != m_config) { delete m_config; m_config = nullptr; } if(nullptr != m_topicConfig) { delete m_topicConfig; m_topicConfig = nullptr; } if(nullptr != m_topic) { delete m_topic; m_topic = nullptr; } if(nullptr != m_producer) { delete m_producer; m_producer = nullptr; } if(nullptr != m_dr_cb) { delete m_dr_cb; m_dr_cb = nullptr; } if(nullptr != m_event_cb) { delete m_event_cb; m_event_cb = nullptr; } if(nullptr != m_partitioner_cb) { delete m_partitioner_cb; m_partitioner_cb = nullptr; } } void MyKafkaProducer::PushMessage(const std::string &str, const std::string &key) { int32_t len = (int32_t)str.length(); void *payload = const_cast<void *>(static_cast<const void *>(str.data())); // produce 方法,生产和发送单条消息到 Broker // 如果不加时间戳,内部会自动加上当前的时间戳 RdKafka::ErrorCode errorCode = m_producer->produce( m_topic, // 指定发送到的主题 RdKafka::Topic::PARTITION_UA, // 指定分区,如果为PARTITION_UA则通过 // partitioner_cb的回调选择合适的分区 RdKafka::Producer::RK_MSG_COPY, // 消息拷贝 payload, // 消息本身 len, // 消息长度 &key, // 消息key NULL ); // 轮询处理 m_producer->poll(0); if (RdKafka::ERR_NO_ERROR != errorCode) { printf("Produce failed,errorCode:%s\n",RdKafka::err2str(errorCode).c_str()); // kafka 队列满,等待 100 ms if (RdKafka::ERR__QUEUE_FULL == errorCode) { m_producer->poll(100); } } }
#include "KafkaProducer.h" #include <memory> int main() { std::string brokers = ""; std::string topic = "first-topic-test"; auto producer = std::make_shared<CKafkaProducer>(brokers, topic, 1000); if(!producer.get()) return -1; if(0 != producer->Create()) { return -1; } std::string msg = "test kafka"; std::string key = "xxx"; // 可选,涉及kafka保序策略 producer->PushMessage(msg, key); producer->Destroy(); delete producer; system("pause"); return 0; }
参数 | 描述 |
bootstrap.servers | 向Kafka集群建立初始连接用到的host/port列表。 |
key.deserializer和value.deserializer | 指定接收消息的key和value的反序列化类型。 |
group.id | 标记消费者所属的消费者组。 |
enable.auto.commit | 默认值为true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。 |
auto.offset.reset | 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | __consumer_offsets的分区数,默认是50个分区。 |
heartbeat.interval.ms | Kafka消费者和coordinator之间的心跳时间,默认3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。 |
session.timeout.ms | Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 默认1个字节。消费者获取服务器端一批消息最小的字节数。 |
fetch.max.wait.ms | 默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。 |
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次poll拉取数据返回消息的最大条数,默认是500条。 |
#ifndef _KAFKA_CONSUMER_H_ #define _KAFKA_CONSUMER_H_ #include "rdkafkacpp.h" #include <thread> #include <mutex> // 设置事件回调 class ConsumerEventCb : public RdKafka::EventCb { public: void event_cb(RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: break; case RdKafka::Event::EVENT_STATS: break; case RdKafka::Event::EVENT_LOG: break; case RdKafka::Event::EVENT_THROTTLE: break; default: break; } } }; // 设置消费者组再平衡回调 // 注册该函数会关闭 rdkafka 的自动分区赋值和再分配 class ConsumerRebalanceCb : public RdKafka::RebalanceCb { private: // 打印当前获取的分区 static void printTopicPartition(const std::vector<RdKafka::TopicPartition *>& partitions) { for (unsigned int i = 0; i < partitions.size(); i++) { printf("count:%d, topic:%s,partition:%d\n", i, partitions[i]->topic().c_str(), partitions[i]->partition()); } } public: // 消费者组再平衡回调 void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition *> &partitions) { printf("RebalanceCb: %s\n",RdKafka::err2str(err).c_str()); printTopicPartition(partitions); // 分区分配成功 if (RdKafka::ERR__ASSIGN_PARTITIONS == err) { // 消费者订阅这些分区 consumer->assign(partitions); // 获取消费者组本次订阅的分区数量,可以属于不同的topic m_partitionCount = (int)partitions.size(); } else // 分区分配失败 { // 消费者取消订阅所有的分区 consumer->unassign(); // 消费者订阅分区的数量为0 m_partitionCount = 0; } } private: int m_partitionCount; // 消费者组本次订阅的分区数量 }; class CKafkaConsumer { public: /** * @brief CKafkaConsumer * @param brokers * @param groupID:消费者组名称 * @param topics * @param partition:默认分区数 */ explicit CKafkaConsumer(const std::string &brokers, const std::string &groupID, const std::vector<std::string> &topics, int partition); ~CKafkaConsumer(); int Create(); void Destroy(); void PullMessage(); public: void OnRecv(); private: void ConsumeMsg_(RdKafka::Message *msg, void *opaque); private: std::string m_brokers; std::string m_groupID; std::vector<std::string> m_topicVector; int m_partition; RdKafka::Conf* m_config; RdKafka::Conf* m_topicConfig; RdKafka::KafkaConsumer* m_consumer; RdKafka::EventCb* m_event_cb; RdKafka::RebalanceCb* m_rebalance_cb; std::thread m_thread; bool m_running; typedef std::lock_guard<std::recursive_mutex> RecursiveGuard; std::recursive_mutex mutex_; }; #endif // _KAFKA_CONSUMER_H_
#include "MyKafkaConsumer.h" static int ConsumerWorker(void* param) { CKafkaConsumer* consumer = (CKafkaConsumer*)param; if (consumer) { consumer->OnRecv(); return 0; } return -1; } CKafkaConsumer::CKafkaConsumer(const std::string &brokers, const std::string &groupID, const std::vector<std::string> &topics, int partition) : m_brokers(brokers) , m_groupID(groupID) , m_topicVector(topics) , m_partition(partition) , m_running(true) { } CKafkaConsumer::~CKafkaConsumer() { Destroy(); } int CKafkaConsumer::Create() { std::string errorStr; RdKafka::Conf::ConfResult errorCode; do { // 1、创建配置对象 // 1.1、构造 consumer conf 对象 m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); if(nullptr == m_config) { printf("Create RdKafka Conf failed.\n"); break; } // 必要参数1:指定 broker 地址列表 errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr); if (RdKafka::Conf::CONF_OK != errorCode) { printf("Conf set(bootstrap.servers) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 必要参数2:设置消费者组 id errorCode = m_config->set("group.id", m_groupID, errorStr); if (RdKafka::Conf::CONF_OK != errorCode) { printf("Conf set(group.id) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 设置事件回调 m_event_cb = new ConsumerEventCb; errorCode = m_config->set("event_cb", m_event_cb, errorStr); if (RdKafka::Conf::CONF_OK != errorCode) { printf("Conf set(event_cb) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 设置消费者组再平衡回调 m_rebalance_cb = new ConsumerRebalanceCb; errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr); if (RdKafka::Conf::CONF_OK != errorCode) { printf("Conf set(rebalance_cb) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件 errorCode = m_config->set("enable.partition.eof", "false", errorStr); if (RdKafka::Conf::CONF_OK != errorCode) { printf("Conf set(enable.partition.eof) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 每次最大拉取的数据大小 errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr); if (RdKafka::Conf::CONF_OK != errorCode) { printf("Conf set(max.partition.fetch.bytes) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 设置分区分配策略:range、roundrobin、cooperative-sticky errorCode = m_config->set("partition.assignment.strategy", "range", errorStr); if (RdKafka::Conf::CONF_OK != errorCode) { printf("Conf set(partition.assignment.strategy) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 心跳探活超时时间---1s errorCode = m_config->set("session.timeout.ms", "6000", errorStr); if (RdKafka::Conf::CONF_OK != errorCode) { printf("Conf set(session.timeout.ms) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 心跳保活间隔 errorCode = m_config->set("heartbeat.interval.ms", "2000", errorStr); if (RdKafka::Conf::CONF_OK != errorCode) { printf("Conf set(heartbeat.interval.ms) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 1.2、创建 topic conf 对象 m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); if (nullptr == m_topicConfig) { printf("Create RdKafka Topic Conf failed.\n"); break; } // 必要参数3:设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费 errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr); if (RdKafka::Conf::CONF_OK != errorCode) { printf("Topic Conf set(auto.offset.reset) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 默认 topic 配置,用于自动订阅 topics errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr); if (RdKafka::Conf::CONF_OK != errorCode) { printf("Conf set(default_topic_conf) failed, errorStr:%s.\n", errorStr.c_str()); break; } // 2、创建 Consumer 对象 m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr); if (nullptr == m_consumer) { printf("Create KafkaConsumer failed, errorStr:%s.\n", errorStr.c_str()); break; } printf("Created consumer success, consumerName:%s.\n", m_consumer->name().c_str()); return 0; } while (0); Destroy(); return -1; } void CKafkaConsumer::Destroy() { m_running = false; if (m_thread.joinable()) m_thread.join(); if(nullptr != m_consumer) m_consumer->close(); if(nullptr != m_config) { delete m_config; m_config = nullptr; } if(nullptr != m_topicConfig) { delete m_topicConfig; m_topicConfig = nullptr; } if(nullptr != m_consumer) { delete m_consumer; m_consumer = nullptr; } if(nullptr != m_event_cb) { delete m_event_cb; m_event_cb = nullptr; } if(nullptr != m_rebalance_cb) { delete m_rebalance_cb; m_rebalance_cb = nullptr; } } void CKafkaConsumer::PullMessage() { m_thread = std::thread(ConsumerWorker, this); } void CKafkaConsumer::ConsumeMsg_(RdKafka::Message *msg, void *opaque) { switch (msg->err()) { case RdKafka::ERR__TIMED_OUT: // 超时 break; case RdKafka::ERR_NO_ERROR: // 有消息进来 printf("Recv Message. topic:%s, partition:[%d], key:%s, payload:%s\n", msg->topic_name().c_str(), msg->partition(), msg->key()->c_str(), (char *)msg->payload()); break; default: break; } } void CKafkaConsumer::OnRecv() { if(nullptr == m_consumer) return; // 后续可扩展 std::vector<std::string> topicVector; { RecursiveGuard mtx(mutex_); topicVector = m_topicVector; } // 1、订阅主题 RdKafka::ErrorCode errorCode = m_consumer->subscribe(topicVector); if (RdKafka::ERR_NO_ERROR != errorCode) { printf("Subscribe failed, errorStr:%s\n", RdKafka::err2str(errorCode).c_str()); return; } // 2、拉取并消费消息 while (m_running) { RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时 if(nullptr != msg) { // 消费消息 ConsumeMsg_(msg, nullptr); delete msg; msg = nullptr; } } // 同步提交,Consumer 关闭前调用,等待 broker 返回读取消息 if(nullptr != m_consumer) m_consumer->commitSync(); }
#include "KafkaConsumer.h" #include <memory> int main() { std::string brokers = ""; std::string groupID = "test"; std::vector<std::string> topics; topics.push_back("first-topic-test"); auto comsumer = std::make_shared<CKafkaConsumer>(brokers, groupID, topics, 1000); if(!comsumer.get()) return -1; if(0 != comsumer->Create()) return -1; comsumer->PullMessage(); system("pause"); return 0; }
