当前位置:   article > 正文

windows下kafka的环境配置及rdkafka库的应用_windows下安装的librdkafkacpp rdkafka -v

windows下安装的librdkafkacpp rdkafka -v

参考:
https://blog.csdn.net/woshixiazaizhe/article/details/80610432

1、下载并安装Win32OpenSSL-1_0_2s
2、从github上下载代码:
https://github.com/edenhill/librdkafka/tree/v0.11.6-RC4,解压并用vs2017打开win32/librdkafka.vcxproj工程文件
3、选中librdkafka工程,右键属性-Platform Toolset,改为Visual Stdio 2015(v140)
4、找到regexp.c文件,在开始部分添加代码:

#if _MSC_VER>=1900
#include "stdio.h" 
_ACRTIMP_ALT FILE* __cdecl __acrt_iob_func(unsigned);
#ifdef __cplusplus 
extern "C"
#endif 
FILE* __cdecl __iob_func(unsigned i) {
	return __acrt_iob_func(i);
}
#endif /* _MSC_VER>=1900 */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

增加legacy_stdio_definitions.lib
5、编译代码
6、选中librdkafkacpp工程后,照步骤4修改平台工具集为Visual Stdio 2015(v140)
7、编译librdkafkacpp代码,此时rdfkafka已经算编译成功了。
8、将rdkafka.h和rdkafkacpp.h移动到新建的inc目录,将librdkafka.lib,librdkafkacpp.lib移动到lib文件夹下,为下面的c++ kafka应用作准备。
9、新建rdkafka的producer工程,写producer相关代码:

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>

#include <getopt.h>

#include "rdkafkacpp.h"

static bool run = true;

static void sigterm(int sig) {
	run = false;
}

class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
	void dr_cb(RdKafka::Message &message) {
		std::cout << "Message delivery for (" << message.len() << " bytes): " <<
			message.errstr() << std::endl;
		if (message.key())
			std::cout << "Key: " << *(message.key()) << ";" << std::endl;
	}
};

class ExampleEventCb : public RdKafka::EventCb {
public:
	void event_cb(RdKafka::Event &event) {
		switch (event.type())
		{
		case RdKafka::Event::EVENT_ERROR:
			std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
				event.str() << std::endl;
			if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
				run = false;
			break;

		case RdKafka::Event::EVENT_STATS:
			std::cerr << "\"STATS\": " << event.str() << std::endl;
			break;

		case RdKafka::Event::EVENT_LOG:
			fprintf(stderr, "LOG-%i-%s: %s\n",
				event.severity(), event.fac().c_str(), event.str().c_str());
			break;

		default:
			std::cerr << "EVENT " << event.type() <<
				" (" << RdKafka::err2str(event.err()) << "): " <<
				event.str() << std::endl;
			break;
		}
	}
};

int main()
{
	std::string brokers = "localhost";
	std::string errstr;
	std::string topic_str = "test";
	int32_t partition = RdKafka::Topic::PARTITION_UA;

	RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
	RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

	conf->set("bootstrap.servers", brokers, errstr);

	ExampleEventCb ex_event_cb;
	conf->set("event_cb", &ex_event_cb, errstr);

	signal(SIGINT, sigterm);
	signal(SIGTERM, sigterm);

	ExampleDeliveryReportCb ex_dr_cb;
	conf->set("dr_cb", &ex_dr_cb, errstr);

	RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
	if (!producer) {
		std::cerr << "Failed to create producer: " << errstr << std::endl;
		exit(1);
	}
	std::cout << "% Created producer " << producer->name() << std::endl;

	RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,
		tconf, errstr);
	if (!topic) {
		std::cerr << "Failed to create topic: " << errstr << std::endl;
		exit(1);
	}

	for (std::string line; run && std::getline(std::cin, line);) {
		if (line.empty()) {
			producer->poll(0);
			continue;
		}

		RdKafka::ErrorCode resp =
			producer->produce(topic, partition,
				RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
				const_cast<char *>(line.c_str()), line.size(),
				NULL, NULL);
		if (resp != RdKafka::ERR_NO_ERROR)
			std::cerr << "% Produce failed: " <<
			RdKafka::err2str(resp) << std::endl;
		else
			std::cerr << "% Produced message (" << line.size() << " bytes)" <<
			std::endl;

		producer->poll(0);
	}

	run = true;
	// 退出前处理完输出队列中的消息
	while (run && producer->outq_len() > 0) {
		std::cerr << "Waiting for " << producer->outq_len() << std::endl;
		producer->poll(1000);
	}

	delete conf;
	delete tconf;
	delete topic;
	delete producer;

	RdKafka::wait_destroyed(5000);

	return 0;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130

完成后将步骤6的inc和lib目录添加进工程配置里面,然后编译代码。(如果报找不到librdkafka.dll或者librdkafkacpp.dll,就将对应的dll拷贝到Debug\Release的生成文件目录中。)
8、下载jdk,配置java环境变量JAVA_HOME及PATH
9、下载zookeeper-3.4.14.tar,解压后复制zoo_sample.cfg重命名成zoo.cfg,修改编辑zoo.cfg,修改dataDir为【dataDir=/data】
10、添加环境变量

   ZOOKEEPER_HOME          D:\Program Files\zookeeper-3.5.2-alpha
   Path 在现有的值后面添加     ;%ZOOKEEPER_HOME%\bin;
  • 1
  • 2

11、运行zookeeper:zkserver
12、下载kafka_2.12-2.3.0,解压后打开目录D:\kafka_2.12-0.11.0.0\config下server.properties文件,把log.dirs修改为【log.dirs=D:\kafka-logs】
12、运行kafka集群环境:.\bin\windows\kafka-server-start.bat .\config\server.properties
13、运行kafka的producer:.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
14、运行kafka的customer:.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
15、在producer的cmd里面写消息内容,可以看到customer的cmd接收到了消息。
16、启动步骤9的rdkafka的producer程序(程序中端口设置为9092,ip设置为licalhost,topic设置为test),在启动窗口发送消息,可以看到customer的cmd里面接收到了消息。

windows下kafka安装及c++实现kafka的源码地址:
https://download.csdn.net/download/qq_23350817/11715765

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/976384
推荐阅读
相关标签
  

闽ICP备14008679号