当前位置:   article > 正文

java异步发送kafka消息,【kafka】使用kafka原生API发送和消费信息

java kafka同时发送两条消息给不同的消费者怎么写

1. 在使用kafka之前,你需要弄清楚的一些概念

名词个人理解topic对发送到kafka中的message分类的一个标签,message1属于callLog类,message2属于runningLong类,callLog跟runningLog就是两个topic

partition消息存储分区的概念,同一个topic下的数据会分布在不同partition中,每个partition中使用segment文件存储数据,每个partition中的数据是有序的,但是不同的partiton之间数据是无序的,若想保证消费消息的顺序,partition的数量设置为1

offsetOffset记录着下一条将要发送给Consumer的消息的序号,当前partiton中存了3条消息,消息1和2 已经被消费过了,所以offset就是2

borker可以简单理解为kafka集群,一个broker就是单节点kafka,消息都是先发送到broker中的partiton存储,再被消费者消费

所以针对上面的表格,一张关系图说明下

一个topic中多条message分别均匀存储在broker不同的partition中,同时该topic下的消息可以被多个消费者组中的消费者同时消费

ec370debc74f63452cb507e3f1df95e4.png

下面,使用kafka原生的API进行消息的发送和消费,windows上如何安装kafka本文就不涉及了,可以自行百度。

2. 命令行启动kafka

kafka依赖zookeeper,现在下载的kafka中自带zookeeper,先启动zookeeper,再启动kafka-server。 windows下kafka相关的bat命令都在bin\windows里面

启动zookeeper命令 zookeeper启动默认端口 2181

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

9b28499c514e4dd78dcd8543f646b084.png

启动kafka-server命令 默认端口 9092

.\bin\windows\kafka-server-start.bat .\config\server.properties

b31f5cfdc69e64553c51b3a448d3f373.png

还需要创建一个topic,方便后面编码时使用该topic,创建topic时可以指定partition个数

创建topic的命令

.\bin\windows\kafka-topics.bat --create --zookeeper zookeeper的地址 --replication-factor 1 --partitions partitions的个数 --topic topic的名字

创建成功如下图

f88f637d53f1937d8a2e861afd39ad6a.png

准备工作都做好了,下面就是编码实现消息的发送和消费了。

3. idea使用原生API发送和消费消息

创建一个springboot项目,结构目录如下

5d7dc35e9201bc70d947c8cc30c30129.png

kafka发送消息有三种方式

同步发送消息,不关注返回结果(发出去后就不管了)

同步发送消息,关注返回结果

异步发送消息,由回调函数判断是否发送成功

下面我们就来一个个分析下到底怎么使用kafka

第一种方式:同步发送消息,不关注返回结果(发出去后就不管了)

MyProducer类

package com.study.kafkastudy.producer;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.springframework.stereotype.Component;

import java.util.Properties;

@Component

public class MyProducer {

private static KafkaProducer producer;

static{

//设置kafka启动需要的参数

Properties properties = new Properties();

//(kafka集群地址)broker地址

properties.put("bootstrap.servers","localhost:9092");

//key的序列化方式

properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

//value的序列化方式

properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<>(properties);

}

//发送消息不关注返回结果

public static void sendMessage(){

//ProducerRecord的三个参数,topic,发送的key,发送的value

ProducerRecord record =

new ProducerRecord<>("user-info-topic","name","路飞");

producer.send(record);

//关闭producer

producer.close();

}

public static void main(String[] args) {

sendMessage();

}

}

复制代码

MyConsumer类

MyConsumer类我写了一个while死循环,一直占用线程监听user-info-topic这个topic是否有新消息

package com.study.kafkastudy.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;

import java.util.Collections;

import java.util.Properties;

public class MyConsumer {

private static KafkaConsumer consumer;

private static Properties properties;

static {

//设置kafka启动需要的参数

properties = new Properties();

//(kafka集群地址)broker地址

properties.put("bootstrap.servers","localhost:9092");

//key的反序列化方式

properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

//value的反序列化方式

properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

//消费组的id,属于哪个消费组

properties.put("group.id","consumerGroup1");

}

public static void getMessage(){

//是否自动提交

properties.put("enable.auto.commit",true);

consumer = new KafkaConsumer<>(properties);

consumer.subscribe(Collections.singleton("user-info-topic"));

while (true){

ConsumerRecords records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord record : records) {

System.out.println(String.format("消费到的结果key:%s,value:%s,partition:%s",

record.key(),

record.value(),

record.partition()));

}

}

}

public static void main(String[] args) {

getMessage();

}

}

复制代码

先依次启动zookeeper,kafka-server,myconsumer,最后启动myproduce发送消息。我们发送“路飞”,观察消费者能否收到消息

f8a5b2be0a7e7e0a5a28976a18b80856.png

从图中可以看出已经可以消费到消息了

第二种方式:同步发送消息,关注返回结果

我们修改一下Myproducer类,在其中增加一个sendMessageWithCareResult方法,在main方法中调用sendMessageWithCareResult方法

public static void sendMessageWithCareResult() throws ExecutionException, InterruptedException {

//ProducerRecord的三个参数,topic,发送的key,发送的value

ProducerRecord record =

new ProducerRecord<>("user-info-topic","name","路飞");

//send的返回值是一个Future对象,可以通过get方法取出其中存储的发送消息的一些元信息RecordMetadata

Future sendResult = producer.send(record);

RecordMetadata recordMetadata = sendResult.get();

//打印下发送消息的topic,partition,offset

System.out.println(String.format("发送结果:topic:%s,存储的partition:%s,offset:%s",

recordMetadata.topic(),

recordMetadata.partition(),

recordMetadata.offset()));

producer.close();

}

复制代码

发送结果:

ff25a94b202a04b2dd4e09c0f89231d4.png

第三种方式:异步发送,使用回调函数监控发送结果

异步发送不用阻塞当前线程,所以速度上比同步要快很多

kafka要求异步发送消息需要在send时指定一个callBack回调对象,我们新建一个类去实现CallBack接口,实现其中的onCompletio方法,这个方法有两个参数,一个是发送消息的元信息,一个是发送出现的异常,有异常就说明发送有问题,我们再新建一个sendMessageCallBack方法

public static void sendMessageCallBack(){

ProducerRecord record =

new ProducerRecord<>("user-info-topic","name","路飞");

producer.send(record,new MyCallBack());

producer.close();

}

private static class MyCallBack implements Callback {

@Override

public void onCompletion(RecordMetadata metadata, Exception exception) {

System.out.println("进入异步发送回调函数");

if(exception != null){

System.out.println("出现异常:"+exception.getMessage());

}

System.out.println(String.format("发送结果:topic:%s,存储的partition:%s,offset:%s",

metadata.topic(),

metadata.partition(),

metadata.offset()));

}

}

复制代码

结果:

396cc7399999321b6e0ea943c0cbaeb3.png

现在使用kafka原生API发送消息基本都已经结束了,还有一些高级一点的用法像自定义消息的分区策略,我会抽时间更新在文章下面,大家敬请期待 2019/11/29 21:33 =============================分割线========================================

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/575271
推荐阅读
相关标签
  

闽ICP备14008679号