赞
踩
Partition:为了实现扩展性,提高并发能力,一个非常大的 Topic 可以分布到多个 Broker(即服务器)上,一个 Topic 可以分为多个 Partition,同一个 topic 在不同的分区的数据是不重复的,每个 Partition 是一个有序的队列,其表现形式就是一个一个的文件夹。
Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为 Leader。在 kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量,follower 和 leader 绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Leader:每个分区多个副本的“主”副本,生产者发送数据的对象,以及消费者消费数据的对象,都是 Leader。
Follower:每个分区多个副本的“从”副本,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。
以下展示 kafka 中,一个主题 TopicA 的三个 partition 在不同的 broker 中的分布,以及生产者和消费者是如何分配的。
KafkaConsumer.h
- #ifndef KAFKACONSUMER_H
- #define KAFKACONSUMER_H
-
- #include <string>
- #include <iostream>
- #include <vector>
- #include <stdio.h>
-
- #include "rdkafka.h"
-
- using namespace std;
-
- class KafkaConsumer
- {
- public:/**
- * @brief KafkaConsumer
- * @param brokers
- * @param groupID
- * @param topics
- * @param partition
- */
- KafkaConsumer();
- virtual ~KafkaConsumer();
-
- int InitCfg(const string& strBrokers, const string& strGroupID, const vector<string>& vTopics);
-
-
- void pullMessage();
-
- static void RebalanceCb(rd_kafka_t *rk,
- rd_kafka_resp_err_t err,
- rd_kafka_topic_partition_list_t *partitions,
- void *opaque);
-
- static void log_cb(const rd_kafka_t *rk, int level,
- const char *fac, const char *buf);
-
- static void EventErrorCb(rd_kafka_t *rk, int err,
- const char *reason,
- void *opaque);
-
- static int EventStatsCb(rd_kafka_t *rk,
- char *json,
- size_t json_len,
- void *opaque);
-
- protected:
- string m_strBrokers;
- string m_groupID;
- vector<string> m_topicVector;
- int m_partition;
-
- rd_kafka_t *m_pkafka;
- };
-
- #endif // KAFKACONSUMER_H

