Pull the ZooKeeper image
docker pull zookeeper:3.4.14
Create the container
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
Pull the Kafka image
docker pull wurstmeister/kafka:2.12-2.3.1
Create the container (this first variant uses host networking via --net=host)
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.204.129 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.204.129:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.204.129:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1
Alternatively, instead of host networking, publish the broker port with an explicit port mapping (-p 9092:9092):
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.204.129 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.204.129:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.204.129:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
-p 9092:9092 wurstmeister/kafka:2.12-2.3.1
Create a kafka-demo module under the heima-leadnews-test module and add the Kafka client dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
Create the com.heima.kafka.sample package with two classes: ProducerQuickStart and ConsumerQuickStart.
Producer:
package com.heima.kafka.sample;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Producer
 */
public class ProducerQuickStart {

    public static void main(String[] args) {
        //1. Kafka configuration
        Properties properties = new Properties();
        //Kafka broker address
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.129:9092");
        //Number of retries when a send fails
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        //Serializer for message keys
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //Serializer for message values
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        //2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        /**
         * First argument: topic; second argument: key; third argument: value
         */
        //Build the message to send
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-first", "key-001", "hello kafka");

        //3. Send the message
        producer.send(record);

        //4. Close the producer; this is required, otherwise buffered messages may never be sent
        producer.close();
    }
}
Consumer:
package com.heima.kafka.sample;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Consumer
 */
public class ConsumerQuickStart {

    public static void main(String[] args) {
        //1. Kafka configuration
        Properties properties = new Properties();
        //Kafka broker address
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.129:9092");
        //Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //Deserializers for message keys and values
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        //3. Subscribe to the topic
        consumer.subscribe(Collections.singletonList("topic-first"));

        //Keep the current thread polling for messages
        while (true) {
            //4. Fetch messages
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }
    }
}
The consumer successfully receives the message.
Within a single consumer group, each message is delivered to only one consumer in the group.
For one-to-many (broadcast) delivery, put the consumers in different groups, as the sketch below shows.
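A minimal sketch of the broadcast case, assuming the broker address, topic, and imports from the quick start above (the group names group1/group2 are illustrative): because the two consumers carry different group.id values, each receives its own copy of every message published to topic-first.
//Sketch: two consumers in different groups both receive every message
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.129:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

//Consumer A joins group1; KafkaConsumer copies the config at construction time
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
KafkaConsumer<String, String> consumerA = new KafkaConsumer<>(props);

//Consumer B joins group2, so it also gets every message;
//if it used "group1" as well, each message would go to only one of the two
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
KafkaConsumer<String, String> consumerB = new KafkaConsumer<>(props);

//Each consumer would then subscribe and poll in its own thread
//(KafkaConsumer is not thread-safe)
consumerA.subscribe(Collections.singletonList("topic-first"));
consumerB.subscribe(Collections.singletonList("topic-first"));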
When sending, you can also specify the target partition explicitly; note that in this ProducerRecord overload the partition comes second, before the key:
ProducerRecord<String,String> record = new ProducerRecord<String, String>("topic-first", 0, "key-001", "hello kafka");
/**
 * First argument: topic; second argument: key; third argument: value
 */
//Build the message to send
ProducerRecord<String,String> record = new ProducerRecord<String, String>("topic-first","key-001","hello kafka");
//3. Send the message
//producer.send(record);
//3.1 Send synchronously; get() blocks until the broker responds and throws checked exceptions, so declare them on main or wrap in try/catch
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println("sync send result: topic=" + recordMetadata.topic() + ", partition=" + recordMetadata.partition() + ", offset=" + recordMetadata.offset());
Output:
sync send result: topic=topic-first, partition=0, offset=1
//3.2 Send asynchronously with a callback
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println("async send result: topic=" + recordMetadata.topic() + ", partition=" + recordMetadata.partition() + ", offset=" + recordMetadata.offset());
        }
    }
});
Output:
async send result: topic=topic-first, partition=0, offset=2
Manual offset commits
//Commit offsets manually: disable auto-commit
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
Set enable.auto.commit to false so the application decides when to commit offsets. commitSync() commits the latest offset returned by poll(), so make sure it is called only after all records from the poll have been processed; otherwise there is still a risk of losing messages.
As long as no unrecoverable error occurs, commitSync() keeps retrying until the commit succeeds; if it ultimately fails, the error can be written to a log.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
        System.out.println(record.key());
        try {
            consumer.commitSync(); //synchronously commit the latest offset
        } catch (CommitFailedException e) {
            System.out.println("commit failed: " + e);
        }
    }
}
Manual synchronous commits have one drawback: the application blocks while the commit call is in flight. We can commit less often, but that raises the chance of duplicate processing (just as with auto-commit). The other option is the asynchronous commit API, commitAsync().
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
        System.out.println(record.key());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if (e != null) {
                System.out.println("failed commit for offsets: " + map + ", exception: " + e);
            }
        }
    });
}
Asynchronous commits also have a drawback: if the broker reports a failure, commitAsync() does not retry.
By comparison, a synchronous commit retries until it succeeds or finally throws the exception to the application. commitAsync() does not retry because, when several async commits are in flight, a retry could overwrite a newer committed offset.
For example, suppose we issue an async commit commitA at offset 2000 and then an async commit commitB at offset 3000. commitA fails but commitB succeeds; if commitA now retries and succeeds, it rolls the committed offset back from 3000 to 2000, and those messages are consumed again. A guard against this is sketched below, followed by the tutorial's combined sync/async approach.
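A minimal sketch of one common mitigation (an assumption of this rewrite, not part of the original code): tag each async commit with a monotonically increasing sequence number and retry only when no newer commit has been issued since. It assumes the consumer setup from the snippets above plus java.util.concurrent.atomic.AtomicLong; the commitSeq counter is illustrative.
//Sketch (assumption): a sequence number guards async-commit retries against rollback
AtomicLong commitSeq = new AtomicLong(0);
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
    long seqAtSend = commitSeq.incrementAndGet();
    consumer.commitAsync((offsets, e) -> {
        //Retry only if this is still the newest commit; otherwise a retry
        //could overwrite a larger offset committed by a later commitAsync
        if (e != null && seqAtSend == commitSeq.get()) {
            consumer.commitSync(offsets); //safe: callbacks run on the polling thread
        }
    });
}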
A combined approach: commit asynchronously inside the loop for throughput, and fall back to a final synchronous commit before closing the consumer:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            System.out.println(record.key());
        }
        consumer.commitAsync();
    }
} catch (Exception e) {
    e.printStackTrace();
    System.out.println("error: " + e);
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
Import the dependencies in the kafka-demo module:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>
Create application.yml under resources:
server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.204.129:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Create the com.heima.kafka.controller.HelloController class, which sends messages:
@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello() {
        kafkaTemplate.send("itcast-topic", "hello kafka");
        return "success";
    }
}
Create the com.heima.kafka.listener.HelloListener class, which listens for messages:
@Component
public class HelloListener {

    @KafkaListener(topics = "itcast-topic")
    public void listen(String message) {
        if (!StringUtils.isEmpty(message)) {
            System.out.println("message = " + message);
        }
    }
}
Boot class:
@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
Open localhost:9991/hello in a browser.
The listener receives the message.
Create the com.heima.kafka.pojo.User class:
@Data
public class User {
    private String username;
    private Integer age;
}
Use fastjson to convert the object to and from JSON.
Controller:
@GetMapping("/user")
public String user() {
User user = new User();
user.setUsername("zhangsan");
user.setAge(20);
kafkaTemplate.send("user-topic", JSON.toJSONString(user));
return "success";
}
Listener:
@KafkaListener(topics = "user-topic")
public void listenUser(String message) {
if(!StringUtils.isEmpty(message)) {
User user = JSON.parseObject(message, User.class);
System.out.println(user);
}
}
@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto wmNewsDto){
return wmNewsService.downOrUp(wmNewsDto);
}
Interface:
ResponseResult downOrUp(WmNewsDto wmNewsDto);
Implementation:
@Override
public ResponseResult downOrUp(WmNewsDto wmNewsDto) {
    // 1. Validate parameters
    if (wmNewsDto.getId() == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "article id must not be null");
    }
    // 2. Look up the article
    WmNews wmNews = getById(wmNewsDto.getId());
    if (wmNews == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST, "article does not exist");
    }
    // 3. Update the article status
    if (!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "only published articles can be taken down or put back up");
    }
    if (wmNewsDto.getEnable() == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "enable must not be null");
    }
    wmNews.setEnable(wmNewsDto.getEnable());
    updateById(wmNews);
    // 4. Return the result
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
Import the Kafka dependencies in the heima-leadnews-common module:
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
Configure the Kafka producer in the wemedia side's nacos configuration center, i.e. in the heima-leadnews-wemedia configuration file:
spring:
  kafka:
    bootstrap-servers: 192.168.204.129:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Create the com.heima.common.constants.WmNewsMessageConstants constants class to hold the Kafka topic:
public class WmNewsMessageConstants {
    public static final String WM_NEWS_UP_OR_DOWN_TOPIC = "wm.news.up.or.down.topic";
}
Inject the KafkaTemplate:
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
Send a message to notify the article side to update the article configuration (added to the downOrUp implementation above, after updating the status):
//Send a message to notify the article side to update the article configuration
if (wmNews.getArticleId() != null) {
    Map<String, Object> map = new HashMap<>();
    map.put("articleId", wmNews.getArticleId());
    map.put("enable", wmNewsDto.getEnable());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC, JSON.toJSONString(map));
}
Configure the Kafka consumer in the article side's nacos configuration center:
spring:
  kafka:
    bootstrap-servers: 192.168.204.129:9092
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Because ap_article_config needs to be updated, create the corresponding service and mapper (a mapper sketch follows the implementation below).
Create the service interface com.heima.article.service.ApArticleConfigService:
public interface ApArticleConfigService extends IService<ApArticleConfig> {
    /**
     * Update the article configuration
     * @param map carries articleId and enable
     */
    public void updateByMap(Map map);
}
Implementation:
@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {

    /**
     * Update the article configuration
     * @param map carries articleId and enable
     */
    @Override
    public void updateByMap(Map map) {
        //enable: 0 = down, 1 = up
        Object enable = map.get("enable");
        boolean isDown = true;
        if (enable.equals(1)) {
            isDown = false;
        }
        //Update the article configuration
        update(Wrappers.<ApArticleConfig>lambdaUpdate()
                .eq(ApArticleConfig::getArticleId, map.get("articleId"))
                .set(ApArticleConfig::getIsDown, isDown));
    }
}
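The ServiceImpl above depends on ApArticleConfigMapper, which the tutorial references but does not show; a minimal sketch of what it presumably looks like (its package placement is an assumption):
//Minimal MyBatis-Plus mapper assumed by the ServiceImpl above;
//BaseMapper already supplies the generic CRUD methods, so no body is needed
@Mapper
public interface ApArticleConfigMapper extends BaseMapper<ApArticleConfig> {
}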
Write a listener on the article side to receive the data:
@Component
@Slf4j
public class ArticleIsDownListener {

    @Autowired
    private ApArticleConfigService apArticleConfigService;

    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void onMessage(String message) {
        if (StringUtils.isNotBlank(message)) {
            Map map = JSON.parseObject(message, Map.class);
            apArticleConfigService.updateByMap(map);
            log.info("article config updated, articleId={}", map.get("articleId"));
        }
    }
}
Start the corresponding boot classes.
Open the self-media management UI and prepare to take the article down.
Take the article down; both tables are updated, so the take-down completes correctly.
This confirms that the Kafka message delivery works end to end.