赞
踩
(1)配置生产者客户端参数。
(2)创建相应的生产者实例。
(3)构建待发送的消息。
(4)发送消息。
(5)关闭生产者实例。
enum ConfType{
CONF_GLOBAL, // Global (client-instance level) configuration
CONF_TOPIC // Topic-level configuration
};
enum ConfResult{
CONF_UNKNOWN = -2, // Unknown configuration property name
CONF_INVALID = -1, // Invalid configuration value
CONF_OK = 0 // Property set successfully
};
static Conf * create(ConfType type);
创建配置对象。
Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
设置配置对象的属性值,成功返回CONF_OK,错误时错误信息输出到errstr。
Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
设置dr_cb属性值。
Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
设置event_cb属性值。
Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr);
设置用于自动订阅Topic的默认Topic配置。
Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
设置partitioner_cb属性值,配置对象必须是CONF_TOPIC类型。
Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb,std::string &errstr);
设置partitioner_key_pointer_cb属性值。
Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr);
设置socket_cb属性值。
Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr);
设置open_cb属性值。
Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr);
设置rebalance_cb属性值。
Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr);
设置offset_commit_cb属性值。
Conf::ConfResult get(const std::string &name, std::string &value) const;
查询单条属性配置值。
Message表示一条消费或生产的消息,或是事件。
每收到一条RdKafka::Producer::produce()函数生产的消息,调用一次投递报告回调函数,RdKafka::Message::err()将会标识Produce请求的结果。
为了使用队列化的投递报告回调函数,必须调用RdKafka::poll()函数。
virtual void dr_cb(Message &message)=0;
当一条消息成功生产或是rdkafka遇到永久失败或是重试次数耗尽,投递报告回调函数会被调用。
C++封装示例:
// Delivery-report callback: invoked once per produced message after the
// broker acknowledges it or the delivery permanently fails.  message.err()
// carries the outcome of the produce request.
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb
{
public:
    void dr_cb(RdKafka::Message &message)
    {
        if (message.err()) {
            std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
            return;
        }
        // e.g. "Message delivered to topic test [0] at offset 135000"
        std::cerr << "Message delivered to topic " << message.topic_name()
                  << " [" << message.partition() << "] at offset "
                  << message.offset() << std::endl;
    }
};
enum Type{
EVENT_ERROR, // Error condition event
EVENT_STATS, // Statistics event (JSON document)
EVENT_LOG, // Log message event
EVENT_THROTTLE // Throttle-level signal event from a broker
};
事件是从RdKafka传递错误、统计信息、日志等消息到应用程序的通用接口。
virtual void event_cb(Event &event)=0; // 事件回调函数
C++封装示例:
// Event callback: the generic channel through which librdkafka delivers
// errors, statistics, logs and throttle notifications to the application.
class ProducerEventCb : public RdKafka::EventCb
{
public:
    void event_cb(RdKafka::Event &event)
    {
        const RdKafka::Event::Type kind = event.type();
        if (kind == RdKafka::Event::EVENT_ERROR)
            std::cout << "RdKafka::Event::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
        else if (kind == RdKafka::Event::EVENT_STATS)
            std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl;
        else if (kind == RdKafka::Event::EVENT_LOG)
            std::cout << "RdKafka::Event::EVENT_LOG " << event.fac() << std::endl;
        else if (kind == RdKafka::Event::EVENT_THROTTLE)
            std::cout << "RdKafka::Event::EVENT_THROTTLE " << event.broker_name() << std::endl;
    }
};
PartitionerCb用于实现自定义分区策略,需要使用RdKafka::Conf::set()设置partitioner_cb属性。
virtual int32_t partitioner_cb(const Topic *topic, const std::string *key, int32_t partition_cnt,void *msg_opaque)=0;
//Partitioner回调函数
返回topic主题中使用key的分区,key可以是NULL或字符串。
返回值必须在0到partition_cnt-1之间(左闭右开区间),如果分区失败应返回RD_KAFKA_PARTITION_UA (-1)。
msg_opaque与RdKafka::Producer::produce()调用提供的msg_opaque相同。
C++封装示例:
// Custom partitioner: maps a message key to a partition id with a DJB2
// hash.  Registered on the topic conf via set("partitioner_cb", ...).
class HashPartitionerCb : public RdKafka::PartitionerCb
{
public:
    /**
     * @brief Choose the partition for a message.
     * @param topic         topic the message is produced to
     * @param key           optional message key; may be NULL
     * @param partition_cnt number of partitions of the topic (> 0)
     * @param msg_opaque    opaque pointer passed to produce(); unused here
     * @return partition id in [0, partition_cnt)
     */
    int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,
                            int32_t partition_cnt, void *msg_opaque)
    {
        // BUGFIX: key may legally be NULL — hash an empty key instead of
        // dereferencing a null pointer.
        const char *key_data = key ? key->c_str() : "";
        size_t key_len = key ? key->size() : 0;
        int32_t partition_id = generate_hash(key_data, key_len) % partition_cnt;
        // [topic][key][partition_cnt][partition_id]
        // :[test][6419][2][1]
        char msg[128] = {0};
        // snprintf (not sprintf): a long topic name must not overflow msg[].
        snprintf(msg, sizeof(msg),
                 "HashPartitionerCb:topic:[%s], key:[%s]partition_cnt:[%d], partition_id:[%d]",
                 topic->name().c_str(), key_data, partition_cnt, partition_id);
        std::cout << msg << std::endl;
        return partition_id;
    }