KafkaConsumer.cpp
- #include "KafkaConsumer.h"
-
- KafkaConsumer::KafkaConsumer()
- : m_pkafka(nullptr)
- {
- }
-
- int KafkaConsumer::InitCfg(const string& strBrokers, const string& strGroupID, const vector<string>& vTopics)
- {
- m_strBrokers = strBrokers;
- m_groupID = strGroupID;
- m_topicVector = vTopics;
-
- rd_kafka_conf_t *pConf = rd_kafka_conf_new();
- if(!pConf)
- {
- return -1;
- }
-
- char szErr[512] = { 0};
- if (rd_kafka_conf_set(pConf, "bootstrap.servers", m_strBrokers.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%s\n", szErr);
- rd_kafka_conf_destroy(pConf);
- return -1;
- }
-
- if (rd_kafka_conf_set(pConf, "group.id", m_groupID.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%s\n", szErr);
- rd_kafka_conf_destroy(pConf);
- return -1;
- }
-
- if (rd_kafka_conf_set(pConf, "statistics.interval.ms", "10000", szErr, sizeof(szErr))!= RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%s\n", szErr);
- rd_kafka_conf_destroy(pConf);
- return -1;
- }
- // rd_kafka_conf_set_stats_cb(pConf, &KafkaConsumer::EventStatsCb);
- //
- // rd_kafka_conf_set_error_cb(pConf, &KafkaConsumer::EventErrorCb);
- // rd_kafka_conf_set_log_cb(pConf, &KafkaConsumer::log_cb);
-
- // rd_kafka_conf_set_rebalance_cb(pConf, &KafkaConsumer::RebalanceCb);
-
- // topic配置
- rd_kafka_topic_conf_t* pTopicConf = rd_kafka_topic_conf_new();
- if (rd_kafka_topic_conf_set(pTopicConf, "auto.offset.reset", "earliest", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%s\n", szErr);
- rd_kafka_topic_conf_destroy(pTopicConf);
- return -1;
- }
-
- // rd_kafka_conf_set_default_topic_conf(pConf, pTopicConf);
-
- m_pkafka = rd_kafka_new(RD_KAFKA_CONSUMER, pConf, szErr, sizeof(szErr));
- if (!m_pkafka) {
- fprintf(stderr, "%% Failed to create new consumer: %s\n", szErr);
- return -1;
- }
-
- rd_kafka_poll_set_consumer(m_pkafka);
-
- /* Convert the list of topics to a format suitable for librdkafka */
- rd_kafka_topic_partition_list_t *subscription = rd_kafka_topic_partition_list_new(m_topicVector.size());
- for (int i = 0 ; i < m_topicVector.size() ; i++)
- {
- rd_kafka_topic_partition_list_add(subscription, m_topicVector[i].c_str(), RD_KAFKA_PARTITION_UA);
- }
-
- /* Subscribe to the list of topics */
- rd_kafka_resp_err_t err = rd_kafka_subscribe(m_pkafka, subscription);
- if (err) {
- fprintf(stderr, "%% Failed to subscribe to %d topics: %s\n", subscription->cnt, rd_kafka_err2str(err));
- rd_kafka_topic_partition_list_destroy(subscription);
- rd_kafka_destroy(m_pkafka);
- return -1;
- }
-
- fprintf(stderr, "%% Subscribed to %d topic(s), " "waiting for rebalance and messages...\n", subscription->cnt);
- rd_kafka_topic_partition_list_destroy(subscription);
-
- return 0;
- }
-
- void KafkaConsumer::pullMessage()
- {
- rd_kafka_message_t *rkm;
- rkm = rd_kafka_consumer_poll(m_pkafka, 2000);
- if (!rkm)
- {
- return ; /* Timeout: no message within 100ms,
- * try again. This short timeout allows
- * checking for `run` at frequent intervals.
- */
- }
- /* consumer_poll() will return either a proper message
- * or a consumer error (rkm->err is set). */
- if (rkm->err) {
- /* Consumer errors are generally to be considered
- * informational as the consumer will automatically
- * try to recover from all types of errors. */
- fprintf(stderr,
- "%% Consumer error: %s\n",
- rd_kafka_message_errstr(rkm));
- rd_kafka_message_destroy(rkm);
- return;
- }
-
- /* Print the message value/payload. */
- if (rkm->payload)
- {
- printf(" Value: %.*s\n", (int)rkm->len, (const char *)rkm->payload);
- }
- rd_kafka_message_destroy(rkm);
- }
-
- void KafkaConsumer::RebalanceCb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque)
- {
- if(partitions)
- {
- for (int i=0; i<partitions->cnt; i++)
- {
- partitions->elems[i].offset = 0;
- }
- }
-
- rd_kafka_assign(rk, partitions);
- }
-
- #include <iostream>
- #include <stdio.h>
-
- void KafkaConsumer::log_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
- {
- cout<< "< EventErrorCb >" << level << " " << fac << " " << buf <<endl;
- }
-
- void KafkaConsumer::EventErrorCb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
- {
- cout<< "< EventErrorCb >" << err << " " << reason <<endl;
- }
-
- int KafkaConsumer::EventStatsCb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
- {
- cout<< "< EventStatsCb >" << json <<endl;
- return 0;
- }
-
- KafkaConsumer::~KafkaConsumer()
- {
- rd_kafka_consumer_close(m_pkafka);
- /* Destroy the consumer */
- rd_kafka_destroy(m_pkafka);
- m_pkafka = nullptr;
- }

- #include "KafkaConsumer.h"
-
- KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupID,
- const std::vector<std::string>& topics, int partition)
- {
- m_brokers = brokers;
- m_groupID = groupID;
- m_topicVector = topics;
- m_partition = partition;
-
- std::string errorStr;
- RdKafka::Conf::ConfResult errorCode;
- m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
-
- m_event_cb = new ConsumerEventCb;
- errorCode = m_config->set("event_cb", m_event_cb, errorStr);
- if(errorCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed: " << errorStr << std::endl;
- }
-
- m_rebalance_cb = new ConsumerRebalanceCb;
- errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
- if(errorCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed: " << errorStr << std::endl;
- }
-
- errorCode = m_config->set("enable.partition.eof", "false", errorStr);
- if(errorCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed: " << errorStr << std::endl;
- }
-
- errorCode = m_config->set("group.id", m_groupID, errorStr);
- if(errorCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed: " << errorStr << std::endl;
- }
- errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
- if(errorCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed: " << errorStr << std::endl;
- }
- errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
- if(errorCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed: " << errorStr << std::endl;
- }
-
- // partition.assignment.strategy range,roundrobin
-
- m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
- // 获取最新的消息数据
- errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
- if(errorCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Topic Conf set failed: " << errorStr << std::endl;
- }
- errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
- if(errorCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed: " << errorStr << std::endl;
- }
- m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
- if(m_consumer == NULL)
- {
- std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
- }
- std::cout << "Created consumer " << m_consumer->name() << std::endl;
- }
-
- void msg_consume(RdKafka::Message* msg, void* opaque)
- {
- switch (msg->err())
- {
- case RdKafka::ERR__TIMED_OUT:
- std::cerr << "Consumer error: " << msg->errstr() << std::endl;
- break;
- case RdKafka::ERR_NO_ERROR: // 有消息进来
- std::cout << " Message in " << msg->topic_name() << " ["
- << msg->partition() << "] at offset " << msg->offset()
- << " key: " << msg->key() << " payload: "
- << (char*)msg->payload() << std::endl;
- break;
- default:
- std::cerr << "Consumer error: " << msg->errstr() << std::endl;
- break;
- }
- }
-
- void KafkaConsumer::pullMessage()
- {
- // 订阅Topic
- RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
- if (errorCode != RdKafka::ERR_NO_ERROR)
- {
- std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
- }
- // 消费消息
- while(true)
- {
- RdKafka::Message *msg = m_consumer->consume(1000);
- msg_consume(msg, NULL);
- delete msg;
- }
- }
-
- KafkaConsumer::~KafkaConsumer()
- {
- m_consumer->close();
- delete m_config;
- delete m_topicConfig;
- delete m_consumer;
- delete m_event_cb;
- delete m_rebalance_cb;
-
- }

- #include "KafkaProducer.h"
-
- KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& topic, int partition)
- {
- m_brokers = brokers;
- m_topicStr = topic;
- m_partition = partition;
- // 创建Kafka Conf对象
- m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
- if(m_config == NULL)
- {
- std::cout << "Create RdKafka Conf failed." << std::endl;
- }
- // 创建Topic Conf对象
- m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
- if(m_topicConfig == NULL)
- {
- std::cout << "Create RdKafka Topic Conf failed." << std::endl;
- }
- // 设置Broker属性
- RdKafka::Conf::ConfResult errCode;
- m_dr_cb = new ProducerDeliveryReportCb;
- std::string errorStr;
- errCode = m_config->set("dr_cb", m_dr_cb, errorStr);
- if(errCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed:" << errorStr << std::endl;
- }
- m_event_cb = new ProducerEventCb;
- errCode = m_config->set("event_cb", m_event_cb, errorStr);
- if(errCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed:" << errorStr << std::endl;
- }
-
- m_partitioner_cb = new HashPartitionerCb;
- errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);
- if(errCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed:" << errorStr << std::endl;
- }
-
- errCode = m_config->set("statistics.interval.ms", "10000", errorStr);
- if(errCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed:" << errorStr << std::endl;
- }
-
- errCode = m_config->set("message.max.bytes", "10240000", errorStr);
- if(errCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed:" << errorStr << std::endl;
- }
- errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
- if(errCode != RdKafka::Conf::CONF_OK)
- {
- std::cout << "Conf set failed:" << errorStr << std::endl;
- }
-
- // request.required.acks 0=Broker does not send any response/ack to client,
- // -1 or all=Broker will block until message is committed by all in sync replicas (ISRs)
-
- // 创建Producer
- m_producer = RdKafka::Producer::create(m_config, errorStr);
- if(m_producer == NULL)
- {
- std::cout << "Create Producer failed:" << errorStr << std::endl;
- }
- // 创建Topic对象
- m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);
- if(m_topic == NULL)
- {
- std::cout << "Create Topic failed:" << errorStr << std::endl;
- }
- }
-
- void KafkaProducer::pushMessage(const std::string& str, const std::string& key)
- {
- int32_t len = str.length();
- void* payload = const_cast<void*>(static_cast<const void*>(str.data()));
- RdKafka::ErrorCode errorCode = m_producer->produce(m_topic, RdKafka::Topic::PARTITION_UA,
- RdKafka::Producer::RK_MSG_COPY,
- payload, len, &key, NULL);
- m_producer->poll(0);
- if (errorCode != RdKafka::ERR_NO_ERROR)
- {
- std::cerr << "Produce failed: " << RdKafka::err2str(errorCode) << std::endl;
- if(errorCode == RdKafka::ERR__QUEUE_FULL)
- {
- m_producer->poll(100);
- }
- }
- }
-
- KafkaProducer::~KafkaProducer()
- {
- while (m_producer->outq_len() > 0)
- {
- std::cerr << "Waiting for " << m_producer->outq_len() << std::endl;
- m_producer->flush(5000);
- }
- delete m_config;
- delete m_topicConfig;
- delete m_topic;
- delete m_producer;
- delete m_dr_cb;
- delete m_event_cb;
- delete m_partitioner_cb;
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。