赞
踩
一个分布式流媒体平台,类似于消息队列或企业消息传递系统
消费者可以订阅一个或者多个的主题,并从broker中拉取数据,从而消费这些已发布的消息.
docker pull zookeeper:3.4.14
docker run -d --name zookeeper --restart=always -p 2181:2181 zookeeper:3.4.14
docker pull wurstmeister/kafka:2.12-2.3.1
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--restart=always \
--net=host wurstmeister/kafka:2.12-2.3.1
kafka的生产者消费者模型分为单播和多播
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
多个broker组成一个cluster集群
集群中的某一条机器宕机,其他机器上的broker依然能对外提供服务
消息的备份称为副本(replica)
分为两种副本
同步方式:
领导者副本直接接收发布者的消息
随后领导者副本会同步将消息复制保存到ISR follower副本,异步将消息复制保存到普通follower副本
依据以下原则选举:
选举优先从ISR中选举,因为ISR副本是同步的
如果ISR中没有生效的副本,就从普通中选举一个
可以等待ISR副本活过来保证数据完整性,也可以选举第一个活过来的保证数据可用性
ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");
RecordMetadata recordMetadata = producer.send(ProducerRecord).get();//获取发送的结果
传入一个callback对象处理回调结果
//异步消息发送
producer.send(ProducerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null){
System.out.println("记录异常信息到日志表中");
}
System.out.println(recordMetadata.offset());
}
});
消息确认配置acks
//ack配置 消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");
acks = 0 //生产者不会等待任何来自服务器的响应
acks = 1 //集群leader收到消息后,生产者会接受一个来自服务器的响应
acks = -1/all(默认) //参与复制的所有生产者全部收到消息后,生产者才会收到响应
重试次数retries
//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);
消息压缩方式
消息默认不会被压缩
//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
| snappy | 占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用 |
| lz4 | 占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观 |
| gzip | 占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法 |
topic会分发消息给每个消费者组中的一个消费者
跨分区的消息无法决定先后顺序
如果需要保证消息的有序性需要让多个消费者去监听同一个分区
消费者会在消费前或者消费后向__consumer_offset的特殊topic中发送偏移量,记录每个消息的处理进度
如果消费者崩溃或者新的消费者加入就会触发负载均衡
如果使用默认方式自动提交偏移量,就会在消费前直接自动提交偏移量,可能会出现重复消费或者漏消费的情况
所以我们需要使用手动提交的方式提交偏移量
手动提交(同步,会自动重试):
consumer.commitSync()
异步提交:
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e!=null){
System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
}
}
});
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
public static void createTopic(String topicName, int partitions, short replicas) throws Exception { NewTopic newTopic = new NewTopic(topicName, partitions, replicas); CreateTopicsResult topics = adminClient.createTopics(Collections.singleton(newTopic)); topics.all().get(); log.info("【{}】topic创建成功", topicName); } /** * @Title deleteTopic * @Description 删除topic * @param topicName topic名称 * @return void */ public static void deleteTopic(String topicName) throws Exception { DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName)); deleteTopicsResult.all().get(); log.info("【{}】topic删除成功", topicName); } /** * @Title updateTopicRetention * @Description 修改topic的过期时间 * @param topicName topic名称 * @param ms 过期时间(毫秒值) * @return void */ public static void updateTopicRetention(String topicName, String ms) throws Exception { ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); ConfigEntry configEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, ms); Config config = new Config(Collections.singleton(configEntry)); // 创建AlterConfigsOptions AlterConfigsOptions alterConfigsOptions = new AlterConfigsOptions().timeoutMs(10000); // 执行修改操作 adminClient.alterConfigs(Collections.singletonMap(resource, config), alterConfigsOptions).all().get(); log.info("【{}】topic过期时间设置完成,过期时间为:{}毫秒", topicName, ms); }
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
kafkaTemplate.send("topic","test");
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
public class Listener {
@KafkaListener(topics = "topic")
public void onMessage(String message){
if(!StringUtils.isEmpty(message)){
System.out.println(message);
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。