赞
踩
废话不多说,直接上代码
为啥这样说,现在大家都想先看大妈效果,再去看逻辑
先看整体架构
先贴yml吧,这个毕竟是项目一创建就需要的
spring:
application:
admin: apache-kafka
kafka:
bootstrap-servers: 这里是你自己的kafka地址 # kafka 服务器集群地址,默认为 localhost:9092
template:
default-topic: demo #将消息发送到的默认主题,KafkaTemplate.sendDefault
listener:
type: batch #监听器类型,可选值有:SINGLE(单条消费,默认)、BATCH(批量消息)
# kafka 生产者配置
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer #生产者 key 序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer #生产者 value 序列化方式
batch-size: 16KB #默认批处理大小,如果值太小,则可能降低吞吐量,为零将完全禁用批处理,当 linger.ms=0 时,此值无效
buffer-memory: 32MB #生产者可以用来缓冲等待发送到服务器的记录的总内存大小
retries: 3 #发送失败时的重试次数,当大于零时,允许重试失败的发送。
# 在考虑请求完成之前,生产者要求领导者已收到的确认数,可选值有:-1、0、1(默认为1)
# 使用事务时,必须配置为 -1,表示领导者必须收到所有副本的确认消息。
acks: -1
properties:
#消息提交延时时间(单位毫秒),当生产者接收到消息 linger.ms 秒钟后,就会将消息提交给 kafka。
#当生产端积累的消息达到 batch-size 大小后,也会将消息提交给 kafka。
#linger.ms 默认为 0 ,表示每接收到一条消息就会立即提交给 kafka,此时 batch-size 无效。如果对实时性要求高,则建议设置为 0
linger.ms: 0
partitioner:
class: com.wmx.apachekafka.beans.MyKafkaPartitioner #kafka 自定义分区规则
transaction-id-prefix: tx_kafka.
# kafka 消费者配置
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #消费者 key 反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #消费者 value 反序列化方式
group-id: test-consumer-group #标识此消费者所属的消费者组的唯一字符串,这里只要你是默认安装,那就是这个,不用修改
#消费者客户端 id,在消费者组需要唯一指定。发出请求时会传递给服务器,用于服务器端日志记录
#不写时,会自动命名,比如:consumer-1、consumer-2...,原子性递增。通常不建议自定义,使用默认值即可,因为容易冲突
#client-id: wangmx1
enable-auto-commit: true #消费者的偏移量是否在后台自动提交,默认为 true
auto-commit-interval: 5000 #如果enable.auto.commit=true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为 5000
# 当 Kafka 中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,可选的值有 latest、earliest、exception、none,默认值为 latest
# latest:重置为分区中最新的 offset(消费分区中新产生的数据)、earliest:重置为分区中最小的 offset
auto-offset-reset: latest
properties:
session.timeout.ms: 180000 #消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发 rebalance(重新平衡) 操作)
request.timeout.ms: 120000 #消费请求超时时间
max-poll-records: 5 #一次调用poll()时返回的最大记录数,即批量消费每次最多消费多少条消息,注意是最多,并不是必须满足数量后才消费.
自定义分区MyKafkaPartitioner:
package com.zy.apachekafka.beans;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* kafka 自定义分区规则,一旦自定义了分区规则,就不会再走 kafka 默认的分区规则
*
* @author zy
*/
public class MyKafkaPartitioner implements Partitioner {
/**
* 计算给定记录的分区,发送消息到 kafka 服务器之前,都会先走这里进行计算目标分区,即将消息发送到具体的哪个分区
*
* @param topic :主题名称
* @param key :要分区的键(如果没有键,则为null)
* @param keyBytes :要分区的序列化键(如果没有键,则为null)
* @param value :要分区的值或null,健可以有可无,值才是真正的消息内容
* @param valueBytes :要分区的序列化值或null
* @param cluster :当前集群信息
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 返回的整数值就是表示生产者将消息发送到的分区
// 具体的规则可以根据自身需要设置
System.out.println("发送消息:" + value);
System.out.println("指定分区为:" + 0);
return 0;
}
/**
* 在分区程序关闭时调用
*/
@Override
public void close() {
}
/**
* 使用给定的键值对配置此类
*
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {
}
}
消费者定时器ConsumerTimer:
package com.zy.apachekafka.component;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Set;
/**
* 消费者定时器——定时开关消费者消费功能
* 1、本类使用 @EnableScheduling 定时任务的方式开关消费者监听器,同理可以自己提供控制层接口,通过 http 的方式来开关。
*
* @author zy
*/
@Component
@EnableScheduling
@EnableAsync
public class ConsumerTimer {
/**
* 1、{@link KafkaListener} 注解标注的方法会被注册在 KafkaListenerEndpointRegistry 中。
* 2、{@link KafkaListenerEndpointRegistry} 在 Spring IOC 容器中已经存在,可以直接取。
*/
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
/**
* 定时启动消费者监听器
* <p>
* MessageListenerContainer getListenerContainer(String id)
* * 1、返回具有指定id的{@link MessageListenerContainer},如果不存在此类容器,则返回 null。
* * 2、这个 id 就是 @KafkaListener 注解的 id 属性值
* Set<String> getListenerContainerIds():获取所有的 KafkaListener 监听器 id
* Collection<MessageListenerContainer> getListenerContainers():获取所有的 KafkaListener 监听器容器
*/
@Scheduled(cron = "0 52 20 * * ? ")
public void startListener() {
Set<String> containerIds = kafkaListenerEndpointRegistry.getListenerContainerIds();
containerIds.stream().forEach(item -> System.out.println("KafkaListener 消费者监听器:" + item));
//boolean isRunning():检查此组件当前是否正在运行
//void start():启动此组件,如果组件已在运行,则不应引发异常,配合 stop 方法使用,
//void resume():如果暂停,在下一次轮询后恢复此容器,配合 pause 方法使用。
kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").resume();
//kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").start();
System.out.println(LocalDateTime.now() + " 启动 kafka 消费者监听器:basicConsumer");
}
/**
* 定时关闭/暂停消费者监听器
* void pause():在下次轮询之前暂停此容器,配合 resume
* void stop():以同步方式停止此组件/容器,如果组件未运行(尚未启动),则不应引发异常。配合 start 方法重新启动
*/
@Scheduled(cron = "0 50 20 * * ? ")
public void shutDownListener() {
kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").pause();
//kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").stop();
System.out.println(LocalDateTime.now() + " 暂停 kafka 消费者监听器:basicConsumer");
}
}
下面该有消费和生产消息:
消费者 · 接收消息.KafkaConsumer:
package com.zy.apachekafka.controller;
import cn.hutool.core.exceptions.ExceptionUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Kafka 消费者 · 接收消息.
* 1、topics:监听的主题,可以写死,也可以通过全局配置文件配置取值,如 @KafkaListener(topics = {"${my.kafka.topic-name}"})
* 2、系统中定义了消费者(@KafkaListener)时,启动服务后,如果连不上kafka服务器则会输出大量的警告日志,但是不会报错。
* 不是每个环境都启动了kafka服务,所以当没有配置消费者组id的时候,本类不交由Spring容器初始化,不再监听消息。
*
* @author zy
*/
@Component
@ConditionalOnProperty(prefix = "spring.kafka.consumer", name = "group-id")
public class KafkaConsumer {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
/**
* 监听指定主题上的消息,topics 属性是一个字符串数组,可以监听多个主题。
* * id :用于唯一标识此消费者监听器,不同方法上此注解的id必须唯一,不设置时,会自动生成
* * topics:监听的主题,可以写死,也可以通过全局配置文件配置取值,如 @KafkaListener(topics = {"${my.kafka.topic-name}"})
*
* @param record :消息记录对象,包含消息正文、主题名、分区号、偏移量、时间戳等等
*/
@KafkaListener(id = "basicConsumer", topics = {"car-infos", "basic-info", "helloWorld", "bgt.basic.agency.frame.topic"})
public void messageListener1(ConsumerRecord<?, ?> record) {
/**
* headers:消息头信息
* offset:此记录在相应的 Kafka 分区中的位置。
* partition:记录所在的分区
* serializedKeySize:序列化的未压缩密钥的大小(以字节为单位),如果 key为 null,则返回的大小为 -1
* serializedValueSize:序列化的未压缩值(消息正文)的大小(以字节为单位,record.value().getBytes().length)。如果值为 null,则返回的大小为 -1
* timestamp:记录的时间戳
* TimestampType:记录的时间戳类型
* topic:接收此记录的主题
* value:消息内容
*/
Headers headers = record.headers();
long offset = record.offset();
int partition = record.partition();
int serializedKeySize = record.serializedKeySize();
int serializedValueSize = record.serializedValueSize();
long timestamp = record.timestamp();
TimestampType timestampType = record.timestampType();
String topic = record.topic();
Object value = record.value();
System.out.println("收到消息:");
System.out.println("\theaders=" + headers);
System.out.println("\toffset=" + offset);
System.out.println("\tpartition=" + partition);
System.out.println("\tserializedKeySize=" + serializedKeySize);
System.out.println("\tserializedValueSize=" + serializedValueSize);
System.out.println("\ttimestamp=" + timestamp);
System.out.println("\ttimestampType=" + timestampType);
System.out.println("\ttopic=" + topic);
System.out.println("\tvalue=" + value);
}
/**
* 批量消费时,必须使用 List 接收,否则会抛异常。
* 即如果配置文件配置的是批量消费(spring.kafka.listener.type=batch),则监听时必须使用 list 接收
* 反之如果配置是单条消息消费,则不能使用 list 接收,否则也会异常.
*
* @param records
*/
@KafkaListener(topics = "batch-msg")
public void messageListener2(List<ConsumerRecord<?, ?>> records) {
System.out.println(">>>批量消费返回条数,records.size()=" + records.size());
int count = 0;
for (ConsumerRecord<?, ?> record : records) {
System.out.println("\t消息" + (++count) + ":" + record.value());
}
}
/**
* 消费消息并转换。SendTo 可以标注在类上,此时对类中的所有方法有效,方法的返回值表示转发的消息内容。
*
* @param record
* @return
*/
@KafkaListener(topics = {"sendTo"})
@SendTo("car-infos")
public String messageListener3(ConsumerRecord<?, ?> record) {
System.out.println("消费单条消费并转发:" + record.value() + "," + record.timestamp());
return record.value().toString();
}
/**
* 单位一体化编码与名称更正消息监听
* 约定更正接口返回结果监听的主题为:basic.kafka.syncAgencyStatInfo.reply
*
* @param recordList
*/
@KafkaListener(topics = {"${app.kafka.topics.agency:topic3}"})
public void syncAgencyStatInfoMsgListener(List<ConsumerRecord<String, String>> recordList) {
for (ConsumerRecord<String, String> record : recordList) {
log.info("监听单位一体化编码与名称更正消息:{}", record);
try {
System.out.println("消息处理.....");
} catch (Exception e) {
log.error("单位一体化编码与名称更正消息消费失败:{}", ExceptionUtil.getMessage(e));
}
}
}
}
生产者KafkaProducer:
package com.zy.apachekafka.controller;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SuccessCallback;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* kafka 生产者 · 发送消息
*
* @author zy
*/
@RestController
public class KafkaProducer {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
/**
* {@link KafkaAutoConfiguration} 中会自动根据 {@link KafkaProperties} 配置属性读取配置,
* 然后将 {@link KafkaTemplate} 模板添加到 Spring 容器中,所以这里直接获取使用即可。
*/
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 向指定主题(topic)发送消息:http://localhost:8080/kafka/sendMsg?topic=car-infos
* <p>
* 1、send(String topic, @Nullable V data):向指定主题发送消息,如果 topic 不存在,则自动创建,
* 但是创建的主题默认只有一个分区 - PartitionCount: 1、分区也没有副本 - ReplicationFactor: 1,1表示自身。
* 2、send 方法默认是异步的,主线程会直接继续向后运行,想要获取发送结果是否成功,请添加回调方法 addCallback。
* [WARN ][org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:241)]:[Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
* [ERROR][org.springframework.kafka.support.LoggingProducerListener.onError(LoggingProducerListener.java:76)]:Exception thrown when sending a message with key='xxx' and payload='xxx' to topic bgt.basic.agency.frame.topic:
* 3、send().get() 可以同步阻塞主线程直到获取执行结果,或者执行超时抛出异常.
* java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException:
* Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
*
* @param topic :主题名称,不存在时自动创建,默认1个分区,无副本。主题名称也可以通过配置文件配置,这里直接通过参数传入。
* @param message :待发送的消息,如:{"version":1,"text":"后日凌晨三点执行任务"}
* @return
*/
@PostMapping("kafka/sendMsg")
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> sendMessage(@RequestParam String topic, @RequestBody Map<String, Object> message) {
logger.info("向指定主题发送信息,topic={},message={}", topic, message);
try {
String valueAsString = new ObjectMapper().writeValueAsString(message);
// 异步
// kafkaTemplate.send(topic, valueAsString);
// 同步:get() 获取执行结果,此时线程将阻塞,等待执行结果
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, valueAsString).get();
sendResult.toString();
message.put("sendResult", sendResult.toString());
// org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
} catch (Exception e) {
// 异步发送时子线程中的异常是不会进入这里的,只有同步发送时,主线程阻塞,发送是吧,抛出异常时,才会进入这里。
e.printStackTrace();
}
return message;
}
/**
* 向默认主题(default-topic)发送消息:http://localhost:8080/kafka/sendMsgDefault
* 默认主题由 spring.kafka.template.default-topic 选项进行配置
*
* @param message :待发送的消息,如:{"version":2,"text":"后日凌晨三点执行任务,不得有误"}
* @return
*/
@PostMapping("kafka/sendMsgDefault")
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> sendMsgDefault(@RequestBody Map<String, Object> message) {
logger.info("向默认主题发送信息,topic={},topic={}", kafkaTemplate.getDefaultTopic(), message);
try {
String valueAsString = new ObjectMapper().writeValueAsString(message);
kafkaTemplate.sendDefault(valueAsString);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return message;
}
/**
* 异步回调写法 1
* 发送信息,并添加异步回调方法,用于监控消息发送成功或者失败。发送成功可以记录日志,发送失败则应该有相应的措施,比如延期再发送等。
* http://localhost:8080/kafka/sendMsgCallback?topic=car-infos
* 1、addCallback 方法用于获取 send 发送的结果,成功或者失败,此时 send 方法不再阻塞线程。
*
* @param topic :car-infos
* @param message :{"version":223,"text":"后日凌晨三点执行任务,不得有误"}
* @return
*/
@PostMapping("kafka/sendMsgCallback")
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> sendMessageCallback(@RequestParam String topic,
@RequestBody Map<String, Object> message) {
try {
String valueAsString = new ObjectMapper().writeValueAsString(message);
/**
* addCallback:添加成功或者失败的异步回调
* {@link SuccessCallback}:是发送成功回调,函数式接口,其中的方法参数为 {@link SendResult},表示发送结果
* {@link FailureCallback}:是发送失败回调,函数式接口,其中的方法参数为 Throwable,表示异常对象
*/
kafkaTemplate.send(topic, valueAsString).addCallback(success -> {
String topic2 = success.getRecordMetadata().topic();
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
logger.info("发送消息成功,topic={},partition={},offset={}", topic2, partition, offset);
}, failure -> {
logger.warn("消息发送失败:{},{}", failure.getMessage(), failure);
logger.warn("保存到数据库中,后期再做处理.");
});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
logger.info("向指定主题发送信息,回调,topic={},message={}", topic, message);
return message;
}
/**
* 异步回调写法 2
* 发送信息,并添加异步回调方法,用于监控消息发送成功或者失败。发送成功可以记录日志,发送失败则应该有相应的措施,比如延期再发送等。
* http://localhost:8080/kafka/sendMsgCallback2?topic=helloWorld
* 1、addCallback 方法用于获取 send 发送的结果,成功或者失败,此时 send 方法不再阻塞线程,主线程会直接运行过去。
*
* @param topic :helloWorld
* @param message :{"version":223,"text":"后日凌晨三点执行任务,不得有误"}
* @return
*/
@PostMapping("kafka/sendMsgCallback2")
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> sendMessageCallback2(@RequestParam String topic,
@RequestBody Map<String, Object> message) {
try {
String valueAsString = new ObjectMapper().writeValueAsString(message);
/**
* ListenableFutureCallback 接口继承了 {@link SuccessCallback}、 {@link FailureCallback} 函数式接口
* 重写方法即可
*/
kafkaTemplate.send(topic, valueAsString).addCallback(
new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onSuccess(SendResult<String, Object> success) {
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
String topic2 = success.getRecordMetadata().topic();
logger.info("发送消息成功,topic={},partition={},offset={}", topic2, partition, offset);
}
@Override
public void onFailure(Throwable failure) {
logger.warn("消息发送失败:{},{}", failure.getMessage(), failure);
logger.warn("保存到数据库中,后期再做处理.");
}
});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
logger.info("向指定主题发送信息,回调,topic={},message={}", topic, message);
return message;
}
/**
* 向指定主题(topic)发送消息:http://localhost:8080/kafka/sendMsgTransactional1?topic=car-infos
* 与 springframework 框架的事务整合到一起,此时异常处理完全和平时一样.
*
* @param topic :主题名称,不存在时自动创建,默认1个分区,无副本。主题名称也可以通过配置文件配置,这里直接通过参数传入。
* @param message :待发送的消息,如:{"version":1,"text":"后日凌晨三点执行任务"}
* @return
*/
@PostMapping("kafka/sendMsgTransactional1")
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> sendMessageTransactional1(@RequestParam String topic,
@RequestBody Map<String, Object> message) {
try {
logger.info("向指定主题发送信息,带事务管理,topic={},message={}", topic, message);
String msg = new ObjectMapper().writeValueAsString(message);
ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topic, msg);
if ("110".equals(message.get("version").toString())) {
TimeUnit.SECONDS.sleep(3);
System.out.println(1 / 0);
}
} catch (JsonProcessingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return message;
}
/**
* http://localhost:8080/kafka/sendMsgTransactional2?topic=car-infos
* 生成者发送消息事务管理方式2:使用 executeInTransaction(OperationsCallback<K, V, T> callback)
* executeInTransaction:表示执行本地事务,不参与全局事务(如果存在),即方法内部和外部是分离的,只要内部不
* 发生异常,消息就会发送,与外部无关,即使外部有 @Transactional 注解也不影响消息发送,此时外围有没有 @Transactional 都一样。
*
* @param topic
* @param message
* @return
*/
@PostMapping("kafka/sendMsgTransactional2")
public Map<String, Object> sendMessageTransactional2(@RequestParam String topic,
@RequestBody Map<String, Object> message) {
try {
logger.info("向指定主题发送信息,带事务管理:topic={},message={}", topic, message);
String msg = new ObjectMapper().writeValueAsString(message);
/**
* executeInTransaction 表示这些操作在本地事务中调用,不参与全局事务(如果存在)
* 所以回调方法内部发生异常时,消息不会发生出去,但是方法外部发生异常不会回滚,即便是外围方法加了 @Transactional 也没用。
*/
kafkaTemplate.executeInTransaction(operations -> {
operations.send(topic, msg);
if ("120".equals(message.get("version").toString())) {
System.out.println(1 / 0);
}
return null;
});
//如果在这里发生异常,则只要 executeInTransaction 里面不发生异常,它仍旧会发生消息成功
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return message;
}
}
贴一份pom文件吧,现在随着依赖的增加,很多时候会出现依赖之间出现问题,而且还很难排错,,有一个idea插件可以安排(maven helper)
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.wmx</groupId>
<artifactId>apache-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>apache-kafka</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- spring 整合的 apache kafka 消息队列依赖-->
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.7</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
如果有不明白的联系作者,一起学习
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。