private:
    // DJB2 string hash (hash = hash * 33 + c).
    static inline unsigned int generate_hash(const char *str, size_t len)
    {
        unsigned int hash = 5381;
        for (size_t i = 0 ; i < len ; i++)
            hash = ((hash << 5) + hash) + str[i];
        return hash;
    }
};
static const int32_t PARTITION_UA = -1; // Unassigned partition: let the configured partitioner choose
static const int64_t OFFSET_BEGINNING = -2; // Special offset: consume from the beginning
static const int64_t OFFSET_END = -1; // Special offset: consume from the end
static const int64_t OFFSET_STORED = -1000; // Use the stored/committed offset
参数 | 含义 |
---|---|
topic | 主题 |
partition | 分区 |
msgflags | 可选项为RK_MSG_BLOCK、RK_MSG_FREE、RK_MSG_COPY。RK_MSG_FREE表 示RdKafka调用produce完成后会释放payload数据;RK_MSG_COPY表示payload数据会被拷贝,在produce调用完成后RdKafka不会使用payload指针;RK_MSG_BLOCK表示在消息队列满时阻塞produce函数,如果dr_cb回调函数被使用,应用程序必须调用rd_kafka_poll函数确保投递消息队列的投递消息投递完。当消息队列满时,失败会导致produce函数的永久阻塞。RK_MSG_FREE和RK_MSG_COPY是互斥操作。如果produce函数调用时指定了RK_MSG_FREE,并返回了错误码,与payload指针相关的内存数据必须由使用者负责释放。 |
payload | 长度为len的消息负载数据 |
len | payload消息数据的长度。 |
key | key是可选的消息key,如果非NULL,会被传递给主题partitioner,并被随消息发送到Broker和传递给Consumer。 |
msg_opaque | msg_opaque是可选的应用程序提供给每条消息的opaque指针,opaque指针会在dr_cb回调函数内提供。 |
返回错误码:
错误码 | 含义 |
---|---|
ERR_NO_ERROR | 消息成功发送并加入队列。 |
ERR_QUEUE_FULL | 队列中的消息数量达到queue.buffering.max.messages配置的上限。 |
ERR_MSG_SIZE_TOO_LARGE | 消息数据大小太大,超过messages.max.bytes配置的值。 |
ERR_UNKNOWN_PARTITION | 请求一个Kafka集群内的未知分区。 |
ERR_UNKNOWN_TOPIC | topic是Kafka集群的未知主题。 |
(1)指定连接 Kafka 集群所需要的 broker 地址清单,具体的内容格式为 host1:port1,host2:port2,可以设置一个或者多个地址,中间以逗号进行隔开,此参数的默认值为 “”。
(2)注意这里并非需要所有的 broker 地址,因为生产者会从给定的 broker 里查找其他 broker 的信息。
(3)不过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。
// Create the global (client-level) Kafka Conf object
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(m_config == NULL)
{
std::cout << "Create RdKafka Conf failed." << std::endl;
}
// Create the topic-level Conf object
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if(m_topicConfig == NULL)
{
std::cout << "Create RdKafka Topic Conf failed." << std::endl;
}
// Set broker/client properties
RdKafka::Conf::ConfResult errCode;
m_dr_cb = new ProducerDeliveryReportCb;
std::string errorStr;
// Register the delivery-report callback
errCode = m_config->set("dr_cb", m_dr_cb, errorStr);
if(errCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed:" << errorStr << std::endl;
}
// Register the event callback (errors, stats, logs, throttle)
m_event_cb = new ProducerEventCb;
errCode = m_config->set("event_cb", m_event_cb, errorStr);
if(errCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed:" << errorStr << std::endl;
}
// Register the custom partitioner on the topic conf
m_partitioner_cb = new HashPartitionerCb;
errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
if(errCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed:" << errorStr << std::endl;
}
// Emit a statistics event (EVENT_STATS) every 10 seconds
errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
if(errCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed:" << errorStr << std::endl;
}
// Allow messages up to ~10 MB
errCode = m_config->set("message.max.bytes", "10240000", errorStr);
if(errCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed:" << errorStr << std::endl;
}
// Comma-separated bootstrap broker list
errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
if(errCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed:" << errorStr << std::endl;
}
生产者的相关配置和实例的创建可以在类的构造函数实现。比如Kafka Conf对象、Topic Conf对象、设置Broker属性、Producer、Topic对象等。
// Create the Producer instance (the conf object is copied by create())
m_producer = RdKafka::Producer::create(m_config, errorStr);
if(m_producer == NULL)
{
std::cout << "Create Producer failed:" << errorStr << std::endl;
}
// Create the Topic handle used by produce()
m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
if(m_topic == NULL)
{
std::cout << "Create Topic failed:" << errorStr << std::endl;
}
librdkafka提供了异步的生产接口,以及同步和异步的消费接口,但没有同步的生产接口。
同一个生产者可以发送多个主题的消息,在内部处理时根据传入的topic对象发送给对应的主题分区。
// Asynchronous produce: RK_MSG_COPY makes librdkafka copy the payload,
// PARTITION_UA lets the configured partitioner_cb pick the partition.
RdKafka::ErrorCode errorCode = m_producer->produce(
m_topic,
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
payload,
len,
&key,
NULL);
KafkaProducer.h
#ifndef KAFKAPRODUCER_H
#define KAFKAPRODUCER_H
#pragma once
#include <string>
#include <iostream>
#include "rdkafkacpp.h"
class KafkaProducer
{
public:
/**
* @brief KafkaProducer
* @param brokers comma-separated broker list, e.g. "host1:9092,host2:9092"
* @param topic topic name to produce to
* @param partition partition hint
*/
explicit KafkaProducer(const std::string& brokers, const std::string& topic, int partition);
/**
* @brief push Message to Kafka
* @param str, message data
* @param key, message key fed to the partitioner (load balancing)
*/
void pushMessage(const std::string& str, const std::string& key);
~KafkaProducer();
private:
std::string m_brokers; // Broker list, comma separated
std::string m_topicStr; // Topic name
int m_partition; // Partition hint
RdKafka::Conf* m_config; // Global (client-level) Kafka Conf object
RdKafka::Conf* m_topicConfig; // Topic-level Conf object
RdKafka::Topic* m_topic; // Topic handle
RdKafka::Producer* m_producer; // Producer handle
/* Any class whose name ends in Cb is a callback interface: derive from
it and implement the corresponding callback method. */
RdKafka::DeliveryReportCb* m_dr_cb;
RdKafka::EventCb* m_event_cb;
RdKafka::PartitionerCb* m_partitioner_cb;
};
#endif
KafkaProducer.cpp
#include "KafkaProducer.h"
// call back
// Delivery-report callback: called once for every message handed to
// produce(), after the broker acknowledges it or delivery fails for good.
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb
{
public:
    void dr_cb(RdKafka::Message &message)
    {
        if (message.err()) {
            std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
            return;
        }
        // e.g. "Message delivered to topic test [0] at offset 135000"
        std::cerr << "Message delivered to topic " << message.topic_name()
                  << " [" << message.partition() << "] at offset "
                  << message.offset() << std::endl;
    }
};
// Event callback: receives errors, JSON statistics, log lines and broker
// throttle notifications from librdkafka and prints them to stdout.
class ProducerEventCb : public RdKafka::EventCb
{
public:
    void event_cb(RdKafka::Event &event)
    {
        const RdKafka::Event::Type kind = event.type();
        if (kind == RdKafka::Event::EVENT_ERROR)
            std::cout << "RdKafka::Event::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
        else if (kind == RdKafka::Event::EVENT_STATS)
            std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl;
        else if (kind == RdKafka::Event::EVENT_LOG)
            std::cout << "RdKafka::Event::EVENT_LOG " << event.fac() << std::endl;
        else if (kind == RdKafka::Event::EVENT_THROTTLE)
            std::cout << "RdKafka::Event::EVENT_THROTTLE " << event.broker_name() << std::endl;
    }
};
// Custom partitioner: maps a message key to a partition id with a DJB2
// hash.  Registered on the topic conf via set("partitioner_cb", ...).
class HashPartitionerCb : public RdKafka::PartitionerCb
{
public:
    /**
     * @brief Choose the partition for a message.
     * @param topic         topic the message is produced to
     * @param key           optional message key; may be NULL
     * @param partition_cnt number of partitions of the topic (> 0)
     * @param msg_opaque    opaque pointer passed to produce(); unused here
     * @return partition id in [0, partition_cnt)
     */
    int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
                           int32_t partition_cnt, void *msg_opaque)
    {
        // BUGFIX: key may legally be NULL — hash an empty key instead of
        // dereferencing a null pointer.
        const char *key_data = key ? key->c_str() : "";
        size_t key_len = key ? key->size() : 0;
        int32_t partition_id = generate_hash(key_data, key_len) % partition_cnt;
        // [topic][key][partition_cnt][partition_id]
        // :[test][6419][2][1]
        char msg[128] = { 0 };
        // snprintf (not sprintf): a long topic name must not overflow msg[].
        snprintf(msg, sizeof(msg),
                 "HashPartitionerCb:topic:[%s], key:[%s]partition_cnt:[%d], partition_id:[%d]",
                 topic->name().c_str(), key_data, partition_cnt, partition_id);
        std::cout << msg << std::endl;
        return partition_id;
    }
