赞
踩
1. RabbitMQ:
优势:
1.1 吞吐量峰值十万
1.2 稳定性良好
1.3 支持的语言众多
1.4 消息队列类型有7种, 模式多
1.5 使用的AMQP协议
劣势:
1.1 RabbitMQ的开发语言小众, 是由Elang语言开发的, 源码阅读困难, 不便于扩展
1.2 不支持事务
2. RocketMQ
优势:
2.1 吞吐量峰值二十万
2.2 稳定性良好
2.3 支持事务
2.4 使用的AMQP协议
劣势:
2.1 只支持Java 与 C++语言
3. Kafka:
优势:
3.1 吞吐量峰值一百万
3.2 消息可指定追溯
劣势:
3.1 耦合度高, 需要注册到Zookeeper才可以使用
3.2 只支持Java、C++语言
3.3 不支持消息优先级
3.4 不支持标准协议
3.5 严格的顺序机制
3.6 会丢失数据(异步发送模式 fire-and-foget)
4. ActiveMQ:
优势:
4.1 吞吐量峰值十万
4.2 稳定性良好
劣势:
4.1 由Apache开发的, 目前对它不怎么维护
- topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
- producer:发布消息的对象称之为主题生产者(Kafka topic producer)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
安装
下载zookeeper镜像
docker pull zookeeper
启动Zookeeper容器
docker run -d --name zookeeper -p 2181:2181 -t zookeeper
下载Kafka镜像
docker pull wurstmeister/kafka
启动Kafka容器
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.66.133 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
Pom.xml
<!-- 对应Spring boot 2.3.8- -->
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
spring:
kafka:
bootstrap-servers: 39.108.135.173:9092 # Kafka 服务器地址
producer: # 生产者配置
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 生产者的Key消息序列化器
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 生产者的Value消息序列化器
retries: 3 # 生产者消息发送失败的重试次数
consumer: # 消费者配置
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 消费者的Key消息反序列化器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 消费者的Value消息反序列化器
group-id: groupA # 定义消费者组别名称
- public Properties(): 设置Kafka参数
- public synchronized V put(K key, V value): 设置Kafka服务器地址、key序列化器、Value序列化器、生产者发送失败重试次数、消费组名称
- public KafkaProducer(Properties properties): 连接Kafka,创建生产者对象
- public ProducerRecord(String topic, K key, V value): 创建消息, 参数一:topic名称, 参数二:key, 参数三:value
- public Future send(ProducerRecord<K, V> record): 生产者异步发送消息
- send(msg).get(): 生产者同步发送消息
- public void close(): 释放资源
- public void subscribe(Collection topics): 监听topic
- public ConsumerRecords<K, V> poll(Duration timeout): 接收消息
- public K key(): 获取Key
- public V value(): 获取Value
- public long offset(): 当前消息偏移量
- public class ProducerConfig extends AbstractConfig: 获取生产者所需要的一系列配置参数
1.1 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG: 设置Kafka消息提供者的服务器地址
1.2 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG: Key序列化器
1.3 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG: Value序列化器
1.4 ProducerConfig.RETRIES_CONFIG: 生产者发送消息失败重发次数
1.5 ProducerConfig.BUFFER_MEMORY_CONFIG: RecordAccumulator缓冲区大小
1.6 ProducerConfig.LINGER_MS_CONFIG: 等待时间
1.7 ProducerConfig.ACKS_CONFIG: 判断请求是否完整的条件, 指定all将会阻塞消息
1.8 public interface Partitioner extends Configurable, Closeable: 自定义分区- public class ConsumerConfig extends AbstractConfig: 获取消费者所需要的一系列配置参数, 与常用参数与生产者一致
- public ProducerRecord(String topic, K key, V value): 每条数据都要封装成一个ProducerRecord对象
// 示例 public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXXXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重试次数 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 创建KafkaProducer客户端 KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10 ; i++) { producer.send(new ProducerRecord<>("my-topic","ImKey-"+i,"ImValue-"+i)); } // 关闭资源 producer.close(); }
配置参数说明:
- send():方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。
- ack:是判断请求是否完整的条件(就会判断是不是成功发送了,也就是上次说的ACK机制),指定all将会阻塞消息,性能低但是最可靠。
- retries:如果请求失败,生产者会自动重试,我们指定是1次,但是启动重试就有可能出现重复数据。
- batch.size:指定缓存的大小,生产者缓存每个分区未发送的消息。值越大的话将会产生更大的批量,并需要更大的内存(因为每个活跃的分区都有一个缓存区)。
- linger.ms:指示生产者发送请求之前等待一段时间,设置等待时间是希望更多地消息填补到未满的批中。默认缓冲可以立即发送,即便缓冲空间还没有满,但是如果想减少请求的数量可以设置linger.ms大于0。需要注意的是在高负载下,相近的时间一般也会组成批,即使等于0。
- buffer.memory:控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后将会抛出一个TimeoutException
- key.serializer和value.serializer将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerializaer或StringSerializer处理简单的string或byte类型。
- bootstrap.servers: 集群是通过配置bootstrap.servers指定一个或多个broker。不用指定全部的broker,它将自动发现集群中的其余的borker(最好指定多个,万一有服务器故障)
- enable.auto.commit: 自动提交偏移量,如果设置了自动提交偏移量,下面这个设置就必须要用到了。
- auto.commit.interval.ms:自动提交时间间隔,和自动提交偏移量配合使用
- max.poll.records:控制从 broker拉取的消息条数
- poll(long time): 当消费者获取不到消息时,就会使用这个参数,为了减轻无效的循环请求消息,消费者会每隔long time的时间请求一次消息,单位是毫秒。
- session.timeout.ms: broker通过心跳机器自动检测消费者组中失败的进程,消费者会自动ping集群,告诉进群它还活着。只要消费者能够做到这一点,它就被认为是活着的,并保留分配给它分区的权利,如果它停止心跳的时间超过session.timeout.ms,那么就会认为是故障的,它的分区将被分配到别的进程。
- auto.offset.reset: 值有三种:earliest,latest,none
7.1 earliest:
当各分区下有已经提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费,最常用的值
7.2 latest:
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
7.3 none:
topic各分区都存在已提交的offset时,从offset后开始消费,只要有一个分区不存在已提交的offset,则抛出异常
!!注意:当使用了latest,并且分区没有已提交的offset时,消费新产生的该分区下的数据,其实是把offset的值直接设置到最后一个消息的位置。例如,有个30条数据的demo的topic,各分区无提交offset,使用了latest,再看offset就会发现已经在30的位置了,所以才只能消费新产生的数据!!!!- auto.commit.interval.ms: 自动提交时间。若设置自动提交时间为一分钟(单位为毫秒值),也就是你在这一分钟内拉取任何数量的消息都不会被提交消费的当前偏移量,如果你此时关闭消费者(一分钟内),下次消费还是从和第一次的消费数据一样,即使你在一分钟内消费完所有的消息,只要你在一分钟内关闭程序,导致提交不了offset,就可以一直重复消费数据。
8.1: 在消费过程中设置sleep
如果你消费了第一批数据,在执行第二次poll的时候,关闭程序也不会提交偏移量,只有在执行第二次poll的时候才会把上一次的最后一个offset提交上去。
// 消息生产者 public static void main(String[] args) { //设置参数 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.66.133:9092"); //kafka服务器地址 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// key序列化器 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //value序列化器 props.put(ProducerConfig.RETRIES_CONFIG,3);//生产者发送失败重试次数 //连接Kafka,创建生产者对象 KafkaProducer kafkaProducer = new KafkaProducer(props); // 创建消息, 参数一:topic名称, 参数二:key, 参数三:value ProducerRecord<String,String> msg = new ProducerRecord<>("hello","2","hello kafka!!!"); //发送消息 kafkaProducer.send(msg);//异步 try { kafkaProducer.send(msg).get();//同步 } catch (Exception e) { e.printStackTrace(); } //释放资源 kafkaProducer.close(); }
// 消息消费者: 手动提交偏移量 public static void main(String[] args) { // 参数配置 Properties props = new Properties(); props.put("bootstrap.servers","XXXXXC:9093"); props.put("group.id","test-11");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("enable.auto.commit","false");//自动提交offset props.put("auto.commit.interval.ms","1000"); // 自动提交时间间隔 props.put("max.poll.records","20"); // 拉取的数据条数 props.put("session.timeout.ms","10000"); // 维持session的时间。超过这个时间没有心跳 就会剔出消费者组 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); int i= 0; while (true){ ConsumerRecords<String, String> records = consumer.poll(5000); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); i++; } if (i == 20){ System.out.println("i_num:"+i); // 同步提交 consumer.commitSync(); // 异步提交 // consumer.commitAsync(); }else { System.out.println("不足二十个,不提交"+i); } i=0; } }
提示:
不需要定时提交偏移量,可以自己控制offset,当消息已经被我们消费过后,再去手动提交他们的偏移量。这个很适合我们的一些处理逻辑。
手动提交offset的方法有两种:分别是commitSync(同步提交) 和commitAsync(异步提交)。两者的相同点,都会将本次poll的一批数据最高的偏移量提交;不同点是commitSync会失败重试,一直到提交成功(如果有不可恢复的原因导致,也会提交失败),才去拉取新数据。而commitAsync则没有重试机制(提交了就去拉取新数据,不管这次的提交有没有成功),故有可能提交失败。
// 自定义偏移量: 更细致的控制偏移量提交 public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers","XXXXXXXXXX:9093"); props.put("group.id","test-18");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("enable.auto.commit","false");//自动提交offset props.put("auto.commit.interval.ms","1000000"); // 自动提交时间间隔 props.put("max.poll.records","5"); // 拉取的数据条数 props.put("session.timeout.ms","10000"); // 维持session的时间。超过这个时间没有心跳 就会剔出消费者组 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true){ ConsumerRecords<String, String> records = consumer.poll(5000); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)), new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { for (Map.Entry<TopicPartition,OffsetAndMetadata> entry : map.entrySet()){ System.out.println("提交的分区:"+entry.getKey().partition()+",提交的偏移量:"+entry.getValue().offset()); } } }); } } }
// 订阅指定的分区: 通过消费者Kafka会通过分区分配分给消费者一个分区,但是我们也可以指定分区消费消息,要使用指定分区,只需要调用assign(Collection)消费指定的分区即可: public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers","XXXXXXXXX:9093"); props.put("group.id","test-19");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("enable.auto.commit","false");//自动提交offset props.put("auto.commit.interval.ms","1000000"); // 自动提交时间间隔 props.put("max.poll.records","5"); // 拉取的数据条数 props.put("session.timeout.ms","10000"); // 维持session的时间。超过这个时间没有心跳 就会剔出消费者组 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props); // 你可以指定多个不同topic的分区或者相同topic的分区 我这里只指定一个分区 TopicPartition topicPartition = new TopicPartition("my-topic", 0); // 调用指定分区用assign,消费topic使用subscribe consumer.assign(Arrays.asList(topicPartition)); while (true){ ConsumerRecords<String, String> records = consumer.poll(5000); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)), new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { for (Map.Entry<TopicPartition,OffsetAndMetadata> entry : map.entrySet()){ System.out.println("提交的分区:"+entry.getKey().partition()+",提交的偏移量:"+entry.getValue().offset()); } } }); } } }
提示:
一旦手动分配分区,你可以在循环中调用poll。消费者分区任然需要提交offset,只是现在分区的设置只能通过调用assign 修改,因为手动分配不会进行分组协调,因此消费者故障或者消费者的数量变动都不会引起分区重新平衡。每一个消费者是独立工作的(即使和其他的消费者共享GroupId)。为了避免offset提交冲突,通常你需要确认每一个consumer实例的groupId都是唯一的。
注意:
手动分配分区(assgin)和动态分区分配的订阅topic模式(subcribe)不能混合使用。
// 消息消费者: 消费者的Kafka消息格式: key-value对象 public static void main(String[] args) { //设置参数 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.66.133:9092"); //kafka服务器地址 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");// key反序列化器 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //value反序列化器 props.put(ConsumerConfig.GROUP_ID_CONFIG,"groupA");//消费组名称 //创建消费者对象 KafkaConsumer kafkaConsumer = new KafkaConsumer(props); //监听topic kafkaConsumer.subscribe(Collections.singleton("hello")); //接收消息 while(true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2)); for (ConsumerRecord<String, String> record : records) { String key = record.key(); String value = record.value(); System.out.println(key + "---" + value); System.out.println("当前消息偏移量:"+record.offset()); } } }
API生产者自定义分区策略:
- 指定分区就发送到指定分区
- 没有指定分区,有key值,就按照key值的Hash值分配分区
- 没有指定分区,也没有指定key值,轮询分区分配(只分配一次,以后都按照第一次的分区顺序)
自定义分区Demo
// Demo1: public class KafkaMyPartitions implements Partitioner { @Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { // 这里写自己的分区策略 // 我这里指定为1 return 1; } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }
// KafkaProducerCallbackDemo: public class KafkaProducerCallbackDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXX:9093");//kafka集群,broker-list props.put("acks", "all"); props.put("retries", 1);//重试次数 props.put("batch.size", 16384);//批次大小 props.put("linger.ms", 1);//等待时间 props.put("buffer.memory", 33554432);//RecordAccumulator缓冲区大小 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定自定义分区 props.put("partitioner.class","com.firehome.newkafka.KafkaMyPartitions"); // 创建KafkaProducer客户端 KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 20; i < 25 ; i++) { producer.send(new ProducerRecord<String, String>("th-topic", "ImKey-" + i, "ImValue-" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null){ System.out.printf("消息发送成功!topic=%s,partition=%s,offset=%d \n",recordMetadata.topic(),recordMetadata.partition(),recordMetadata.offset()); }else { System.err.println("消息发送失败!"); } } }); } producer.close(); } }
负责协调主题下分区管理、分区副本管理
Kafka 中的数据单元被称为消息message,也被称为记录,可以把它看作数据库表中某一行的记录。
Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)。同一个主题下多个分区的消息一定是不同的
为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息
主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性。topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个文件进行存储。一个partition中的数据是有序的,多个partition之间的数据是没有顺序的。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。
Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica);所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从ISR列表(保持同步的副本列表)中删除,重新创建一个Follower。
kafka对与zookeeper是强依赖的,是以zookeeper作为基础的,即使不做集群,也需要zk的支持。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行重平衡。
生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。
偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。
消费者同组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
计算消息Key的Hash值 与 当前主题下的所有分区总数求余, 决定消息的存储位置, 若没有指定消息的Key 或 Key为null, 则存储位置随机
- 无序:
- 当多个Message发送到同一主题下的不同分区时, 消费Message时的顺序不一定是有序的
- 有序:
- 当多个Message发送到同一主题下的同一分区时, 消费Message时的顺序一定是有序的
- 只建立一个分区, 这样所有的Message都会发送到同一个分区
- 将Message发送到同一个Key
架构图
Producer从指定的主题中获取所有分区中的leader副本, 通过计算Key的哈希值与leader分区数量求余, 将Message发送到计算出来的leader副本, leader副本将收到的消息写入磁盘中, 所有follwer副本从leader副本pull Message, 并写入磁盘中, 并返回ACK给leader副本, leader副本返回ACK给Producer。
把消息发送给服务器,并不关心它是否正常到达,大多数情况下,消息会正常到达,因为kafka是高可用的,而且生产者会自动尝试重发,使用这种方式有时候会丢失一些信息
- 优势: 发送消息的效率相对同步发送更高, 它不需要等待整个流程走完, 不会发生消息阻塞
- 劣势: 数据传输的可靠性相对同步发送较低。由于它不需要保证数据是否发送成功, 就会发生数据丢失的现象。
- Kafka的重试机制可以相对降低数据丢失的概率
- 使用回调函数, 也可以降低数据丢失的概率
//发送消息
try {
producer.send(record);
}catch (Exception e){
e.printStackTrace();
}
如果业务只关心消息的吞吐量,容许少量消息发送失败,也不关注消息的发送顺序,那么可以使用发送并忘记的方式
使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功。
- 优势: 保证数据的有序性 及 数据的完整性(数据传输性高, 不会丢失数据)
- 劣势: 会发生阻塞, 必须等待所有流程完成后, 才会发送下一条Message, 效率低
//发送消息
try {
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println(recordMetadata.offset());//获取偏移量
}catch (Exception e){
e.printStackTrace();
}
提示: 如果服务器返回错误,get()方法会抛出异常,如果没有发生错误,我们就会得到一个RecordMetadata对象,可以用它来获取消息的偏移量
如果业务要求消息尽可能不丢失且必须是按顺序发送的,那么可以使用同步的方式 ( 只能在一个partation上 )
调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数。
//发送消息
try {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e!=null){
e.printStackTrace();
}
System.out.println(recordMetadata.offset());
}
});
}catch (Exception e){
e.printStackTrace();
}
提示: 如果kafka返回一个错误,onCompletion()方法会抛出一个非空(non null)异常,可以根据实际情况处理,比如记录错误日志,或者把消息写入“错误消息”文件中,方便后期进行分析。
如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用异步+回调的方式来发送消息
一个主题多个分区 + 异步发送+不回调
一个主题一个分区+ 同步发送 + 回调函数
架构图
Consumer从Kafka Cluster中获取所有分区的leader副本, 并发送pull message request请求获取leader副本中的Message, leader根据offset偏移量查询Message, 并响应Consumer的pull message response 请求的Message
offset作用:
- 记录消费者消费消息的位置
- 可以追溯已经消费的消息
消费者同组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
- enable.auto.commit
该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设置为false,由自己控制何时提交偏移量。如果把它设置为true,还可以通过配置auto.commit.interval.ms
属性来控制提交的频率。- auto.offset.reset
- earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费- latest(默认值)
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据- none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常- anything else
向consumer抛出异常
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。