赞
踩
写在前面:各位看到此博客的小伙伴,如有不对的地方请及时通过私信我或者评论此博客的方式指出,以免误人子弟。多谢!
先大体说一下本篇要完成的事:
直在此之前,首先要安装并配置好zookeeper和kafka,并了解基本的kafka命令,可以参考之前的文章,有详细的安装配置步骤 Linux系统安装kafka步骤 Linux系统安装zookeeper步骤 linux系统kafka常用命令
直接贴代码记录一下:
引入依赖:
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
发送消息我们使用自定义的KafkaTemplate,当然你可以直接使用默认的,自定义是为了以后方便添加自定义的分区策略啥的,贴下代码:
- @Configuration
- @EnableKafka
- public class KafkaProducerConfig {
-
- @Value("${kafka.producer.servers}")
- private String servers;
- @Value("${kafka.producer.retries}")
- private int retries;
-
- public Map<String,Object> producerConfigs(){
- Map<String,Object> props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
- props.put(ProducerConfig.RETRIES_CONFIG,retries);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return props;
- }
-
- public ProducerFactory<String,String> producerFactory(){
- return new DefaultKafkaProducerFactory<>(producerConfigs());
- }
-
- @Bean
- public KafkaTemplate<String,String> kafkaTemplate(){
- return new KafkaTemplate<>(producerFactory());
- }
- }
yml文件中配置kafka服务的连接地址:
- spring:
- kafka:
- bootstrap-servers: your ip:9092
- consumer:
- group-id: test-consumer-group
-
- kafka:
- producer:
- servers: your ip:9092
- retries: 3
对于上面的配置,其实有一些不太对的地方,上面bootstrap-servers已经配置了kafka的地址,下面我又单独定义了生产者的地址,有点重复,但是这是为了自定义kafkatemplate时使用,以后边学边优化。
在kafka服务器创建一个topic:mytopic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --relication-factor 1 --partitions 5 --topic mytopic
创建一个消息发送类向mytopic发送消息:
- @RestController
- public class SendMessageController {
-
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
-
- private static final String TOPIC = "mytopic";
-
- @GetMapping("/send")
- public String send(String message) {
- kafkaTemplate.send(TOPIC, message);
- return "success";
- }
-
-
- }
在kafka服务器创建一个消费者消费mytopic的消息:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic
启动项目,访问http://localhost:8080/send?message=test 可以看到kafak服务端的消费者已经接收到了发送的消息test
kafka消费者接收到了消息,说明咱们的环境是没问题的了,接下来,在项目创建一个消息监听类监听发送到mytopic上的消息,接收到并打印到控制台:
- @Component
- public class MessageListener {
-
- @KafkaListener(topics = {"mytopic"})
- public void listener1(String message){
- System.out.println("接收到的消息:" + message);
- }
-
- }
需要指定一个分组,可以在@KafkaListener注解之上,也可以在yml文件中进行配置,不配置会报错,本篇中配置在了yml中,此时再次访问http://localhost:8080/send?message=test 可以看到kafka服务端的消费者打印了消息,并且项目的控制台也打印了消息:“接收到的消息:test”
上面我们只是接收到了消息,而通常的情况是,我接收到消息通知之后,要根据收到的消息进行下一步的处理,例如:接收到通知之后,给某个人发一条短信,或者其它处理,不管进行什么处理,我都需要知道发送的这条消息是成功了还是失败了,成功失败对应的处理肯定也不一样,其实KafkaTemplate为我们提供了一个带回调的方法addCallback,可以在回调方法中监控消息是发送成功还是失败,失败时做补偿处理,贴一下代码:
- @GetMapping("/send1")
- public String send1(String message) {
- kafkaTemplate.send(topic, message).addCallback(
- success ->{
- String topic = success.getRecordMetadata().topic();
- int partition = success.getRecordMetadata().partition();
- long offset = success.getRecordMetadata().offset();
- System.out.println("topic:" + topic + " partition:" + partition + " offset:" + offset);
- },
- failure ->{
- String message1 = failure.getMessage();
- System.out.println(message1);
- }
- );
- return "success";
- }
访问http://localhost:8080/send1?message=test1 只贴控制台的截图了之后,kafka消费者的截图就不贴了,控制台打印如下:
addCallback有两个重载的方法,除了上面的写法,你还可以像下面这样:
- @GetMapping("/send2")
- public String send2(String message){
- kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
- @Override
- public void onFailure(@NonNull Throwable throwable) {
- System.out.println(throwable.getMessage());
- }
-
- @Override
- public void onSuccess(SendResult<String, String> result) {
- System.out.println("topic:" + result.getRecordMetadata().topic() );
- }
- });
- return "success";
- }
咱们在创建topic的时候指定了5个分区,那么怎么将消息发送到指定分区,怎么监听指定分区的消息呢?这篇先说一下监听指定分区消息,@KafkaListener注解有个属性topicPartitions可以指定要监听的topic分区,代码如下:
- @KafkaListener(topics = {"mytopic"},topicPartitions = {@TopicPartition(topic = "mytopic",partitions = {"1","3"})})
- public void test1(String message){
- System.out.println("接收到的消息:" + message);
- }
访问http://localhost:8080/send1?message=test1 多次,打印结果如下,只有消息发送到分区1,3的时候监听类中才打印消息。
前面我们在发送消息的时候使用了带回调的方法,可以判断消息是否发送成功,但是监听的时候只是接收消息,并不知道更详细的信息,如partition、offset等,可以使用ConsumerRecord参数作为监听方法的参数,代码如下:
- @KafkaListener(topics = {"test5"})
- public void consumer(ConsumerRecord<?,?> consumerRecord){
- //判断是否为null
- Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
- if(kafkaMessage.isPresent()){
- //得到Optional实例中的值
- Object message = kafkaMessage.get();
- System.err.println("接收到的消息:"+message);
- }
- }
consumerRecord中包含的信息由很多,截个图看下:
最后记录一下消息的同步发送,默认情况下 KafkaTemplate 发送消息是采取异步方式,如果希望同步发送消息只需要在 send 方法后面调用 get 方法即可,get 方法返回的即为结果(如果发送失败则抛出异常),测试代码如下:
- @GetMapping("/send7")
- public String send7(String message) {
- try {
- SendResult<String, String> result = kafkaTemplate.send(topic, message).get();
- System.out.println("消息发送到的分区:" + result.getRecordMetadata().topic());
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- System.out.println("发送消息失败");
- }
- return "success";
- }
get 方法还有一个重载方法 get(long timeout, TimeUnit unit),当 send 方法耗时大于 get 方法所设定的参数时会抛出一个超时异常,不贴测试代码了。
我有几个个疑问:
以上几个问题希望知道的的小伙伴帮忙解答下,谢谢!!! 看到但不知道的小伙伴欢迎关注我,在以后的学习中我会慢慢完善,希望对您有用!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。