- public class MyProducer implements Job {
- private static KafkaProducer<String,String> producer;
- static {
- Properties properties = new Properties();
- properties.put("bootstrap.servers","");
- properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
- producer = new KafkaProducer<String, String>(properties);
- }
- /**
- * 第一种异步发送,直接发送,不管结果
- */
- private static void sendMessageForgetResult(){
- ProducerRecord<String,String> record = new ProducerRecord<String,String>("kafka-study","name","Forget_result");
- producer.send(record);
- producer.close();
- }
- /**
- * 第二种异步发送,执行回调函数
- */
- private static void sendMessageCallback(){
- ProducerRecord<String,String> record = new ProducerRecord<String,String>("kafka-study","name","callback");
- producer.send(record,new MyProducerCallback());
- }
- private static class MyProducerCallback implements Callback{
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e !=null){
- e.printStackTrace();
- return;
- }
- System.out.println(recordMetadata.topic());
- System.out.println(recordMetadata.partition());
- System.out.println(recordMetadata.offset());
- System.out.println("Coming in MyProducerCallback");
- }
- }
- 同步发送,在send()方法中使用Future对象获取发送消息返回的信息
- RecordMetadata recordMetadata =
- producer.send(new ProducerRecord<String, String>(this.topic, value)).get();
- /**
- * 第三种同步发送,等待执行结果
- */
- private static RecordMetadata sendMessageSync() throws Exception{
- ProducerRecord<String,String> record = new ProducerRecord<String,String>("kafka-study","name","sync");
- RecordMetadata result = producer.send(record).get();
- System.out.println(result.topic());
- System.out.println(result.partition());
- System.out.println(result.offset());
- return result;
- }
- //定时任务
- @Override
- public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
- try {
- sendMessageSync();
- }catch (Exception e){
- System.out.println("error:"+e);
- }
- }
- public static void main(String[] args){
- //sendMessageForgetResult();
- //sendMessageCallback();
- JobDetail job = JobBuilder.newJob(MyProducer.class).build();
- Trigger trigger=TriggerBuilder.newTrigger().withSchedule(SimpleScheduleBuilder.repeatSecondlyForever()).build();
- try {
- Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
- scheduler.scheduleJob(job,trigger);
- scheduler.start();
- }catch (SchedulerException e){
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- public class producer {
- public static void main(String[] args) {
- Properties properties = new Properties();
- Properties props = new Properties();
- props.put("bootstrap.servers", "hadoop01:9092");// Kafka服务端的主机名和端口号
- props.put("acks", "all"); // 等待所有副本节点的应答
- props.put("retries", 3);// 消息发送最大尝试次数
- props.put("batch.size", 16384); // 一批消息处理大小
- props.put("linger.ms", 1); // 请求延时
- props.put("buffer.memory", 33554432); // 发送缓存区内存大小
- // key序列化
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- // value序列化
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- Producer<String, String> producer = new KafkaProducer<>(props);
- for (int i = 0; i < 50; i++) {
- //创建生产者无回调函数
- producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
- //创建生产者有回调函数
- producer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (metadata != null) {
- System.err.println(metadata.partition() + "---" + metadata.offset());
- }
- }
- });
- }
- producer.close();
- }
(每个分区都是一个完全有序的日志,但是分区之间没有全局排序(除了你的消息中可能包含的一些挂钟时间)。将消息分配给特定分区是可由写入方(生产者)控制的,大多数用户选择通过某种键(key,例如用户ID或订单ID)进行分区。分区允许在没有分片之间协调的情况下进行日志追加,并允许系统的吞吐量随Kafka集群的规模线性扩展。)
1.一个topic 一个分区 虽然保证全局有序,但是性能下降
- public class MyPartition implements Partitioner {
- private static Logger LOG = LoggerFactory.getLogger(MyPartition.class);
- @Override
- public void configure(Map<String, ?> configs) {
- }
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- // TODO Auto-generated method stub
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- int numPartitions = partitions.size();
- int partitionNum = 0;
- try {
- partitionNum = Integer.parseInt((String) key);
- } catch (Exception e) {
- partitionNum = key.hashCode();
- }
- LOG.info("the message sendTo topic:" + topic + " and the partitionNum:" + partitionNum);
- return Math.abs(partitionNum % numPartitions);
- }
- @Override
- public void close() {
- }
- }
- /*
- INFO | the message sendTo topic:Topic-test and the partitionNum:42
- INFO | the message sendTo topic:Topic-test and the partitionNum:43
- INFO | the message sendTo topic:Topic-test and the partitionNum:44
- INFO | the message sendTo topic:Topic-test and the partitionNum:45 */
- public class Producer {
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put("bootstrap.servers", ",,");// 该地址是集群的子集,用来探测集群。
- props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
- props.put("retries", 3);// 请求失败重试的次数
- props.put("batch.size", 16384);// batch的大小
- props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
- props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 序列化的方式,
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- // 设置属性 自定义分区类
- props.put("partitioner.class", "com.east.spark.kafka.MyPartition");
- KafkaProducer<String, String> producer = new KafkaProducer(props);
- for (int i = 0; i < 10000; i++) {
- // 三个参数分别为topic, key,value,send()是异步的,添加到缓冲区立即返回,更高效。
- producer.send(new ProducerRecord<String, String>("Topic-test", Integer.toString(i), Integer.toString(i)));
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- producer.close();
- }
- }
- String kafkas = ",,";
- Properties props = new Properties();
- //kafka连接信息
- props.put("bootstrap.servers",kafkas);
- //消费者组id
- props.put("group.id", "test_group");
- //是否自动提交offset
- props.put("enable.auto.commit", "true");
- //在没有offset的情况下采取的拉取策略
- props.put("auto.offset.reset", "none");
- //自动提交时间间隔
- props.put("auto.commit.interval.ms", "1000");
- //key.deserializer和value.deserializer是用来做反序列化的,也就是将字节数组转换成对象
- //key反序列化
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- //value反序列化
- props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
- KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
- kafkaConsumer.subscribe(Arrays.asList("eshop"));
- try {
- while (true) { //1)
- ConsumerRecords<String, String> records = consumer.poll(100); // timeout
- for (ConsumerRecord<String, String> record : records) //3)
- {
- //poll()方法返回记录的列表,每条记录包含key/value以及主题、分区、位移信息。
- log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
- record.topic(), record.partition(), record.offset(),record.key(), record.value());
- //相应处理逻辑
- int updatedCount = 1;
- if (custCountryMap.countainsValue(record.value())) {
- updatedCount = custCountryMap.get(record.value()) + 1;
- }
- custCountryMap.put(record.value(), updatedCount)
- JSONObject json = new JSONObject(custCountryMap);
- System.out.println(json.toString(4))
- }
- }
- //offset自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。
- } finally {
- consumer.close(); //4主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期。
- }
- public class Consumer {
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put("bootstrap.servers", ",,");// 该地址是集群的子集,用来探测集群。
- props.put("group.id", "test");// cousumer的分组id
- props.put("enable.auto.commit", "true");// 自动提交offsets
- props.put("auto.commit.interval.ms", "1000");// 每隔1s,自动提交offsets
- props.put("session.timeout.ms", "30000");// Consumer向集群发送自己的心跳,超时则认为Consumer已经死了,kafka会把它的分区分配给其他进程
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化器
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
- consumer.subscribe(Arrays.asList("Topic-test"));// 订阅的topic,可以多个
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("partition = %d , offset = %d, key = %s, value = %s", record.partition(),
- record.offset(), record.key(), record.value());
- System.out.println();
- }
- }
- }
- }
- /*
- partition = 1 , offset = 48, key = 1, value = 1
- partition = 2 , offset = 41, key = 2, value = 2
- partition = 0 , offset = 51, key = 3, value = 3
- partition = 1 , offset = 49, key = 4, value = 4
- partition = 2 , offset = 42, key = 5, value = 5
- partition = 0 , offset = 52, key = 6, value = 6
- partition = 1 , offset = 50, key = 7, value = 7
- partition = 2 , offset = 43, key = 8, value = 8
- partition = 0 , offset = 53, key = 9, value = 9
- partition = 1 , offset = 51, key = 10, value = 10
- partition = 2 , offset = 44, key = 11, value = 11
- */
- consumer.subscribe(Arrays.asList("topic"));// topic
- // Replace the statement above with the following four statements
- String topic = "topic";
- TopicPartition partition0 = new TopicPartition(topic, 0);// topic, partition 0
- TopicPartition partition1 = new TopicPartition(topic, 1);// topic, partition 1
- consumer.assign(Arrays.asList(partition0, partition1));
注意:手动分配分区(assign)和动态分区分配的订阅 topic 模式(subscribe)不能混合使用。
- Properties prop = new Properties();
- prop.put("bootstrap.servers", "mini1:9092,mini2:9092,mini3:9092");//指定节点地址
- prop.put("group.id", "001");
- prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- prop.put("consumer-id", "test");
- Consumer<String, String> consumer = new KafkaConsumer<>(prop);//消费者
- TopicPartition p = new TopicPartition("test6", 2);//只消费分区号为2的分区
- consumer.assign(Arrays.asList(p));
- // consumer.subscribe(Arrays.asList("test6"));//消费topic 消费全部分区
- // subscribe to some partitions of topic foo
- TopicPartition partition0 = new TopicPartition("foo", 0);
- TopicPartition partition1 = new TopicPartition("foo", 1);
- TopicPartition[] partitions = new TopicPartition[2];
- partitions[0] = partition0;
- partitions[1] = partition1;
- consumer.assign(Arrays.asList(partitions ));
- while (true) {
- ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(10));//消费一定时间的数据
- Thread.sleep(3000);
- System.out.println("循环");
- for (ConsumerRecord<String, String> record : poll) {
- System.out.println(String.format("key:%s , value:%s , offset:%s", record.key(), record.value(), record.offset()));
- }
- }
- @KafkaListener(topicPartitions = {@TopicPartition(topic = "mayikt", partitions = {"0"})})
- public void receive(ConsumerRecord<?, ?> consumer) {
- System.out.println("topic名称:" + consumer.topic() + ",key:" +
- consumer.key() + "," +
- "分区位置:" + consumer.partition()
- + ", 下标" + consumer.offset());
- }
- package com.kafka.singleton;
- import java.io.IOException;
- import java.io.InputStream;
- import java.util.Properties;
- import java.util.Random;
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public final class KafkaProducerSingleton {
- private static final Logger LOGGER = LoggerFactory .getLogger(KafkaProducerSingleton.class);
- private static KafkaProducer<String, String> kafkaProducer;
- private Random random = new Random();
- private String topic;
- private int retry;
- private KafkaProducerSingleton() {
- }
- /**
- * 静态内部类
- */
- private static class LazyHandler {
- private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
- }
- /**
- * 单例模式,kafkaProducer是线程安全的,可以多线程共享一个实例
- */
- public static final KafkaProducerSingleton getInstance() {
- return LazyHandler.instance;
- }
- /**
- * kafka生产者进行初始化
- */
- public void init(String topic,int retry) {
- this.topic = topic;
- this.retry = retry;
- if (null == kafkaProducer) {
- Properties props = new Properties();
- InputStream inStream = null;
- try {
- inStream = this.getClass().getClassLoader()
- .getResourceAsStream("kafka.properties");
- props.load(inStream);
- kafkaProducer = new KafkaProducer<String, String>(props);
- } catch (IOException e) {
- LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);
- } finally {
- if (null != inStream) {
- try {
- inStream.close();
- } catch (IOException e) {
- LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);
- }
- }
- }
- }
- }
- /**
- * 通过kafkaProducer发送消息
- * @param topic 消息接收主题
- * @param partitionNum 哪一个分区
- * @param retry 重试次数
- * @param message 具体消息值
- */
- public void sendKafkaMessage(final String message) {
- /**
- * 1、如果指定了某个分区,会只讲消息发到这个分区上
- * 2、如果同时指定了某个分区和key,则也会将消息发送到指定分区上,key不起作用
- * 3、如果没有指定分区和key,那么将会随机发送到topic的分区中
- * 4、如果指定了key,那么将会以hash<key>的方式发送到分区中
- */
- ProducerRecord<String, String> record = new ProducerRecord<String, String>(
- topic, random.nextInt(3), "", message);
- // send方法是异步的,添加消息到缓存区等待发送,并立即返回,这使生产者通过批量发送消息来提高效率
- // kafka生产者是线程安全的,可以单实例发送消息
- kafkaProducer.send(record, new Callback() {
- public void onCompletion(RecordMetadata recordMetadata,
- Exception exception) {
- if (null != exception) {
- LOGGER.error("kafka发送消息失败:" + exception.getMessage(),
- exception);
- retryKakfaMessage(message);
- }
- }
- });
- }
- /**
- * 当kafka消息发送失败后,重试
- */
- private void retryKakfaMessage(final String retryMessage) {
- ProducerRecord<String, String> record = new ProducerRecord<String, String>(
- topic, random.nextInt(3), "", retryMessage);
- for (int i = 1; i <= retry; i++) {
- try {
- kafkaProducer.send(record);
- return;
- } catch (Exception e) {
- LOGGER.error("kafka发送消息失败:" + e.getMessage(), e);
- retryKakfaMessage(retryMessage);
- }
- }
- }
- /**
- * kafka实例销毁
- */
- public void close() {
- if (null != kafkaProducer) {
- kafkaProducer.close();
- }
- }
- public String getTopic() {
- return topic;
- }
- public void setTopic(String topic) {
- this.topic = topic;
- }
- public int getRetry() {
- return retry;
- }
- public void setRetry(int retry) {
- this.retry = retry;
- }
- }
- package com.travelsky.kafka.singleton;
- public class HandlerProducer implements Runnable {
- private String message;
- public HandlerProducer(String message) {
- this.message = message;
- }
- @Override
- public void run() {
- KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton
- .getInstance();
- kafkaProducerSingleton.init("test_find",3);
- System.out.println("当前线程:" + Thread.currentThread().getName()
- + ",获取的kafka实例:" + kafkaProducerSingleton);
- kafkaProducerSingleton.sendKafkaMessage("发送消息" + message);
- }
- }
- bootstrap.servers=master:9092,slave1:9092,slave2:9092
- acks=1
- retries=0
- batch.size=1000
- compression.type=gzip
- #buffer.memory=33554432
- key.serializer=org.apache.kafka.common.serialization.StringSerializer
- value.serializer=org.apache.kafka.common.serialization.StringSerializer
- public static void main(String[] args) {
- Kafka_Consumer kafka_Consumer = new Kafka_Consumer();
- try {
- kafka_Consumer.execute();
- Thread.sleep(20000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- kafka_Consumer.shutdown();
- }
- }
- package com.kafka.consumer;
- import java.util.Arrays;
- import java.util.Properties;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- public final class Kafka_Consumer {
- /**
- * kafka消费者不是线程安全的
- */
- private final KafkaConsumer<String, String> consumer;
- private ExecutorService executorService;
- public Kafka_Consumer() {
- Properties props = new Properties();
- props.put("bootstrap.servers", "ip,port");
- props.put("group.id", "group");
- // 关闭自动提交
- props.put("enable.auto.commit", "false");
- props.put("auto.commit.interval.ms", "1000");
- props.put("session.timeout.ms", "30000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- consumer = new KafkaConsumer<String, String>(props);
- consumer.subscribe(Arrays.asList("test_find"));
- }
- public void execute() {
- executorService = Executors.newFixedThreadPool(3);
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(10);
- if (null != records) {
- executorService.submit(new ConsumerThread(records, consumer));
- }
- }
- }
- public void shutdown() {
- try {
- if (consumer != null) {
- consumer.close();
- }
- if (executorService != null) {
- executorService.shutdown();
- }
- if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
- System.out.println("Timeout");
- }
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
- }
- }
- package com.kafka.consumer;
- import java.util.Collections;
- import java.util.List;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.clients.consumer.OffsetAndMetadata;
- import org.apache.kafka.common.TopicPartition;
- /**
- * 多消费者,多个work线程,难保证分区消息消费的顺序性
- */
- public final class ConsumerThread implements Runnable {
- private ConsumerRecords<String, String> records;
- private KafkaConsumer<String, String> consumer;
- public ConsumerThread(ConsumerRecords<String, String> records,
- KafkaConsumer<String, String> consumer) {
- this.records = records;
- this.consumer = consumer;
- }
- @Override
- public void run() {
- for (TopicPartition partition : records.partitions()) {
- List<ConsumerRecord<String, String>> partitionRecords = records
- .records(partition);
- for (ConsumerRecord<String, String> record : partitionRecords) {
- System.out.println("当前线程:" + Thread.currentThread() + ","
- + "偏移量:" + record.offset() + "," + "主题:"
- + record.topic() + "," + "分区:" + record.partition()
- + "," + "获取的消息:" + record.value());
- }
- // 消费者自己手动提交消费的offest,确保消息正确处理后再提交
- long lastOffset = partitionRecords.get(partitionRecords.size() - 1)
- .offset();
- consumer.commitSync(Collections.singletonMap(partition,
- new OffsetAndMetadata(lastOffset + 1)));
- }
- }
- }
- import com.travelsky.kafka.singleton.HandlerProducer;
- @RunWith(SpringJUnit4ClassRunner.class)
- @ContextConfiguration(locations = { "classpath:applicationContext.xml" })
- public class Kafka生产_多线程单实例 {
- @Test
- public void testSendMessageSingleton() throws InterruptedException {
- ExecutorService executor = Executors.newFixedThreadPool(3);
- for (int i = 1; i <= 10; i++) {
- Thread.sleep(1000);
- executor.submit(new HandlerProducer(":" + i));
- }
- }
- }
- 当前线程:Thread[pool-1-thread-1,5,main],偏移量:44,主题:test_find,分区:1,获取的消息:发送消息:1
- 当前线程:Thread[pool-1-thread-2,5,main],偏移量:45,主题:test_find,分区:1,获取的消息:发送消息:2
- 当前线程:Thread[pool-1-thread-1,5,main],偏移量:46,主题:test_find,分区:1,获取的消息:发送消息:3
- 当前线程:Thread[pool-1-thread-1,5,main],偏移量:39,主题:test_find,分区:0,获取的消息:发送消息:4
- 当前线程:Thread[pool-1-thread-2,5,main],偏移量:47,主题:test_find,分区:1,获取的消息:发送消息:5
- 当前线程:Thread[pool-1-thread-3,5,main],偏移量:40,主题:test_find,分区:0,获取的消息:发送消息:6
- 当前线程:Thread[pool-1-thread-2,5,main],偏移量:37,主题:test_find,分区:2,获取的消息:发送消息:7
- 当前线程:Thread[pool-1-thread-2,5,main],偏移量:38,主题:test_find,分区:2,获取的消息:发送消息:8
- 当前线程:Thread[pool-1-thread-1,5,main],偏移量:48,主题:test_find,分区:1,获取的消息:发送消息:9
- 当前线程:Thread[pool-1-thread-2,5,main],偏移量:39,主题:test_find,分区:2,获取的消息:发送消息:10
- 当前线程:pool-1-thread-1,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-2,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-3,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-1,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-2,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-3,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-1,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-2,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-3,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
- 当前线程:pool-1-thread-1,获取的kafka实例:com.kafka.singleton.KafkaProducerSingleton@15eb475
kafka的消费者(KafkaConsumer对象)并不是线程安全的。客户端代码需要自己确保多线程的访问是同步的。唯一例外的是wakeup方法(是线程安全的):它可以被外部线程用来安全地中断一个进行中的操作。调用wakeup之后,阻塞在poll等方法上的线程会抛出WakeupException。因此wakeup可以被另外的线程用来作为关闭consumer的钩子。
- public class KafkaConsumerRunner implements Runnable {
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final KafkaConsumer consumer;
- public void run() {
- try {
- consumer.subscribe("topic");
- while (!closed.get()) {
- ConsumerRecords records = consumer.poll(10000);
- // 处理新的记录
- }
- } catch (WakeupException e) {
- if (!closed.get()) throw e; //如果关闭了忽略异常
- } finally {
- consumer.close();
- }
- }
- // 关闭钩子,可以在另一个线程中调用
- public void shutdown() {
- closed.set(true);
- consumer.wakeup();
- }
- }
一个是committed(TopicPartition partition)方法,这个方法返回一个OffsetAndMetadata对象,通过这个对象可以获取指定分区已提交的偏移量;
- TopicPartition topicPartition = new TopicPartition(topic, 0);
- OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
- long offset = offsetAndMetadata.offset();
另外一个方法position(TopicPartition partition)返回的是下一次拉取位置。
long position = consumer.position(topicPartition);
1.seek(TopicPartition partition, long offset):将消费者起始位置重置到指定偏移量位置
偏移量提交有自动和手动两种方式,默认是自动(enable.auto.commit = true)。自动提交的话每隔多久自动提交一次呢?这个间隔由参数 auto.commit.interval.ms 决定,消费者协调器每隔该参数指定的毫秒数执行一次提交。有些场景我们需要手动提交偏移量,尤其是在一个长事务中并且要保证消息不被重复消费以及消息不丢失,比如生产者发送一条订单提交消息,消费者拿到后要扣减库存,扣减成功后该消息才能提交,所以在这种场景下需要手动提交,因为库存扣减失败这个消息就不能算作已消费,同时客户这个订单状态也不能是成功。手动提交也有两种:一种是同步提交,一种是异步提交,其区别就是消费者线程是否阻塞。如果使用手动提交就要关闭自动提交,因为自动提交默认是开启的。
- // 配置信息
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("group.id", "test");
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "1000");
- props.put("session.timeout.ms", "30000");
- // 创建消费者实例, 并且订阅topic
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
- consumer.subscribe(Arrays.asList("foo", "bar"));
- // 消费者消费消息
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records)
- System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
- }
// Asynchronous commit after every poll: the loop does not wait for the broker's answer.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // The format string must be one literal; it was split across two lines before.
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
    }
    consumer.commitAsync();
}
- props.put("enable.auto.commit", "false"); // 设置autoCommit为false
- int commitInterval = 200;
- List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
- buffer.add(record);
- if (buffer.size() >= commitInterval) {
- insertIntoDb(buffer);
- consumer.commitSync();
- buffer.clear();
- }
- }
- }
// Synchronous commit after every poll: blocks until the broker answers; a failed commit is logged.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e);
    }
}
由于异步提交不会等消费偏移量提交成功后再拉取下一次消息,因此异步提交提供了一个偏移量提交回调方法commitAsync(OffsetCommitCallback callback)。提交偏移量完成之后会回调OffsetCommitCallback接口的onComplete()方法
// Asynchronous commit with a callback, issued once at least 100 records have been handled.
try {
    // commit after at least this many records
    int minCommitSize = 100;
    // records handled since the last commit
    int icount = 0;
    // fetch data
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
            icount++;
        }
        Thread.sleep(5000);
        // commit the offsets once the business logic has succeeded
        if (icount >= minCommitSize) {
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception == null) {
                        System.out.println("commit success");
                    } else {
                        System.out.println("commit failed"); // commit failed; handle accordingly
                    }
                }
            });
            // reset the counter
            icount = 0;
        }
    }
} catch (InterruptedException e) {
    // The sleep was interrupted: restore the flag and leave the loop.
    Thread.currentThread().interrupt();
} finally {
    consumer.close();
}
// Asynchronous commits while running, plus one final synchronous commit before closing.
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // println() takes no format arguments; printf() applies the format string.
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                    record.topic(), record.partition(),
                    record.offset(), record.key(), record.value());
        }
        consumer.commitAsync();
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        // Last chance to commit: block until it has gone through.
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
今天遇到一个kafka的问题,在生产者发送消息之后,消费者会消费多次。在网上查询了很久,最终是在这个博客的引导下发现了问题 ,里面提到了kafka中的配置enable.auto.commit 是 true,这个会自动提交,然后是当我们的配置是自动提交的时候,消费者的消息投递保证有可能是at least once,或者at most once。当到达提交时间间隔,触发Kafka自动提交上次的偏移量时,就可能发生at most once的情况, 在这段时间,如果消费者还没完成消息的处理进程就崩溃了, 消费者进程重新启动时,它开始接收上次提交的偏移量之后的消息,实际上消费者可能会丢失几条消息;而当消费者处理完消息并将消息提交到持久化存储系统,而消费者进程崩溃时,会发生at least once的情况。 在此期间,kafka没有向broker提交offset,因为自动提交时间间隔没有过去。 当消费者进程重新启动时,会收到从上次提交的偏移量开始的一些旧消息。正是这个导致消息丢失或者重复消费现象。
在一些情况下,即使消费者进程没有崩溃,假如中间有一个消息的业务逻辑执行抛出了异常,消费者也当作是接收到了消息,程序执行回滚,这条消息也等同于丢失了。我关闭了自动提交(enable.auto.commit:false),当消费者每次 poll 处理完业务逻辑后必须完成手动同步提交(commitSync),如果消费者在消费过程中发生 crash,或者执行业务逻辑发生异常回滚,下次启动时依然会从之前的位置开始消费,从而保证每次提交的内容都能被消费,即实现了at least once保证。
- @StreamListener(MySink.INPUTB)
- public void messageListen(JSONObject message) {
- //下面是具体的消息消费事件
- int result = doSoming();
- if (result<0){
- System.out.println("service消息内容:解析失败");
- }else {
- System.out.println("service消息内容:解析成功");
- }
- }
- @KafkaListener(topics = {"${kafka.topic.topicB}"}, groupId = "groupB")
- public void consumeTopicB(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
- Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
- if(kafkaMessage.isPresent()) {
- Object message = kafkaMessage.get();
- /*
- * 执行消费逻辑处理的代码,
- */
- acknowledgment.acknowledge();// 消费成功后手动提交offset
- logger.info("消费者B消费topicB:{} partition:{}的消息 -> {}", consumerRecord.topic(), consumerRecord.partition(),message);
- }
/**
 * Serializable message envelope: an id, the content and a counter of retries so far.
 */
public class KafkaMsg implements Serializable {

    private static final long serialVersionUID = -1532915942422600087L;

    private String msgId;

    private String content;

    /** Number of retries recorded for this message; {@code null} until set. */
    private Integer retryTime;

    public String getMsgId() {
        return msgId;
    }

    public String getContent() {
        return content;
    }

    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Integer getRetryTime() {
        return retryTime;
    }

    /**
     * Sets the retry counter. Declared {@code void} like the other setters: the previous
     * {@code Integer} return type had no return statement and did not compile.
     */
    public void setRetryTime(Integer time) {
        this.retryTime = time;
    }

    @Override
    public String toString() {
        return "KafkaMsg{" +
                "msgId='" + msgId + '\'' +
                ", content='" + content + '\'' +
                '}';
    }
}
默认情况下,Spring-Kafka 达到配置的重试次数时(每条消息的失败重试间隔由配置的时间间隔决定),如果 Consumer 依然消费失败,那么该消息就会进入到死信队列。
自定义的 SeekToCurrentErrorHandler
在实际开发中,我们可以通过实现自定义的 SeekToCurrentErrorHandler ,当 Consumer 消费消息异常的时候,进行拦截处理。例如设置重试次数,在达到最大重试次数仍然失败后保存消息到数据库后续做人工处理。
SeekToCurrentErrorHandler 可以针对单条消息的消费失败进行重试处理。在消息消费失败时,SeekToCurrentErrorHandler 会调用 Kafka Consumer 的 seek(TopicPartition partition, long offset) 方法,将 Consumer 对于该消息对应的 TopicPartition 分区的本地进度设置成该消息的位置。这样,Consumer 在下次从 Kafka Broker 拉取消息的时候,又能重新拉取到这条消费失败的消息,并且是第一条。同时,Spring-Kafka 使用 FailedRecordTracker 对每个 Topic 的每个 TopicPartition 消费失败次数进行计数,这样相当于对该 TopicPartition 的第一条消费失败的消息的消费失败次数进行计数。另外,在 FailedRecordTracker 中,会调用 BackOff 来计算该消息下一次重新消费的时间,并通过 Thread#sleep(...) 方法实现重新消费的时间间隔。
但是,FailedRecordTracker 提供的计数是客户端级别的,重启 JVM 应用后,计数是会丢失的。所以,如果想要计数进行持久化,需要自己重新实现下 FailedRecordTracker 类,通过 ZooKeeper 存储计数。
- public class SeekToCurrentErrorHandler extends FailedRecordProcessor implements ContainerAwareErrorHandler {
- private boolean ackAfterHandle;
- // Default: 9 retries after a failure (per the back-off noted below); no BiConsumer is supplied, so there is no follow-up handling
- public SeekToCurrentErrorHandler() {
- this((BiConsumer)null, SeekUtils.DEFAULT_BACK_OFF);// per the original note: DEFAULT_BACK_OFF = new FixedBackOff(0L, 9L)
- }
- public SeekToCurrentErrorHandler(BackOff backOff) {
- this((BiConsumer)null, backOff);
- }
- public SeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
- this(recoverer, SeekUtils.DEFAULT_BACK_OFF);// per the original note: DEFAULT_BACK_OFF = new FixedBackOff(0L, 9L)
- }
- public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
- super(recoverer, backOff);
- this.ackAfterHandle = true;
- }
- public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
- SeekUtils.seekOrRecover(thrownException, records, consumer, container, this.isCommitRecovered(), this.getSkipPredicate(records, thrownException), this.logger, this.getLogLevel());
- }
- }
- @Configuration
- public class KafkaConfiguration {
- @Bean
- @Primary
- public SeekToCurrentErrorHandler seekToCurrentErrorHandler() {
- SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(new BiConsumer<ConsumerRecord<?, ?>, Exception>() {
- @Override
- public void accept(ConsumerRecord<?, ?> consumerRecord, Exception e) {
- //重试4次仍然失败后会进去BiConsumer接口的accept方法,可以进行保存数据库等操作
- log.info("消费失败4次,消息保存到数据库. record:{}",JSON.toJSONString(record));
- }
- }, new FixedBackOff(0L, 4L));//interval 为0标识立即重试,maxAttempts为4标识最多重试四次
- return errorHandler;
- }
- }