private:
    // DJB2 string hash (hash = hash * 33 + c).
    static inline unsigned int generate_hash(const char *str, size_t len)
    {
        unsigned int hash = 5381;
        for (size_t i = 0; i < len; i++)
            hash = ((hash << 5) + hash) + str[i];
        return hash;
    }
};
/**
 * @brief Build and fully configure the producer.
 *
 * Creates the global and topic Conf objects, registers the delivery-report,
 * event and partitioner callbacks, sets client properties and finally
 * creates the Producer and Topic handles.  Failures are reported to stdout.
 *
 * @param brokers   comma-separated broker list ("host1:9092,host2:9092")
 * @param topic     topic name to produce to
 * @param partition partition hint stored for the caller's use
 */
KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& topic, int partition)
    : m_brokers(brokers)
    , m_topicStr(topic)
    , m_partition(partition)
    , m_config(NULL)        // BUGFIX: members start as NULL so failure paths
    , m_topicConfig(NULL)   // are detectable instead of indeterminate
    , m_topic(NULL)
    , m_producer(NULL)
    , m_dr_cb(NULL)
    , m_event_cb(NULL)
    , m_partitioner_cb(NULL)
{
    std::string errorStr;
    RdKafka::Conf::ConfResult errCode;

    /* Create the global (client-level) Conf object */
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (m_config == NULL)
        std::cout << "Create RdKafka Conf failed." << std::endl;
    /* Create the topic-level Conf object */
    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (m_topicConfig == NULL)
        std::cout << "Create RdKafka Topic Conf failed." << std::endl;

    /* BUGFIX: the original called m_config->set(...) even when create()
     * returned NULL; guard every dereference. */
    if (m_config != NULL)
    {
        // Register the delivery-report callback
        m_dr_cb = new ProducerDeliveryReportCb;
        errCode = m_config->set("dr_cb", m_dr_cb, errorStr);
        if (errCode != RdKafka::Conf::CONF_OK)
            std::cout << "Conf set failed:" << errorStr << std::endl;
        // Register the event callback (errors, stats, logs, throttle)
        m_event_cb = new ProducerEventCb;
        errCode = m_config->set("event_cb", m_event_cb, errorStr);
        if (errCode != RdKafka::Conf::CONF_OK)
            std::cout << "Conf set failed:" << errorStr << std::endl;
        // Emit a statistics event every 10 seconds
        errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
        if (errCode != RdKafka::Conf::CONF_OK)
            std::cout << "Conf set failed:" << errorStr << std::endl;
        // Allow messages up to ~10 MB
        errCode = m_config->set("message.max.bytes", "10240000", errorStr);
        if (errCode != RdKafka::Conf::CONF_OK)
            std::cout << "Conf set failed:" << errorStr << std::endl;
        // Comma-separated bootstrap broker list
        errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
        if (errCode != RdKafka::Conf::CONF_OK)
            std::cout << "Conf set failed:" << errorStr << std::endl;
    }
    if (m_topicConfig != NULL)
    {
        // Register the custom partitioner on the topic conf
        m_partitioner_cb = new HashPartitionerCb;
        errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
        if (errCode != RdKafka::Conf::CONF_OK)
            std::cout << "Conf set failed:" << errorStr << std::endl;
    }

    /* Create the Producer (create() copies the conf; we keep ownership) */
    if (m_config != NULL)
    {
        m_producer = RdKafka::Producer::create(m_config, errorStr);
        if (m_producer == NULL)
            std::cout << "Create Producer failed:" << errorStr << std::endl;
    }
    /* Create the Topic handle used by produce() */
    if (m_producer != NULL)
    {
        m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
        if (m_topic == NULL)
            std::cout << "Create Topic failed:" << errorStr << std::endl;
    }
}
// Drain the outbound queue, then tear everything down.  The deletion order
// below is deliberate; do not reorder it.
KafkaProducer::~KafkaProducer()
{
// flush() serves delivery-report callbacks while it waits (up to 5 s per
// attempt); loop until every queued message has been delivered or failed.
while (m_producer->outq_len() > 0)
{
std::cerr << "Waiting for " << m_producer->outq_len() << std::endl;
m_producer->flush(5000);
}
// Conf objects were copied by create(), so they can be deleted first.
delete m_config;
delete m_topicConfig;
// The Topic handle must be deleted before the Producer that owns it.
delete m_topic;
delete m_producer;
// The producer holds raw pointers to the callback objects; delete the
// callbacks only after the producer itself is gone.
delete m_dr_cb;
delete m_event_cb;
delete m_partitioner_cb;
}
/**
 * @brief Produce one message asynchronously.
 * @param str message payload
 * @param key message key, fed to the partitioner for load balancing
 */
