赞
踩
Kafka
是一款分布式流处理平台,它被设计用于高吞吐量、持久性、分布式的数据流处理。
Kafka 简介:
发布
订阅
的消息系统。Kafka 应用场景:
Kafka 在大规模数据流处理和实时数据传输场景中发挥着重要作用,其发布订阅模型、分区和副本机制以及异步消息传递的特性使其成为分布式系统中的重要组件。
当Docker部署Kafka集群时,需要确保安装了ZooKeeper
,因为Kafka依赖于ZooKeeper来实现集群协调与管理。ZooKeeper是一个开源的分布式协调服务,用于维护集群的状态信息、进行领导者选举以及协调分布式应用程序的工作。Kafka利用ZooKeeper来管理集群中的节点、配置信息和分区分配等关键任务,确保集群的稳定运行和可靠性。
先引入依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
public class Producer { public static void main(String[] args) { // 设置Kafka生产者的配置 Properties props = new Properties(); // Kafka集群的地址 props.put("bootstrap.servers", "192.168.13.133:9092,192.168.13.133:9093,192.168.13.133:9094"); // 确认模式:全部副本确认 props.put("acks", "all"); props.put("retries", 2); // 键的序列化器 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 创建Kafka生产者实例 org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(props); // 发送10条消息到主题 for (int i = 0; i < 10; i++) { // send异步发送 ProducerRecord参数: 注意 key value【消息是键值对形式】 producer.send(new ProducerRecord<String, String>("hac", Integer.toString(i), Integer.toString(i))); } // 关闭生产者实例 producer.close(); } }
public class Consumer { public static void main(String[] args) { // 创建消费者配置 Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.13.133:9092,192.168.13.133:9093,192.168.13.133:9094"); // 消费者主 props.setProperty("group.id", "groupId1"); // 消费者组ID // 是否开启自动提交偏移量 props.setProperty("enable.auto.commit", "true"); // 自动提交偏移量的间隔时间 props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 创建Kafka消费者实例 consumer.subscribe(Arrays.asList("hac"));// 订阅主题 可以订阅多个主题 while (true) { // 从服务器拉取消息记录 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // 遍历接收到的消息记录 for (ConsumerRecord<String, String> record : records) { // 输出消息的偏移量、键和值 System.out.println("接受到的消息: " + record.key() + ":" + record.value()); } } } }
在启动Kafka消费者之前,需要确保消费者能够连接到可用的Kafka集群
,并正确地订阅了所需的主题
。一旦消费者启动并成功订阅了主题,它将持续监听并处理来自Kafka集群的消息。在此期间,消费者将与集群保持连接,并持续从指定的主题中拉取消息进行处理。当生产者向所订阅的主题发送新消息时,消费者将立即收到这些消息,并进行相应的处理。
KafkaTemplate
和@KafkaListener
是Spring Kafka提供的两个核心组件,用于简化在Spring应用程序中与Apache Kafka集成的过程。
第一步:引入依赖
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
第二步:配置application.yml文件
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 192.168.13.133:9092,192.168.13.133:9093,192.168.13.133:9094
producer:
retries: 3
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: 1
consumer:
group-id: groupId1
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
第三步:使用
KafkaTemplate
:KafkaTemplate是Spring Kafka提供的一个工具类,用于简化向Kafka发送消息的过程。通过KafkaTemplate,可以方便地将消息发送到指定的Kafka主题。它封装了Kafka的Producer API,提供了一系列发送消息的方法,包括同步发送、异步发送、带回调函数的发送等。使用KafkaTemplate,你可以在Spring应用程序中轻松地发送消息到Kafka集群中。
@KafkaListener
:@KafkaListener注解用于标记一个方法,表示这个方法是一个Kafka消息监听器。通过在方法上使用@KafkaListener注解,可以让Spring容器自动创建Kafka消息监听器并订阅指定的主题,当有消息到达时,自动调用标记了@KafkaListener注解的方法进行消息处理。
生产者:
@RestController @RequestMapping(value = "/kafka") public class SendController { @Autowired private KafkaTemplate kafkaTemplate; @GetMapping(value = "/send") public String send() { String msg = "hello"; //这里写固定的测试一下 String topic = "hac"; kafkaTemplate.send(topic, msg); return "OK"; } }
消费者:
@Component
public class KafkaListenerMessage {
/***
* 监听新消息
*/
@KafkaListener(topics = "hac", groupId = "groupId1")
public void listener(ConsumerRecord<String, String> record) {
String value = record.value();
int partition = record.partition();
long offset = record.offset();
System.out.println("value:" + value + ",partition:" + partition + ",offset:" + offset);
}
}
效果:
❤觉得有用的可以留个关注❤
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。