当前位置:   article > 正文

SpringBoot集成Kafka(1)|(入门-实现生产者消费者)_springboot kafka生产者

springboot kafka生产者

SpringBoot集成Kafka(1)|(入门-实现生产者消费者)


章节

第一章链接: SpringBoot集成Kafka(1)|(入门-实现生产者消费者)

前言

本章节主要介绍SpringBoot项目集成afka的一些相关知识,包括集成版本、依赖、集成方式、以及简单的使用。查看需要对Springboot项目有一定的了解。

一、Kafka是什么?

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

二、集成步骤

1.依赖引入

pom依赖如下,主要列出SpringBoot依赖版本以及kafka版本,其他需要依赖自行添加

    <!-- Springboot 版本 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.7</version>
        <relativePath/>
    </parent>
    
     <!-- kafka 依赖 -->
     <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
     </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2.文件配置

yml配置,此处为单机配置,集群模式uri

spring:
  kafka:
    bootstrap-servers: 192.188.50.50:9092,192.188.50.50:9093,192.188.50.50:9094
    producer: # producer 生产者
      retries: 0 # 重试次数
      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01、all/-1)
      batch-size: 16384 # 批量大小
      buffer-memory: 33554432 # 生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # consumer消费者
      group-id: OakGroup # 默认的消费组ID
      enable-auto-commit: true # 是否自动提交offset
      auto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

3.定义数据传输体

定义数据库集合,定义好字段,该处用了lombok表达式,如不需要可自定义set、get方法取代

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class EmployeeDto {
    private String id;
    private String name;
    private String no_id;

    private EmpInfoPo info;
    /**
     * 部门
     */
    private String dept;

    @Override
    public String toString() {
        return "EmployeeDto{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", no_id='" + no_id + '\'' +
                ", info=" + info +
                ", dept='" + dept + '\'' +
                '}';
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

EmpInfoPo 实体类

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class EmpInfoPo {
    private int age;
    @DateTimeFormat(pattern = "yyyy-MM-dd")
    private Date birth;
    private int gender;
    private String like;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

4.生产者

通过KafkaTemplate对象对kafka数据库进行操作,KafkaTemplate有springboot容器创建管理,用户不需要进行配置,使用的时候注入即可,我这里以接口的方式实现。

@Api(tags = {"kafka相关接口"})
@RestController
@RequestMapping("/producer")
public class kafkaCtrl {

    @Autowired
    private KafkaTemplate kafkaTemplate;


    /***
     * 发送消息
     * topic:要发送的队列
     * msg:发送的消息
     */
    @ApiImplicitParams({@ApiImplicitParam(name = "msg", value = "员工信息")})
    @ApiOperation(value = "发送消息", notes = "发送消息")
    @PostMapping(value = "/send/{topic}")
    public String send(@PathVariable(value = "topic") String topic,
                       @RequestBody EmployeeDto msg) {
        //消息发送
        sendMsg(topic, msg);
        return "SUCCESS";
    }

    private void sendMsg(String topic1, EmployeeDto msg1) {
        for (int j = 0; j < 100; j++) {

            for (int i = 0; i < 100; i++) {

                EmpInfoPo build = EmpInfoPo.builder().age(i)
                        .birth(new Date())
                        .gender(i % 2)
                        .age(i % 60)
                        .like(i + "喜欢打篮球")
                        .build();
                EmployeeDto msg = EmployeeDto.builder()
                        .id(j + "" + i)
                        .name("员工" + j + i + "号")
                        .no_id("001")
                        .dept("第" + j + i + "部门")
                        .info(build)
                        .build();
                kafkaTemplate.send("testTop", msg.toString());
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

5.消费者

通过KafkaListener注解对kafka数据库进行操作。

@Component
public class KafkaConSumer {

    @KafkaListener(topics = {"testTop"},groupId = "OakGroup")
    public void listener(ConsumerRecord<String,String> record){
        //获取消息
        String message = record.value();
        //消息偏移量
        long offset = record.offset();
        System.out.println("读取的消息:"+message+"\n当前偏移量:"+offset);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

6.遇到的问题

发送数据报错
java.lang.ClassCastException: com.oak.kafka.rest.entity.dto.EmployeeDto cannot be cast to java.lang.String

原因:配置生产者的时候使用的是String序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
而实际使用的是Object,导致序列化失败
解决方法:消息数据添加方法
@Override
public String toString() {
   return "EmployeeDto{" +
           "id='" + id + '\'' +
           ", name='" + name + '\'' +
           ", no_id='" + no_id + '\'' +
           ", info=" + info +
           ", dept='" + dept + '\'' +
           '}';
}
发送方式
kafkaTemplate.send("testTop", msg.toString());

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

总结

以上就是SpringBoot集成kafka数据库内容,内容较简单,适合入门。后期会深入使用kafka并总结分享。(如果大家需要源码留言)

第一章链接: SpringBoot集成Kafka(1)|(入门-实现生产者消费者)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/524848
推荐阅读
相关标签
  

闽ICP备14008679号