赞
踩
前言:本章分为KafKa重点概述、Java下的KafKa、SpringBoot整合KafKa三大块。
后期还会加内容。
轮询策略:就是按照顺序轮流分配到每个分区中
随机策略:每次随机进行分配
按key保存策略:指定key,计算这个key的hash值,按照hash值对不同信息进行分配
其中在追随者中副本中又分为ISR和普通副本
小结:因为ISR为同步,普通为异步。
所以在数据高一致性上选择ISR,在高可用上选择普通。
在配置中有几个重要常用的配置。
--发送消息时可指定同步和异步 推荐异步
--重传机制
--ack确认机制
--压缩发送
--消息有序性
--提交偏移量
<!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency>
在配置KafKa中,在ProducerConfig中有很多静态常量,根据自己的要求,可以对Properties进行设置。
1.生产者类
这里介绍最简单的发送消息设置,总共四个步骤
1.配置Kafka信息
2.创建生产者对象并封装要发送的数据
3.发送消息
4.关闭生产通道
- // 1.kafka的配置信息
- Properties properties = new Properties();
- //kafka的连接地址
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"替换成你kafka的ip地址:9092");
- //消息key的序列化器
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
- //消息value的序列化器
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
-
- // 2.创建kafka生产者对象
- KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);
- // 封装发送的消息
- ProducerRecord<String, String> record = new ProducerRecord<>("dalang-topic", "10001", "hello kangan");
-
- // 3. 发送消息
- kafkaProducer.send(record);
-
- // 4.关闭消息通道
- kafkaProducer.close();
一些比较常用的生产者配置
-同步和异步发送消息 在send加个参数new rollback(){}
-重传机制 常量为:ProducerConfig.RETRIES_CONFIG
-ack确认机制 常量为:ProducerConfig.ACKS_CONFIG
-压缩发送 常量为:ProducerConfig.COMPRESSION_TYPE_CONFIG
2.消费者类
这里介绍最简单的消费消息设置,总共四个步骤
1.配置KafKa
2.创建生产者对象
3.订阅要拉取的主题
4.设置监听并消费
- // 1. 配置kafka
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "替换成你Kafka所在的ip地址:9092");
-
- // 2.生产者对象
- KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
-
- // 3.订阅主题
- kafkaConsumer.subscribe(Collections.singletonList("dalang-topic"));
-
- //当前线程一直处于监听状态
- while (true) {
- //4.获取消息
- ConsumerRecords<String, String> consumerRecords=consumer.poll(Duration.ofMillis(1000));
- for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
- System.out.println(consumerRecord.key());
- System.out.println(consumerRecord.value());
- }
- }
-提交偏移量 指定常量ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG设置为false
同步提交 消费者对象.commitSync.();
异步提交 消费者对象.commitAsync.(new OffsetCommitCallback() {});
同步异步提交 结合同步和异步
配置文件加入以下内容:
spring:
kafka:
bootstrap-servers: 填写你的KafKa的Ip地址:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
1.1自动注入 KafkaTemplate
1.2发送消息 send();
这里也是举简单例子,测试实体类为User
- @Data
- @ToString
- public class User {
- private String userName;
- private Integer age;
- }
- @RestController
- public class HelloKangAnController {
-
- @Autowired
- private KafkaTemplate<String,String> kafkaTemplate;
-
- /**
- * 发送消息
- * @return
- */
- @GetMapping("/hello")
- public String hello(){
- // kafkaTemplate.send("kangan-topic","早知道信息差新闻test1");
- User user = new User();
- user.setUserName("kangan");
- user.setAge(23);
- //设置了字符串反序列化 需要将对象转换成字符串
- kafkaTemplate.send("user-topic", JSON.toJSONString(user));
- return "ok";
- }
- }
配置文件:
spring:
kafka:
bootstrap-servers: 填写你的KafKa的Ip地址:9092
consumer:
group-id: ${spring.application.name}
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2.1自动注入 KafkaTemplate
2.2在需要监听拉取的方法上加入注解 @KafkaListener 指定你要拉取的主题
- @Component
- public class HelloListener {
-
- /**
- * 监听消息
- * @param message
- */
- @KafkaListener(topics = "user-topic")
- public void onMessage(String message){
- User user = JSON.parseObject(message, User.class);
-
- if(!StringUtils.isEmpty(message)){
- System.out.println(user);
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。