赞
踩
-
- public class MyProducer implements Job {
- private static KafkaProducer<String,String> producer;
-
- static {
- Properties properties = new Properties();
- properties.put("bootstrap.servers","127.0.0.1:9092");
- properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
- producer = new KafkaProducer<String, String>(properties);
- }
-
- /**
- * 第一种异步发送,直接发送,不管结果
- */
- private static void sendMessageForgetResult(){
- ProducerRecord<String,String> record = new ProducerRecord<String,String>("kafka-study","name","Forget_result");
- producer.send(record);
- producer.close();
- }
-
-
- /**
- * 第二种异步发送,执行回调函数
- */
- private static void sendMessageCallback(){
- ProducerRecord<String,String> record = new ProducerRecord<String,String>("kafka-study","name","callback");
- producer.send(record,new MyProducerCallback());
- }
-
- private static class MyProducerCallback implements Callback{
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e !=null){
- e.printStackTrace();
- return;
- }
- System.out.println(recordMetadata.topic());
- System.out.println(recordMetadata.partition());
- System.out.println(recordMetadata.offset());
- System.out.println("Coming in MyProducerCallback");
- }
- }
-
- 同步发送,在send()方法中使用Future对象获取发送消息返回的信息
- RecordMetadata recordMetadata =
- producer.send(new ProducerRecord<String, String>(this.topic, value)).get();
-
- /**
- * 第三种同步发送,等待执行结果
- */
- private static RecordMetadata sendMessageSync() throws Exception{
- ProducerRecord<String,String> record = new ProducerRecord<String,String>("kafka-study","name","sync");
- RecordMetadata result = producer.send(record).get();
- System.out.println(result.topic());
- System.out.println(result.partition());
- System.out.println(result.offset());
- return result;
- }
-
- //定时任务
- @Override
- public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
- try {
- sendMessageSync();
- }catch (Exception e){
- System.out.println("error:"+e);
- }
- }
-
-
- public static void main(String[] args){
- //sendMessageForgetResult();
- //sendMessageCallback();
- JobDetail job = JobBuilder.newJob(MyProducer.class).build();
-
- Trigger trigger=TriggerBuilder.newTrigger().withSchedule(SimpleScheduleBuilder.repeatSecondlyForever()).build();
- try {
- Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
- scheduler.scheduleJob(job,trigger);
- scheduler.start();
- }catch (SchedulerException e){
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- public class producer {
- public static void main(String[] args) {
- Properties properties = new Properties();
- Properties props = new Properties();
- props.put("bootstrap.servers", "hadoop01:9092");// Kafka服务端的主机名和端口号
- props.put("acks", "all"); // 等待所有副本节点的应答
- props.put("retries", 3);// 消息发送最大尝试次数
- props.put("batch.size", 16384); // 一批消息处理大小
- props.put("linger.ms", 1); // 请求延时
- props.put("buffer.memory", 33554432); // 发送缓存区内存大小
- // key序列化
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- // value序列化
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- Producer<String, String> producer = new KafkaProducer<>(props);
- for (int i = 0; i < 50; i++) {
- //创建生产者无回调函数
- producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
- //创建生产者有回调函数
- producer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {
-
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (metadata != null) {
- System.err.println(metadata.partition() + "---" + metadata.offset());
- }
- }
- });
- }
- producer.close();
- }
(每个分区都是一个完全有序的日志,但是分区之间没有全局排序(除了你的消息中可能包含的一些挂钟时间)。将消息分配给特定分区是可由编写者控制的,大多数用户选择通过某种密钥(例如用户id,或 订单ID)进行分区。分区允许在没有分片之间协调的情况下发生日志追加,并允许系统的吞吐量与Kafka群集大小线性地扩展。)
解决方法
严格说,kafka是无法保证全局消息有序的,没有这个机制,只能局部有序。
但是如果只有一个分片和一个消息的生产者,那么就相当于消息全局有序了。
如果有多个消息生产者,就算只有一个分片,如果这些生产者的消息都发给这个分片,那kafka中的消息连局部有序都没有办法了。
1.一个topic 一个分区 虽然保证全局有序,但是性能下降
2.单分区有序,那么我们想方法把同一个特征数据写到一个分区
- public class MyPartition implements Partitioner {
- private static Logger LOG = LoggerFactory.getLogger(MyPartition.class);
-
- @Override
- public void configure(Map<String, ?> configs) {
- }
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- // TODO Auto-generated method stub
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- int numPartitions = partitions.size();
- int partitionNum = 0;
- try {
- partitionNum = Integer.parseInt((String) key);
- } catch (Exception e) {
- partitionNum = key.hashCode();
- }
- LOG.info("the message sendTo topic:" + topic + " and the partitionNum:" + partitionNum);
- return Math.abs(partitionNum % numPartitions);
- }
-
- @Override
- public void close() {
- }
-
- }
- /*
- INFO | the message sendTo topic:Topic-test and the partitionNum:42
- INFO | the message sendTo topic:Topic-test and the partitionNum:43
- INFO | the message sendTo topic:Topic-test and the partitionNum:44
- INFO | the message sendTo topic:Topic-test and the partitionNum:45 */
-
- public class Producer {
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put("bootstrap.servers", "192.168.31.130:9092,192.168.31.131:9092,192.168.31.132:9092");// 该地址是集群的子集,用来探测集群。
- props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
- props.put("retries", 3);// 请求失败重试的次数
- props.put("batch.size", 16384);// batch的大小
- props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
- props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 序列化的方式,
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- // 设置属性 自定义分区类
- props.put("partitioner.class", "com.east.spark.kafka.MyPartition");
- KafkaProducer<String, String> producer = new KafkaProducer(props);
- for (int i = 0; i < 10000; i++) {
- // 三个参数分别为topic, key,value,send()是异步的,添加到缓冲区立即返回,更高效。
- producer.send(new ProducerRecord<String, String>("Topic-test", Integer.toString(i), Integer.toString(i)));
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- producer.close();
- }
- }
- String kafkas = "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094";
- Properties props = new Properties();
- //kafka连接信息
- props.put("bootstrap.servers",kafkas);
- //消费者组id
- props.put("group.id", "test_group");
- //是否自动提交offset
- props.put("enable.auto.commit", "true");
- //在没有offset的情况下采取的拉取策略
- props.put("auto.offset.reset", "none");
- //自动提交时间间隔
- props.put("auto.commit.interval.ms", "1000");
-
- //key.deserializer和value.deserializer是用来做反序列化的,也就是将字节数组转换成对象
- //key反序列化
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- //value反序列化
- props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
-
创建完消费者后我们便可以订阅主题了,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表,非常简单:
consumer.subscribe(Collections.singletonList("customerCountries"));
这个例子中只订阅了一个customerCountries主题。另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接Kafka与其他系统时非常有用。比如订阅所有的测试主题:
consumer.subscribe("test.*");
消费数据的API和处理方式很简单,我们只需要循环不断拉取消息即可。Kafka对外暴露了一个非常简洁的poll方法,其内部实现了协作、分区重平衡、心跳、数据拉取等功能,但使用时这些细节都被隐藏了,我们也不需要关注这些。下面是一个代码样例:
- KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
- kafkaConsumer.subscribe(Arrays.asList("eshop"));
-
- try {
- while (true) { //1)
- ConsumerRecords<String, String> records = consumer.poll(100); // timeout
- for (ConsumerRecord<String, String> record : records) //3)
- {
- //poll()方法返回记录的列表,每条记录包含key/value以及主题、分区、位移信息。
- log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
- record.topic(), record.partition(), record.offset(),record.key(), record.value());
- //相应处理逻辑
- int updatedCount = 1;
- if (custCountryMap.countainsValue(record.value())) {
- updatedCount = custCountryMap.get(record.value()) + 1;
- }
- custCountryMap.put(record.value(), updatedCount)
- JSONObject json = new JSONObject(custCountryMap);
- System.out.println(json.toString(4))
- }
- }
- //offset自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。
- } finally {
- consumer.close(); //4主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期。
- }
另外需要提醒的是,消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象;而且也不能够一个线程有多个消费者对象。简而言之,一个线程一个消费者,如果需要多个消费者那么请使用多线程来进行一一对应。
- public class Consumer {
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put("bootstrap.servers", "192.168.31.130:9092,192.168.31.131:9092,192.168.31.132:9092");// 该地址是集群的子集,用来探测集群。
- props.put("group.id", "test");// cousumer的分组id
- props.put("enable.auto.commit", "true");// 自动提交offsets
- props.put("auto.commit.interval.ms", "1000");// 每隔1s,自动提交offsets
- props.put("session.timeout.ms", "30000");// Consumer向集群发送自己的心跳,超时则认为Consumer已经死了,kafka会把它的分区分配给其他进程
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化器
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
- consumer.subscribe(Arrays.asList("Topic-test"));// 订阅的topic,可以多个
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("partition = %d , offset = %d, key = %s, value = %s", record.partition(),
- record.offset(), record.key(), record.value());
- System.out.println();
- }
- }
- }
- }
- /*
- partition = 1 , offset = 48, key = 1, value = 1
- partition = 2 , offset = 41, key = 2, value = 2
- partition = 0 , offset = 51, key = 3, value = 3
- partition = 1 , offset = 49, key = 4, value = 4
- partition = 2 , offset = 42, key = 5, value = 5
- partition = 0 , offset = 52, key = 6, value = 6
- partition = 1 , offset = 50, key = 7, value = 7
- partition = 2 , offset = 43, key = 8, value = 8
- partition = 0 , offset = 53, key = 9, value = 9
- partition = 1 , offset = 51, key = 10, value = 10
- partition = 2 , offset = 44, key = 11, value = 11
- */
在动态分配partition的场景下,消费者的加入和删除,都会导致partition的重新分配给其他的消费者。而静态分配partition下,如果消费者挂掉后,分配给这个消费者的partition并不会负载给其他消费者。静态分配partition的模式,消费者不是订阅主题,而是订阅指定的partition。
通过subscribe()方法订阅主题具有消费者自动均衡的功能。在多线程情况下,多个消费者进程根据分区分配策略自动分配消费者线程与分区的关系,当一个消费者组的消费者发生增减变化,分区分配会自动调整,以实现消费负载均衡及故障自动转移。指定分区assign()方法订阅主题不具有自动均衡的功能。
- consumer.subscribe(Arrays.asList("topic"));//主题
- //上面这行命令换成下面这四句
- String topic = "topic";
- TopicPartition partition0 = new TopicPartition(topic, 0);//主题,分区0
- TopicPartition partition1 = new TopicPartition(topic, 1);//主题,分区1
- consumer.assign(Arrays.asList(partition0, partition1));
注意:手动分配分区(assgin)和 动态分区分配的订阅topic模式(subcribe)不能混合使用。
新版本把高阶消费者和低阶消费者整合到一起了,对应KafkaConsumer类的subscribe和assign方法。
-
- Properties prop = new Properties();
- prop.put("bootstrap.servers", "mini1:9092,mini2:9092,mini3:9092");//指定节点地址
- prop.put("group.id", "001");
- prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- prop.put("consumer-id", "test");
- Consumer<String, String> consumer = new KafkaConsumer<>(prop);//消费者
-
- TopicPartition p = new TopicPartition("test6", 2);//只消费分区号为2的分区
- consumer.assign(Arrays.asList(p));
- // consumer.subscribe(Arrays.asList("test6"));//消费topic 消费全部分区
-
-
- // subscribe to some partitions of topic foo
- TopicPartition partition0 = new TopicPartition("foo", 0);
- TopicPartition partition1 = new TopicPartition("foo", 1);
- TopicPartition[] partitions = new TopicPartition[2];
- partitions[0] = partition0;
- partitions[1] = partition1;
- consumer.assign(Arrays.asList(partitions ));
-
- while (true) {
- ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(10));//消费一定时间的数据
- Thread.sleep(3000);
- System.out.println("循环");
- for (ConsumerRecord<String, String> record : poll) {
- System.out.println(String.format("key:%s , value:%s , offset:%s", record.key(), record.value(), record.offset()));
- }
- }
- @KafkaListener(topicPartitions = {@TopicPartition(topic = "mayikt", partitions = {"0"})})
- public void receive(ConsumerRecord<?, ?> consumer) {
- System.out.println("topic名称:" + consumer.topic() + ",key:" +
- consumer.key() + "," +
- "分区位置:" + consumer.partition()
- + ", 下标" + consumer.offset());
- }
上面的例子中只设置了几个最基本的消费者参数,bootstrap.servers,group.id,key.deserializer和value.deserializer,其他的参数可以看Kafka文档。虽然我们很多情况下只是使用默认设置就行,但了解一些比较重要的参数还是很有帮助的。
fetch.min.bytes
这个参数允许消费者指定从broker读取消息时最小的数据量。当消费者从broker读取消息时,如果数据量小于这个阈值,broker会等待直到有足够的数据,然后才返回给消费者。对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力。
fetch.max.wait.ms
上面的fetch.min.bytes参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms。
max.partition.fetch.bytes
这个参数指定了每个分区返回的最多字节数,默认为1M。也就是说,KafkaConsumer.poll()返回记录列表时,每个分区的记录字节数最多为1M。如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。
需要注意的是,max.partition.fetch.bytes必须要比broker能够接收的最大的消息(由max.message.size设置)大,否则会导致消费者消费不了消息。另外,在上面的样例可以看到,我们通常循环调用poll方法来读取消息,如果max.partition.fetch.bytes设置过大,那么消费者需要更长的时间来处理,可能会导致没有及时poll而会话过期。对于这种情况,要么减小max.partition.fetch.bytes,要么加长会话时间。
session.timeout.ms
这个参数设置消费者会话过期时间,默认为3秒。也就是说,如果消费者在这段时间内没有发送心跳,那么broker将会认为会话过期而进行分区重平衡。这个参数与heartbeat.interval.ms有关,heartbeat.interval.ms控制KafkaConsumer的poll()方法多长时间发送一次心跳,这个值需要比session.timeout.ms小,一般为1/3,也就是1秒。更小的session.timeout.ms可以让Kafka快速发现故障进行重平衡,但也加大了误判的概率(比如消费者可能只是处理消息慢了而不是宕机)。
auto.offset.reset
这个参数指定了当消费者第一次读取分区或者上一次的位置太老(比如消费者下线时间太久)时的行为,可以取值为latest(从最新的消息开始消费)或者earliest(从最老的消息开始消费)。
enable.auto.commit
这个参数指定了消费者是否自动提交消费位移,默认为true。如果需要减少重复消费或者数据丢失,你可以设置为false。如果为true,你可能需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
partition.assignment.strategy
我们已经知道当消费组存在多个消费者时,主题的分区需要按照一定策略分配给消费者。这个策略由PartitionAssignor类决定,默认有两种策略:
partition.assignment.strategy设置了分配策略,默认为org.apache.kafka.clients.consumer.RangeAssignor(使用范围策略),你可以设置为org.apache.kafka.clients.consumer.RoundRobinAssignor(使用轮询策略),或者自己实现一个分配策略然后将partition.assignment.strategy指向该实现类。
client.id
这个参数可以为任意值,用来指明消息从哪个客户端发出,一般会在打印日志、衡量指标、分配配额时使用。
max.poll.records
这个参数控制一个poll()调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量。
receive.buffer.bytes、send.buffer.bytes
这两个参数控制读写数据时的TCP缓冲区,设置为-1则使用系统的默认值。如果消费者与broker在不同的数据中心,可以一定程度加大缓冲区,因为数据中心间一般的延迟都比较大。
Kafka的多线程消费者实现 - NYC's Bloghttp://niyanchun.com/kafka-multi-thread-consumer.html
- package com.kafka.singleton;
- import java.io.IOException;
- import java.io.InputStream;
- import java.util.Properties;
- import java.util.Random;
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public final class KafkaProducerSingleton {
- private static final Logger LOGGER = LoggerFactory .getLogger(KafkaProducerSingleton.class);
- private static KafkaProducer<String, String> kafkaProducer;
- private Random random = new Random();
- private String topic;
- private int retry;
- private KafkaProducerSingleton() {
- }
- /**
- * 静态内部类
- */
- private static class LazyHandler {
- private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
- }
- /**
- * 单例模式,kafkaProducer是线程安全的,可以多线程共享一个实例
- */
- public static final KafkaProducerSingleton getInstance() {
- return LazyHandler.instance;
- }
- /**
- * kafka生产者进行初始化
- */
- public void init(String topic,int retry) {
- this.topic = topic;
- this.retry = retry;
- if (null == kafkaProducer) {
- Properties props = new Properties();
- InputStream inStream = null;
- try {
- inStream = this.getClass().getClassLoader()
- .getResourceAsStream("kafka.properties");
- props.load(inStream);
- kafkaProducer = new KafkaProducer<String, String>(props);
- } catch (IOException e) {
- LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);
- } finally {
- if (null != inStream) {
- try {
- inStream.close();
- } catch (IOException e) {
- LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);
- }
- }
- }
- }
- }
- /**
- * 通过kafkaProducer发送消息
- * @param topic 消息接收主题
- * @param partitionNum 哪一个分区
- * @param retry 重试次数
- * @param message 具体消息值
- */
- public void sendKafkaMessage(final String message) {
- /**
- * 1、如果指定了某个分区,会只讲消息发到这个分区上
- * 2、如果同时指定了某个分区和key,则也会将消息发送到指定分区上,key不起作用
- * 3、如果没有指定分区和key,那么将会随机发送到topic的分区中
- * 4、如果指定了key,那么将会以hash<key>的方式发送到分区中
- */
- ProducerRecord<String, String> record = new ProducerRecord<String, String>(
- topic, random.nextInt(3), "", message);
- // send方法是异步的,添加消息到缓存区等待发送,并立即返回,这使生产者通过批量发送消息来提高效率
- // kafka生产者是线程安全的,可以单实例发送消息
- kafkaProducer.send(record, new Callback() {
- public void onCompletion(RecordMetadata recordMetadata,
- Exception exception) {
- if (null != exception) {
- LOGGER.error("kafka发送消息失败:" + exception.getMessage(),
- exception);
- retryKakfaMessage(message);
- }
- }
- });
- }
- /**
- * 当kafka消息发送失败后,重试
- */
- private void retryKakfaMessage(final String retryMessage) {
- ProducerRecord<String, String> record = new ProducerRecord<String, String>(
- topic, random.nextInt(3), "", retryMessage);
- for (int i = 1; i <= retry; i++) {
- try {
- kafkaProducer.send(record);
- return;
- } catch (Exception e) {
- LOGGER.error("kafka发送消息失败:" + e.getMessage(), e);
- retryKakfaMessage(retryMessage);
- }
- }
- }
- /**
- * kafka实例销毁
- */
- public void close() {
- if (null != kafkaProducer) {
- kafkaProducer.close();
- }
- }
- public String getTopic() {
- return topic;
- }
- public void setTopic(String topic) {
- this.topic = topic;
- }
- public int getRetry() {
- return retry;
- }
- public void setRetry(int retry) {
- this.retry = retry;
- }
- }
- package com.travelsky.kafka.singleton;
- public class HandlerProducer implements Runnable {
- private String message;
- public HandlerProducer(String message) {
- this.message = message;
- }
- @Override
- public void run() {
- KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton
- .getInstance();
- kafkaProducerSingleton.init("test_find",3);
- System.out.println("当前线程:" + Thread.currentThread().getName()
- + ",获取的kafka实例:" + kafkaProducerSingleton);
- kafkaProducerSingleton.sendKafkaMessage("发送消息" + message);
- }
- }
kafka.properties
- bootstrap.servers=master:9092,slave1:9092,slave2:9092
- acks=1
- retries=0
- batch.size=1000
- compression.type=gzip
- #buffer.memory=33554432
- key.serializer=org.apache.kafka.common.serialization.StringSerializer
- value.serializer=org.apache.kafka.common.serialization.StringSerializer
- public static void main(String[] args) {
- Kafka_Consumer kafka_Consumer = new Kafka_Consumer();
- try {
- kafka_Consumer.execute();
- Thread.sleep(20000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- kafka_Consumer.shutdown();
- }
- }
- package com.kafka.consumer;
- import java.util.Arrays;
- import java.util.Properties;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- public final class Kafka_Consumer {
- /**
- * kafka消费者不是线程安全的
- */
- private final KafkaConsumer<String, String> consumer;
- private ExecutorService executorService;
- public Kafka_Consumer() {
- Properties props = new Properties();
- props.put("bootstrap.servers", "ip,port");
- props.put("group.id", "group");
- // 关闭自动提交
- props.put("enable.auto.commit", "false");
- props.put("auto.commit.interval.ms", "1000");
- props.put("session.timeout.ms", "30000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- consumer = new KafkaConsumer<String, String>(props);
- consumer.subscribe(Arrays.asList("test_find"));
- }
- public void execute() {
- executorService = Executors.newFixedThreadPool(3);
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(10);
- if (null != records) {
- executorService.submit(new ConsumerThread(records, consumer));
- }
- }
- }
- public void shutdown() {
- try {
- if (consumer != null) {
- consumer.close();
- }
- if (executorService != null) {
- executorService.shutdown();
- }
- if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
- System.out.println("Timeout");
- }
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
- }
- }
- package com.kafka.consumer;
- import java.util.Collections;
- import java.util.List;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.clients.consumer.OffsetAndMetadata;
- import org.apache.kafka.common.TopicPartition;
- /**
- * 多消费者,多个work线程,难保证分区消息消费的顺序性
- */
- public final class ConsumerThread implements Runnable {
- private ConsumerRecords<String, String> records;
- private KafkaConsumer<String, String> consumer;
- public ConsumerThread(ConsumerRecords<String, String> records,
- KafkaConsumer<String, String> consumer) {
- this.records = records;
- this.consumer = consumer;
- }
- @Override
- public void run() {
- for (TopicPartition partition : records.partitions()) {
- List<ConsumerRecord<String, String>> partitionRecords = records
- .records(partition);
- for (ConsumerRecord<String, String> record : partitionRecords) {
- System.out.println("当前线程:" + Thread.currentThread() + ","
- + "偏移量:" + record.offset() + "," + "主题:"
- + record.topic() + "," + "分区:" + record.partition()
- + "," + "获取的消息:" + record.value());
- }
- // 消费者自己手动提交消费的offest,确保消息正确处理后再提交
- long lastOffset = partitionRecords.get(partitionRecords.size() - 1)
- .offset();
- consumer.commitSync(Collections.singletonMap(partition,
- new OffsetAndMetadata(lastOffset + 1)));
- }
- }
- }
- import com.travelsky.kafka.singleton.HandlerProducer;
- @RunWith(SpringJUnit4ClassRunner.class)
- @ContextConfiguration(locations = { "classpath:applicationContext.xml" })
- public class Kafka生产_多线程单实例 {
- @Test
- public void testSendMessageSingleton() throws InterruptedException {
- ExecutorService executor = Executors.newFixedThreadPool(3);
- for (int i = 1; i <= 10; i++) {
- Thread.sleep(1000);
- executor.submit(new HandlerProducer(":" + i));
- }
- }
- }
先起消费者,再起生产者,运行效果如下
消费者:
- 当前线程:Thread[pool-1-thread-1,5,main],偏移量:44,主题:test_find,分区:1,获取的消息:发送消息:1
- 当前线程:Thread[pool-1-thread-2,5,main],偏移量:45,主题:test_find,分区:1,获取的消息:发送消息:2
- 当前线程:Thread[pool-1-thread-1,5,main],偏移量:46,主题:test_find,分区:1,获取的消息:发送消息:3
- 当前线程:Thread[pool-1-thread-1,5,main],偏移量:39,主题:test_find,分区:0,获取的消息:发送消息:4
- 当前线程:Thread[pool-1-thread-2,5,main],偏移量:47,主题:test_find,分区:1,获取的消息:发送消息:5
- 当前线程:Thread[pool-1-thread-3,5,main],偏移量:40,主题:test_find,分区:0,获取的消息:发送消息:6
- 当前线程:Thread[pool-1-thread-2,5,main],偏移量:37,主题:test_find,分区:2,获取的消息:发送消息:7
- 当前线程:Thread[pool-1-thread-2,5,main],偏移量:38,主题:test_find,分区:2,获取的消息:发送消息:8
- 当前线程:Thread[pool-1-thread-1,5,main],偏移量:48,主题:test_find,分区:1,获取的消息:发送消息:9
- 当前线程:Thread[pool-1-thread-2,5,main],偏移量:39,主题:test_find,分区:2,获取的消息:发送消息:10
生产者:
- 当前线程:pool-1-thread-1,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-2,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-3,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-1,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-2,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-3,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-1,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-2,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-3,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-1,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
kafka的消费者(KafkaConsumer对象)并不是线程安全的。客户端代码需要自己确保多线程的访问是同步的。 唯一例外的是wakeup方法(是线程安全的):它可以被外部线程用来安全地中断一个进行中的操作。对于阻塞在wakeup方法上的线程会抛出WakeupException。可以被另外的线程用来作为关闭consumer的钩子。
- public class KafkaConsumerRunner implements Runnable {
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final KafkaConsumer consumer;
-
- public void run() {
- try {
- consumer.subscribe("topic");
- while (!closed.get()) {
- ConsumerRecords records = consumer.poll(10000);
- // 处理新的记录
- }
- } catch (WakeupException e) {
- if (!closed.get()) throw e; //如果关闭了忽略异常
- } finally {
- consumer.close();
- }
- }
- // 关闭钩子,可以在另一个线程中调用
- public void shutdown() {
- closed.set(true);
- consumer.wakeup();
- }
- }
SparkStreaming +kafka 的offset保存MySQL、hbase、redis、zookeeper_.-CSDN博客
(1)新旧版本偏移量的变化
在Kafka0.9版本之前消费者保存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID。新版消费者不在保存偏移量到zookeeper中,而是保存在Kafka的一个内部主题中“consumer_offsets”,该主题默认有50个分区,每个分区3个副本,分区数量有参数offset.topic.num.partition设置。通过消费者组ID的哈希值和该参数取模的方式来确定某个消费者组已消费的偏移量保存到consumer_offsets主题的哪个分区中。
(2)查询偏移量
Kafka消费者API提供两种方法用来查询偏移量。
一个是committed(TopicPartition partition)方法,这个方法返回一个OffsetAndMetadata对象,通过这个对象可以获取指定分区已提交的偏移量;
- TopicPartition topicPartition = new TopicPartition(topic, 0);
- OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
- long offset = offsetAndMetadata.offset();
另外一个方法position(TopicPartition partition)返回的是下一次拉取位置。
long position = consumer.position(topicPartition);
(3)重置消费偏移量
Kafka提供重置消费者偏移量的方法:
1.seek(TopicPartition partition, long offset):将消费者起始位置重置到指定偏移量位置
//指定offset消费.
consumer.seek(topicPartition,103);
2.seekToBeginning():指定从消息起始位置开始消费,对应(auto.offset.reset=earlist)
//指定从最先消费
consumer.seekToBeginning(Collections.singleton(topicPartition));
3.seekToEnd():指定从最新消息对应的位置开始消费,有新消息才消费。对应(auto.offset.reset=latest)
//指定从最新消费(舍弃)
consumer.seekToEnd(Collections.singleton(topicPartition))
偏移量提交有自动和手动,默认是自动(enable.auto.commit = true)。自动提交的话每隔多久自动提交一次呢?这个由消费者协调器参数auto.commit.interval.ms 毫秒执行一次提交。有些场景我们需要手动提交偏移量,尤其是在一个长事务中并且保证消息不被重复消费以及消息不丢失,比如生产者一个订单提交消息,消费者拿到后要扣减库存,扣减成功后该消息才能提交,所以在这种场景下需要手动提交,因为库存扣减失败这个消息就不能消费,同时客户这个订单状态也不能是成功。手动提交也有两种一个是同步提交一个是异步提交,其区别就是消费者线程是否阻塞。如果使用手动提交就要关闭自动提交,因为自动提交默认是开启的。
三、Kafka消费者消费位移两种策略:
这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。
需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡reblance后这部分消息重复消费。
- // 配置信息
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("group.id", "test");
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "1000");
- props.put("session.timeout.ms", "30000");
-
- // 创建消费者实例, 并且订阅topic
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
- consumer.subscribe(Arrays.asList("foo", "bar"));
-
- // 消费者消费消息
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records)
- System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
- }
Kafka提供两种手动提交方式:
1.异步提交(commitAsync):
异步模式下,提交失败也不会尝试提交。消费者线程不会被阻塞,因为异步操作,可能在提交偏移量操作结果未返回时就开始下一次拉取操作。
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records){
- System.out.printf("topic = %s, partition = %s,
- offset = %d, customer = %s, country = %s\n",
- record.topic(), record.partition(), record.offset(),
- record.key(), record.value());
- }
- consumer.commitAsync();
- }
2.同步提交(CommitSync):
同步模式下,提交失败时一直尝试提交,直到遇到无法重试才结束。同步方式下,消费者线程在拉取消息时会被阻塞,直到偏移量提交操作成功或者在提交过程中发生错误。
实现手动提交前需要在创建消费者时关闭自动提交,设置enable.auto.commit=false。
- props.put("enable.auto.commit", "false"); // 设置autoCommit为false
-
- int commitInterval = 200;
- List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
- buffer.add(record);
- if (buffer.size() >= commitInterval) {
- insertIntoDb(buffer);
- consumer.commitSync();
- buffer.clear();
- }
- }
- }
-
-
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records){
- System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
- }
- try {
- consumer.commitSync();
- } catch (CommitFailedException e) {
- log.error("commit failed", e)
- }
- }
由于异步提交不会等消费偏移量提交成功后再拉取下一次消息,因此异步提交提供了一个偏移量提交回调方法commitAsync(OffsetCommitCallback callback)。提交偏移量完成之后会回调OffsetCommitCallback接口的onComplete()方法
一般情况下对于异步提交,我们可能会通过回调的方式记录提交结果:
- try{
- //最少处理100条
- int minCommitSize = 100;
- //定义计数器
- int icount = 0;
- // 4、获取数据
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),record.offset(), record.key(), record.value());
- icount++;
- }
- Thread.sleep(5000);
-
- //在业务逻辑处理成功后提交offset
- if(icount >= minCommitSize){
- //满足最少消费100条,再进行异步提交
- consumer.commitAsync(new OffsetCommitCallback() {
- @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
- if(exception == null){
- System.out.println("commit success");
- }else {
- System.out.println("commit failed"); //提交失败,对应处理
- }
- }
- });
-
- //计数器归零
- icount = 0 ;
- }
- }
- try {
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
- System.out.println("topic = %s, partition = %s, offset = %d,
- customer = %s, country = %s\n",
- record.topic(), record.partition(),
- record.offset(), record.key(), record.value());
- }
- consumer.commitAsync();
- }
- } catch (Exception e) {
- log.error("Unexpected error", e);
- } finally {
- try {
- consumer.commitSync();
- } finally {
- consumer.close();
- }
- }
SparkStreaming +kafka 的offset保存MySQL、hbase、redis、zookeeper_.-CSDN博客
今天遇到一个kafka的问题,在生产者发送消息之后,消费者会消费多次。在网上查询了很久,最终是在这个博客的引导下发现了问题 ,里面提到了kafka中的配置enable.auto.commit 是 true,这个会自动提交,然后是当我们的配置是自动提交的时候,消费者的消息投递保证有可能是at least once,或者at most once。当到达提交时间间隔,触发Kafka自动提交上次的偏移量时,就可能发生at most once的情况, 在这段时间,如果消费者还没完成消息的处理进程就崩溃了, 消费者进程重新启动时,它开始接收上次提交的偏移量之后的消息,实际上消费者可能会丢失几条消息;而当消费者处理完消息并将消息提交到持久化存储系统,而消费者进程崩溃时,会发生at least once的情况。 在此期间,kafka没有向broker提交offset,因为自动提交时间间隔没有过去。 当消费者进程重新启动时,会收到从上次提交的偏移量开始的一些旧消息。正是这个导致消息丢失或者重复消费现象。
在一些情况下,即使消费者进程没有崩溃,假如中间有一个消息的业务逻辑执行抛出了异常,消费者也当作是接收到了消息,程序执行回滚,这条消息也等同于丢失了。我关闭了自动提交(enable.auto.commit:false),当消费者每次 poll 处理完业务逻辑后必须完成手动同步提交(commitSync),如果消费者在消费过程中发生 crash,或者执行业务逻辑发生异常回滚,下次启动时依然会从之前的位置开始消费,从而保证每次提交的内容都能被消费,即实现了at least once保证。
通过代码说明一下:
- @StreamListener(MySink.INPUTB)
- public void messageListen(JSONObject message) {
- //下面是具体的消息消费事件
- int result = doSoming();
- if (result<0){
- System.out.println("service消息内容:解析失败");
- }else {
- System.out.println("service消息内容:解析成功");
- }
- }
如果是在doSoming()的过程中程序抛出异常,而又没有实现异常捕获的时候,消费者就又以为消息没有消费,会重新去再走一遍这个消费方法,即又会重新执行doSoming();这样就可能造成消息的重复消费。
最后的解决方式有两种:
1、保证自己在doSoming()中对所有异常都能捕捉,并做相应的处理。
2、在这个监听的方法开始设置一个唯一键字段,幂等性,消费时根据唯一键查询这条消息,判断是否消费过。也可以通过redis缓存来实现类似的机制。
- @KafkaListener(topics = {"${kafka.topic.topicB}"}, groupId = "groupB")
- public void consumeTopicB(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
- Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
-
- if(kafkaMessage.isPresent()) {
- Object message = kafkaMessage.get();
- /*
- * 执行消费逻辑处理的代码,
- */
- acknowledgment.acknowledge();// 消费成功后手动提交offset
-
- logger.info("消费者B消费topicB:{} partition:{}的消息 -> {}", consumerRecord.topic(), consumerRecord.partition(),message);
- }
比如在上面的消费逻辑处理过程中,失败了。那么此条消费要怎么处理呢?我是设置手动提交offset的。
我的思路是这样的:
如果失败了以后,把失败的数据存入到数据库中,然后在提交offset。然后后续在定时的从数据库中把失败的数据再次发送到对应的topic下,等待下次的消费。
但是这样的话有个问题,比如某条消息一直失败,不可能无限重复上面的操作吧?
所以我想的是在消息模型中添加一个失败重试次数属性:
- public class KafkaMsg implements Serializable {
-
- private static final long serialVersionUID = -1532915942422600087L;
-
- private String msgId;
- private String content;
- private Integer retryTime; // 重试次数记录
- public String getMsgId() {
- return msgId;
- }
-
- public String getContent() {
- return content;
- }
-
- public void setMsgId(String msgId) {
- this.msgId = msgId;
- }
-
- public void setContent(String content) {
- this.content = content;
- }
-
- public Integer getRetryTime() {
- return retryTime;
- }
-
- public Integer setRetryTime(Integer time) {
- this.retryTime = time;
- }
-
- @Override
- public String toString() {
- return "KafkaMsg{" +
- "msgId='" + msgId + '\'' +
- ", content='" + content + '\'' +
- '}';
- }
- }
然后消费失败后,先记录一下重试次数再把它存入数据库,然后定时再次发送到topic时,先判断它的重试次数是否达到上限,没有就再次写入topic等待再次被消费
默认情况下,Spring-Kafka 达到配置的重试次数时,【每条消息的失败重试时间,由配置的时间隔决定】Consumer 如果依然消费失败 ,那么该消息就会进入到死信队列。
自定义的 SeekToCurrentErrorHandler
在实际开发中,我们可以通过实现自定义的 SeekToCurrentErrorHandler ,当 Consumer 消费消息异常的时候,进行拦截处理。例如设置重试次数,在达到最大重试次数仍然失败后保存消息到数据库后续做人工处理。
SeekToCurrentErrorHandler可以针对消息的单条消费失败的消费重试处理。在消息消费失败时,SeekToCurrentErrorHandler 会将调用 Kafka Consumer 的 seek(TopicPartition partition, long offset) 方法,将 Consumer 对于该消息对应的 TopicPartition 分区的本地进度设置成该消息的位置。这样,Consumer 在下次从 Kafka Broker 拉取消息的时候,又能重新拉取到这条消费失败的消息,并且是第一条。同时,Spring-Kafka 使用 FailedRecordTracker 对每个 Topic 的每个 TopicPartition 消费失败次数进行计数,这样相当于对该 TopicPartition 的第一条消费失败的消息的消费失败次数进行计数。另外,在 FailedRecordTracker 中,会调用 BackOff 来进行计算,该消息的下一次重新消费的时间,通过 Thread#sleep(...) 方法,实现重新消费的时间间隔。
但是,FailedRecordTracker 提供的计数是客户端级别的,重启 JVM 应用后,计数是会丢失的。所以,如果想要计数进行持久化,需要自己重新实现下 FailedRecordTracker 类,通过 ZooKeeper 存储计数。
- public class SeekToCurrentErrorHandler extends FailedRecordProcessor implements ContainerAwareErrorHandler {
- private boolean ackAfterHandle;
-
- //默认是失败后重试9次,没有BiConsumer接口的实现也就是没有后续处理逻辑
- public SeekToCurrentErrorHandler() {
- this((BiConsumer)null, SeekUtils.DEFAULT_BACK_OFF);//DEFAULT_BACK_OFF = new FixedBackOff(0L, 9L)
- }
-
- public SeekToCurrentErrorHandler(BackOff backOff) {
- this((BiConsumer)null, backOff);
- }
-
- public SeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
- this(recoverer, SeekUtils.DEFAULT_BACK_OFF);//DEFAULT_BACK_OFF = new FixedBackOff(0L, 9L)
- }
-
- public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
- super(recoverer, backOff);
- this.ackAfterHandle = true;
- }
- public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
- SeekUtils.seekOrRecover(thrownException, records, consumer, container, this.isCommitRecovered(), this.getSkipPredicate(records, thrownException), this.logger, this.getLogLevel());
- }
- }
自定义SeekToCurrentErrorHandler处理消费异常
通过上面的源码可以看到默认无参构造器指定了BackOff为SeekUtils.DEFAULT_BACK_OFF,即默认是失败后重试9次。但是默认没有BiConsumer接口的实现也就是没有后续处理逻辑。我们可以在项目中自定义SeekToCurrentErrorHandler这个Bean,然后自定义BiConsumer接口的实现来实现重试最大次数扔失败后的后续续处理逻辑和失败时重试次数。
- @Configuration
- public class KafkaConfiguration {
- @Bean
- @Primary
- public SeekToCurrentErrorHandler seekToCurrentErrorHandler() {
- SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(new BiConsumer<ConsumerRecord<?, ?>, Exception>() {
- @Override
- public void accept(ConsumerRecord<?, ?> consumerRecord, Exception e) {
- //重试4次仍然失败后会进去BiConsumer接口的accept方法,可以进行保存数据库等操作
- log.info("消费失败4次,消息保存到数据库. record:{}",JSON.toJSONString(record));
- }
- }, new FixedBackOff(0L, 4L));//interval 为0标识立即重试,maxAttempts为4标识最多重试四次
- return errorHandler;
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。