赞
踩
Kafka传 统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。
发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
Kafka最新定义 : Kafka是 一个开源的 分 布式事件流平台 (Event StreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
消息队列 在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。
传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信。
缓冲/消峰:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。
• 消费者主动拉取数据,消息收到后清除消息
可以有多个topic主题(浏览、点赞、收藏、评论等)
消费者消费数据之后,不删除数据
每个消费者相互独立,都可以消费到数据
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内的消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
一台 Kafka 服务器就是一个 broker。
一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader (主)和若干个Follower(从主发生故障时变为主)。
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
–topic <String: topic> | 操作的 topic 名称。 |
–create | 创建主题。 |
–delete | 删除主题。 |
–alter | 修改主题。 |
–list | 查看所有主题。 |
–describe | 查看主题详细描述。 |
–partitions <Integer: # of partitions> | 设置分区数。 |
–replication-factor<Integer: replication factor> | 设置分区副本。 |
–config <String: name=value> | 更新系统默认的配置。 |
bin/kafka-topics.sh
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor
3 --topic first
–topic 定义 topic 名
–replication-factor 定义副本数
–partitions 定义分区数
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
修改分区数(注意:分区数只能增加,不能减少)
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
删除 topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
bin/kafka-console-producer.sh
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
–topic <String: topic> | 操作的 topic 名称。 |
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
–topic <String: topic> | 操作的 topic 名称。 |
–from-beginning | 从头开始消费。 |
–group <String: consumer group id> | 指定消费者组名称。 |
消费消息
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
package com.example.kafka2; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; import java.util.Properties; /** * @Author: lx * @CreateTime: 2023-03-06 11:40 * @Description: TODO */ public class ProducerConsumer { public static void main(String[] args) { Properties properties = new Properties(); // 设置 bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "121.41.90.173:9092"); // 设置 序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 1.创建kafka生产者对象 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); // 2.发送数据 for (int i = 0; i < 5; i++) { producer.send(new ProducerRecord<String, String>("second", "my-key", "my-value")); } // 3.关闭资源 producer.close(); } /** * @description: 异步有回调函数 * @author: lmk * @date: 2023/3/6 13:32 * @param: [] * @return: void **/ @Test public void test() { Properties properties = new Properties(); // 设置 bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "121.41.90.173:9092"); // 设置 序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 根据传递的value 使用自定义分区 // properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.kafka2.MyPartitioner"); // 1.创建kafka生产者对象 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); // 2.发送数据 for (int i = 0; i < 5; i++) { // //同步发送 producer.send().get(); producer.send(new ProducerRecord<String, String>("first", "my-key", "my-value"), new Callback() { /** * @date: 2023/3/6 13:34 * @param: [recordMetadata 元数据信息, e 为 null,说明消息发送成功] * @return: void **/ public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { System.out.println("success"); }else { System.out.println("failure"+e); } //first 0:元数据信息 System.out.println(recordMetadata.topic()+" "+recordMetadata.partition()+":元数据信息"); } }); } // 3.关闭资源 producer.close(); } }
消息发送失败会自动重试,不需要我们在回调函数中手动重试。
好处
便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
提高并行度
生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
生产者发送消息的分区策略
kafkaProducer.send(new ProducerRecord<>("first", 1,"","atguigu " + i), new Callback() {
producer.send(new ProducerRecord<String, String>("first", "my-key", "my-value"),
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.lx.kafka.producer.MyPartitioner");
package com.example.kafka2; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * @Author: lx * @CreateTime: 2023-03-06 13:58 * @Description: TODO 自定义kafka分区 使其发送到指定分区 */ public class MyPartitioner implements Partitioner { /** * @description: * @author: lx * @date: 2023/3/6 13:59 * @param: [s topic, o key, keybytes, o1 value, value bytes1, cluster] * @return: int 返回几号分区 **/ @Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { //获取数据 String value = o.toString(); if (value.contains("lx_yyds")) { return 0; }else if(value.contains("lx_123")){ return 1; }else { return 2; } } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }
• batch.size:批次大小,默认16k
• linger.ms:等待时间,修改为5-100ms
• compression.type:压缩snappy
• RecordAccumulator:缓冲区大小,修改为64m
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// batch.size:批次大小,默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待时间,默认 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
ack 应答
讨论 leader挂 follower挂(follow一直同步不上)
分区副本 即leader和follow 所以至少有俩副本
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数 retries,默认是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2,保证数据不丢失
最多一次(At Most Once)= ACK级别设置为0 保证数据不重复
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条。
保证的是在单分区单会话内不重复
PID是Kafka每次重启都会分配一个新的且自增
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
使用幂等性
开启参数 enable.idempotence 默认为 true,false 关闭
开启事务,必须开启幂等性。
package com.example.kafka2; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; /** * @Author: lx * @CreateTime: 2023-03-28 15:56 * @Description: TODO */ public class CustomProducerTransactions { public static void main(String[] args) throws InterruptedException { // 1. 创建 kafka 生产者的配置对象 Properties properties = new Properties(); // 2. 给 kafka 配置对象添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value 序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置事务 id(必须),事务 id 任意起名 properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0"); // 3. 创建 kafka 生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 初始化事务 kafkaProducer.initTransactions(); // 开启事务 kafkaProducer.beginTransaction(); try { // 4. 调用 send 方法,发送消息 for (int i = 0; i < 5; i++) { // 发送消息 kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i)); } // int i = 1 / 0; // 提交事务 kafkaProducer.commitTransaction(); } catch (Exception e) { // 终止事务 kafkaProducer.abortTransaction(); } finally { // 5. 关闭资源 kafkaProducer.close(); } } }
消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
• 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
• 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
即消费者组是逻辑上的一个订阅者
在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。
参数名称 | 描述 |
---|---|
bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host/port 列表。 |
key.deserializer 和value.deserializer | 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。 |
group.id | 标记消费者所属的消费者组。 |
enable.auto.commit | 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 |
auto.offset.reset | 没有初始偏移量时 earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | __consumer_offsets 的分区数,默认是 50 个分区。 |
heartbeat.interval.ms | Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。 |
fetch.max.wait.ms | 默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。 |
fetch.max.bytes | 默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。 |
package com.example.kafka2; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties; /** * @Author: lx * @CreateTime: 2023-03-06 15:50 * @Description: TODO */ public class CustomerConsumer { public static void main(String[] args) { Properties properties = new Properties(); // 连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "121.41.90.173:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); // 配置消费者 组id properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); // 1.创建消费者 KafkaConsumer<String, String> co = new KafkaConsumer<>(properties); // 2.订阅主题 List<String> topics =new ArrayList<>(); // 定义指定分区 // List<TopicPartition> topics =new ArrayList<>(); // topics.add(new TopicPartition("second",1)); // co.assign(topics); topics.add("first"); co.subscribe(topics); // 3.消费数据 // while(true){ // 1秒钟拉取的数据 ConsumerRecords<String, String> poll = co.poll(Duration.ofSeconds(1000)); poll.forEach(System.out::println); // } // } }
// 消费某个主题的某个分区数据
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0));
kafkaConsumer.assign(topicPartitions);
一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据?
Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。
默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。
假如现在有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。
partitions/consumer 有余数前几个消费者会多消费。
注:如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有 N 多个 topic。
分区数可以增加,但是不能减少。
原理
当一N个消费者挂掉 则会被踢出且其数据会平均分配到其它消费者里面
原理
先是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
当一N个消费者挂掉,则会重新按照RoundRobin策略进行消费
在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);
N个消费者的任务会按照粘性规则,尽可能均衡的随机分成N分给其它消费者消费。
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。
enable.auto.commit:是否开启自动提交offset功能,默认是true
auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
不建议使用。
手动提交
• commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
• commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
// 同步提交 offset
consumer.commitSync();
// 异步提交 offset
consumer.commitAsync();
异步方式使用较多。
(1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
Set<TopicPartition> assignment= new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
// 遍历所有分区,并指定 offset 从 1700 的位置开始消费
for (TopicPartition tp: assignment) {
kafkaConsumer.seek(tp, 1700);
}
Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0) { kafkaConsumer.poll(Duration.ofSeconds(1)); // 获取消费者分区分配信息(有了分区分配信息才能开始消费) assignment = kafkaConsumer.assignment(); } HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>(); // 封装集合存储,每个分区对应一天前的数据 for (TopicPartition topicPartition : assignment) { timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000); } // 获取从 1 天前开始消费的每个分区的 offset Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch); // 遍历每个分区,对每个分区设置消费时间。 for (TopicPartition topicPartition : assignment) { OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition); // 根据时间指定开始消费的位置 if (offsetAndTimestamp != null){ kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
重复消费:已经消费了数据,但是 offset 没提交。 即消费了时间没到还没提交就挂了。
漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。
fetch.max.bytes 默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值
(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条
SpringBoot可以用于 Kafka 的生产者,也可以用于 SpringBoot 的消费者。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>top.remained</groupId> <artifactId>kafka3</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafka3</name> <description>kafka3</description> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <spring-boot.version>2.6.13</spring-boot.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <repositories> <repository> <id>aliyun-releases</id> <url>https://maven.aliyun.com/repository/public</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
# 应用服务 WEB 访问端口 server.port=8080 # 应用名称 spring.application.name=springboot_kafka #springboot 生产者 # 指定 kafka 的地址 spring.kafka.bootstrap-servers=127.0.0.1:9092 #指定 key 和 value 的序列化器 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 指定 key 和 value 的反序列化器 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #指定消费者组的 group_id spring.kafka.consumer.group-id=group
从浏览器端接收到数据并发送给消费者
@RestController
public class ProducerController {
@Autowired
KafkaTemplate<String,String> kafkaTemplate;
// 从浏览器中获取数据并发送到消费者
@RequestMapping("/producer")
public String getProducer(String producer) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<String, String>> first = kafkaTemplate.send("first", producer);
System.out.println(first.get());
return "OK";
}
}
@Configuration
public class KafkaConsumer {
@KafkaListener(topics = "first")
public void consumeTopic(String msg){
System.out.println(msg);
}
}
说明:
本文参考尚硅谷的文档,有兴趣的可以去观看(挺不错的)。
随谈
管他那么多干嘛,上天安排的最大。
最近明白了一个问题,特爱走小路的我发现走小路有一个极端问题,不仅路不好走而且小路很多时候就会走着走着没路了。有大路时,别人劝你走大路时,千万要““猥琐发育,别浪””,因为你走小路想回头却已迷失了方向。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。