void KafkaProducer::pushMessage(const std::string& str, const std::string& key)
{
    // BUGFIX: produce() takes the length as size_t — don't narrow to int32_t.
    size_t len = str.length();
    void* payload = const_cast<void*>(static_cast<const void*>(str.data()));
    // RK_MSG_COPY: librdkafka copies the payload, so `str` may be destroyed
    // as soon as this call returns.  PARTITION_UA delegates partition choice
    // to the configured partitioner_cb.
    RdKafka::ErrorCode errorCode = m_producer->produce(
            m_topic,
            RdKafka::Topic::PARTITION_UA,
            RdKafka::Producer::RK_MSG_COPY,
            payload,
            len,
            &key,
            NULL);
    // Serve pending delivery-report / event callbacks.
    m_producer->poll(0);
    if (errorCode != RdKafka::ERR_NO_ERROR)
    {
        std::cerr << "Produce failed: " << RdKafka::err2str(errorCode) << std::endl;
        if (errorCode == RdKafka::ERR__QUEUE_FULL)
        {
            // Queue full: poll so delivery reports can drain the queue.
            // NOTE(review): the failed message is NOT re-queued here; callers
            // that must not lose data should retry produce() themselves.
            m_producer->poll(100);
        }
    }
}
CMakeLists.txt
cmake_minimum_required(VERSION 3.5)
project(KafkaProducer CXX)

