赞
踩
可能都听说过“消息队列”,这个都是为了帮助我们可以处理数据或者暂时保存数据用的。
kafka主要是一个分布式流媒体平台,主要是用来处理消息的,追求的是高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的数据处理业务。比如我们使用的日志收集的时候(ELK)
rocketMQ是用来保存数据的消息队列,用来应用解耦或者流量削峰,比如说淘宝双11的时候用来业务削峰,当有大量交易涌入的时候,后端可能没办法立马处理,可以利用它进行缓存一下。
知识点介绍:
topic: Kafka将消息分门别类,每一类的消息称之为一个主题
producer 发布消息的对象称之为生产者
consumer 订阅消息并处理发布的消息的对象称之为消费者
broker 已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
kafka安装
在使用kafka之前,要先有她所需要的环境,首先Java环境,其次需要zookeeper,所以我们要装一个zookeeper,请查看zookeeper安装
这些准备好以后,开始安装kafka
上传到服务器,解压
tar -zxvf kafka_2.13-3.0.0.tgz
修改参数,修改config下面得server.properties
启动kafka,注意:在启动kafka之前必须先启动zookeeper,因为之前说过了kafka是依赖于zookeeper的。
bin/kafka-server-start.sh config/server.properties #启动kafka
生产者详解
原理:
发送类型:
try {
//发送消息
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println(recordMetadata.offset());
} catch (Exception e) {
e.printStackTrace();
}
如果服务器返回错误,get()方法会抛出异常,如果没有发生错误,我们就会得到一个RecordMetadata对象,可以用它来获取消息的偏移量.
try { //异步发送消息 producer.send(record, new Callback() { //当消息发生成功调用的方法 @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e!=null){ e.printStackTrace(); } System.out.println(recordMetadata.offset()); } }); } catch (Exception e) { e.printStackTrace(); }
如果kafka返回一个错误,onCompletion()方法会抛出一个非空(non null)异常,可以根据实际情况处理,比如记录错误日志,或者把消息写入“错误消息”文件中,方便后期进行分析。
参数详解:
消费者详解
原理
参数详解:
auto.commit.interval.ms
属性来控制提交的频率。如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
enable.auto.commit
被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去。提交时间间隔有auto.commot.interval.ms
控制,默认值是5秒。enable.auto.commit
设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"flase");//设置不自动提交偏移量 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); System.out.println(record.key()); try{ //手动提交偏移量 consumer.commitSync();//同步提交 }catch (CommitFailedException e){ e.printStackTrace(); System.out.println("记录错误信息为:"+e); } } }
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"flase");//设置不自动提交偏移量 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); System.out.println(record.key()); } //异步手动提交偏移量 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if(e!=null){ System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e); } } }); }
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"flase");//设置不自动提交偏移量 try { while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); System.out.println(record.key()); } //异步发送 consumer.commitAsync(); } }catch (Exception e){ e.printStackTrace(); System.out.println("记录错误信息:"+e); }finally { try { //同步发送 consumer.commitSync(); }finally { consumer.close(); } }
上面简单介绍了kafka的生产者与消费者,下面就是正式的使用。
springboot与kafka整合
导入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.7.RELEASE</version> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.0.1</version> <exclusions> <exclusion> <artifactId>connect-json</artifactId> <groupId>org.apache.kafka</groupId> </exclusion> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.1</version> </dependency>
编写配置文件
server: port: 9991 spring: application: name: kafka-demo kafka: bootstrap-servers: 10.199.12.155:9092 producer: retries: 0 batch-size: 16368 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: test-hello-group auto-offset-reset: earliest enable-auto-commit: true key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
编写生产者
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("/hello")
public String hello(){
//第一个参数:topics
//第二个参数:消息内容
kafkaTemplate.send("hello-itcast","hello springboot kafka");
return "ok";
}
}
编写消费者
@Component
public class HelloListener {
@KafkaListener(topics = {"hello-itcast"})
public void receiverMessage(ConsumerRecord<?,?> record){
Optional<? extends ConsumerRecord<?, ?>> optional = Optional.ofNullable(record);
if(optional.isPresent()){
Object value = record.value();
System.out.println(value);
}
}
}
然后启动项目即可。
注意点:
因为springboot整合kafka以后,由于使用的序列化器是StringSerializer,这个时候 生产者与消费者之间如果传递对象的时候,有两种方式:
第一:自定义类序列化器,但是这种通用性不强,没传递一个类型的类都要定义一个,很不方便。
第二:因为我们使用的是字符串序列化器,所以,我们可以选择先把要传递的对象转化为json字符串,接收完消息以后再将其转化为对象即可。所以还需再引入一个jar来做这个转化
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
传递对象的方式
@GetMapping("/hello2")
public String hello2(){
User user = new User();
user.setUsername("zhangsan");
user.setAge(18);
//第一个参数:topics
//第二个参数:消息内容
kafkaTemplate.send("hello-itcast", JSON.toJSONString(user));
return "ok";
}
就如上面的例子,先转化为字符串,在发送
@Component
public class UserListener {
@KafkaListener(topics = {"user-itcast"})
public void receiverMessage(ConsumerRecord<?,?> record){
Optional<? extends ConsumerRecord<?, ?>> optional = Optional.ofNullable(record);
if(optional.isPresent()){
Object value = record.value();
User user = JSON.parseObject(String.valueOf(value), User.class);
System.out.println(user);
}
}
}
然后接收到后,再利用json将其转化为对应的对象即可。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。