赞
踩
- package com.springboot.kafka.business;
-
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
-
-
- /**
- * 生产者
- */
- @Component
- public class KafkaProducerBusiness {
- @Resource
- private KafkaTemplate<String, Object> kafkaTemplate;
-
- public void send(String topic,String msg){
- kafkaTemplate.send(topic,msg);
- }
-
- public void sendCallback(String topic,String msg){
- kafkaTemplate.send(topic,msg).addCallback(
- success ->{
- String topics = success.getRecordMetadata().topic();
- int partition = success.getRecordMetadata().partition();
- long offset = success.getRecordMetadata().offset();
- System.out.println("topic:" + topics + " partition:" + partition + " offset:" + offset);
- },
- failure ->{
- String message = failure.getMessage();
- System.out.println(message);
- }
- );
- }
-
- }
- import com.springboot.kafka.constant.KafkaConstant;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
-
- import java.lang.reflect.Array;
-
- /**
- * 消费者
- */
- @Component
- public class KafkaConsumerBusiness {
-
- /**
- * topicGroup1
- * @param record
- */
- @KafkaListener(topics = KafkaConstant.topic,groupId = KafkaConstant.topicGroup1)
- public void getMessage1(ConsumerRecord record){
- Object values = record.value();
- System.out.println("1------------------------------------"+values);
- }
-
- /**
- * topicGroup2
- * @param record
- */
- @KafkaListener(topics = KafkaConstant.topic,groupId = KafkaConstant.topicGroup2)
- public void getMessage2(ConsumerRecord record){
- Object values = record.value();
- System.out.println("2------------------------------------"+values);
- }
-
-
-
-
-
- /**
- * 监听多个topic topic在application.yml配置
- * @param record
- */
- @KafkaListener(topics = {"#{'${spring.kafka.topics}'.split(',')}"},groupId = KafkaConstant.topicGroup)
- public void getMessage3(ConsumerRecord record){
- Object values = record.value();
- System.out.println("3------------------------------------"+values);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。