当前位置:   article > 正文

kafka实现异步发送_Kafka producer端开发代码实例

kafka异步等待发送成功

一、producer工作流程

producer使用用户启动producer的线程,将待发送的消息封装到一个ProducerRecord类实例,然后将其序列化之后发送给partitioner,再由后者确定目标分区后一同发送到位于producer程序中的一块内存缓冲区中。而producer的另外一个线程(Sender线程)则负责实时从该缓冲区中提取出准备就绪的消息封装进一个批次(batch),统一发送给对应的broker,具体流程如下图:

b3a5f895529b77f48328cad35bf2e2ad.png

二、producer示例程序开发

首先引入kafka相关依赖,在pom.xml文件中加入如下依赖:

org.apache.kafka

kafka_2.12

2.2.0

在resources下面创建kafka-producer.properties配置文件,用于设置kafka参数,内容如下:

bootstrap.servers=192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094

key.serializer=org.apache.kafka.common.serialization.StringSerializer

value.serializer=org.apache.kafka.common.serialization.StringSerializer

acks=-1

retries=3

batch.size=323840

linger.ms=10

buffer.memory=33554432

max.block.ms=3000

其中,前三个参数必须明确指定,因为这三个参数没有默认值(注:kafka的producer参数配置可以参考http://kafka.apache.org/documentation/),然后编写producer发送消息的代码:

/**

* Kafka发送消息测试

* @throws IOException

*/

public void sendMsg() throws IOException {

//1.构造properties对象

Properties properties = new Properties();

FileInputStream fileInputStream = new FileInputStream("F:\\javaCode\\jvmdemo\\src\\main\\resources\\kafka-producer.properties");

properties.load(fileInputStream);

fileInputStream.close();

//2.构造kafkaProducer对象

KafkaProducer producer = new KafkaProducer(properties);

for (int i = 0; i < 100; i+

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/707530
推荐阅读
相关标签
  

闽ICP备14008679号