赞
踩
一、producer工作流程
producer使用用户启动producer的线程,将待发送的消息封装到一个ProducerRecord类实例,然后将其序列化之后发送给partitioner,再由后者确定目标分区后一同发送到位于producer程序中的一块内存缓冲区中。而producer的另外一个线程(Sender线程)则负责实时从该缓冲区中提取出准备就绪的消息封装进一个批次(batch),统一发送给对应的broker,具体流程如下图:
二、producer示例程序开发
首先引入kafka相关依赖,在pom.xml文件中加入如下依赖:
org.apache.kafka
kafka_2.12
2.2.0
在resources下面创建kafka-producer.properties配置文件,用于设置kafka参数,内容如下:
bootstrap.servers=192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=-1
retries=3
batch.size=323840
linger.ms=10
buffer.memory=33554432
max.block.ms=3000
其中,前三个参数必须明确指定,因为这三个参数没有默认值(注:kafka的producer参数配置可以参考http://kafka.apache.org/documentation/),然后编写producer发送消息的代码:
/**
* Kafka发送消息测试
* @throws IOException
*/
public void sendMsg() throws IOException {
//1.构造properties对象
Properties properties = new Properties();
FileInputStream fileInputStream = new FileInputStream("F:\\javaCode\\jvmdemo\\src\\main\\resources\\kafka-producer.properties");
properties.load(fileInputStream);
fileInputStream.close();
//2.构造kafkaProducer对象
KafkaProducer producer = new KafkaProducer(properties);
for (int i = 0; i < 100; i+
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。