# Use the portable standard knobs; do not hard-code the compiler or inject
# -std flags by hand (the old combination duplicated the standard setting).
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_INCLUDE_CURRENT_DIR ON)

# librdkafka header and library locations
include_directories(/usr/local/include/librdkafka)
link_directories(/usr/local/lib)

aux_source_directory(. SOURCE)
add_executable(${PROJECT_NAME} ${SOURCE})
target_link_libraries(${PROJECT_NAME} rdkafka++)
测试文件main.cpp
#include <iostream>
#include "KafkaProducer.h"
using namespace std;
int main()
{
    // Producer bound to a single broker and the "test" topic.
    // KafkaProducer producer("127.0.0.1:9092,192.168.2.111:9092", "test", 0);
    KafkaProducer producer("127.0.0.1:9092", "test", 0);
    const int kMessageCount = 10000;
    for (int seq = 0; seq < kMessageCount; ++seq)
    {
        char msg[64] = {0};
        char key[8] = {0}; // key drives the partitioner (load balancing)
        sprintf(msg, "%s%4d", "Hello RdKafka ", seq);
        sprintf(key, "%d", seq);
        producer.pushMessage(msg, key);
    }
    // Wait for all rd_kafka_t handles to be destroyed before exiting.
    RdKafka::wait_destroyed(5000);
    return 0;
}
编译:
mkdir build
cd build
cmake ..
make
Kafka Producer使用流程:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。