
Kafka Producer and Consumer Code Examples, Producer Code and the Data-Sending Protocol in a Kafka Cluster, and Differences from RocketMQ


References:
https://blog.csdn.net/hetry_liang/article/details/110938156
https://blog.csdn.net/u013433591/article/details/128475325

Part 1: Kafka Producer and Consumer Code Examples

1. Producer code:

package com.njbdqn.mykafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // Broker connection and serialization settings
        prop.put("bootstrap.servers","192.168.153.200:9092");
        prop.put("acks","all");     // wait for all in-sync replicas to acknowledge
        prop.put("retries","0");
        prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String,String> producer = new KafkaProducer<>(prop);
        for (int i = 0; i < 10; i++) {
            // Produce 10 records to the msg01 topic
            ProducerRecord<String, String> rec = new ProducerRecord<>("msg01", ""+i, "test"+i);
            // send() is asynchronous: it buffers the record and returns immediately
            producer.send(rec);
        }
        // close() flushes any buffered records, so the broker actually receives them
        producer.close();
    }
}

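Because send() only buffers the record, delivery errors surface later. Below is a minimal sketch, reusing the producer and the msg01 topic from the example above, of checking the delivery result with a callback; the commented-out line shows how a truly synchronous send would look. This is an illustration, not part of the original example.

// Asynchronous send with a delivery callback (drop-in replacement for producer.send(rec) above)
producer.send(new ProducerRecord<>("msg01", "cb-key", "cb-value"), (metadata, exception) -> {
    if (exception != null) {
        // delivery failed after any configured retries
        exception.printStackTrace();
    } else {
        System.out.println("sent to partition " + metadata.partition()
                + " at offset " + metadata.offset());
    }
});

// Truly synchronous send: block on the returned Future
// RecordMetadata meta = producer.send(rec).get();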

2. Consumer code:

package com.njbdqn.mykafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers","192.168.153.200:9092");
        prop.put("group.id","henry5");
        prop.put("enable.auto.commit",true);
        prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        // If the group has no committed offset, start from the earliest record in the partition
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(prop);
        // Subscribe to the msg01 topic
        consumer.subscribe(Arrays.asList("msg01"));
        while (true){
            // poll() blocks for up to 1 second and returns a batch of records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            if(!records.isEmpty()){
                for(ConsumerRecord<String,String> rec : records){
                    System.out.println(rec.value());
                }
            }
        }
    }
}


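With enable.auto.commit=true, offsets can be committed before a record is actually processed, so a crash may lose messages. Below is a minimal sketch, assuming the same Properties as above but with auto-commit switched off, that commits offsets manually only after the batch has been processed. It is an illustration of manual commits, not part of the original example.

// Assumption: same prop as above, but with auto-commit disabled
prop.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Arrays.asList("msg01"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> rec : records) {
        // process the record first
        System.out.println(rec.value());
    }
    if (!records.isEmpty()) {
        // then commit the offsets of the records just processed
        consumer.commitSync();
    }
}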

3. Multi-threaded consumer code (a consumer class implementing Runnable; each thread holds its own KafkaConsumer, and instances end up consuming different topics or partitions):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MultiThreadConsumer {

    public static void main(String[] args) {
        String brokers = "localhost:9092";
        String topic   = "topic_t40";
        String groupId = "app_q";
        int consumers  = 2;

        for (int i = 0; i < consumers; i++) { // a thread pool could also be used here
            final ConsumerRunnable consumer = new ConsumerRunnable(brokers, groupId, topic, "thread" + i);
            new Thread(consumer).start();
        }
    }

    static class ConsumerRunnable implements Runnable{

        private final KafkaConsumer<String,String> consumer;

        private volatile boolean isRunning = true;

        private String threadName ;

        public ConsumerRunnable(String brokers,String groupId,String topic,String threadName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));

            this.threadName = threadName;
        }

        @Override
        public void run() {
            try {
                while (isRunning) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    records.forEach(record -> {
                        System.out.println(this.threadName + " : Message received " + record.value() + ", partition " + record.partition());
                    });
                }
            }finally {
                consumer.close();
            }
        }
    }
}

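The isRunning flag above is never set to false, so the threads only stop when the process is killed. One common way to stop such a consumer thread cleanly is consumer.wakeup(), which makes a blocked poll() throw WakeupException. Below is a hedged sketch of adding that to ConsumerRunnable; the shutdown() method name is my own addition, not part of the original example.

// Inside ConsumerRunnable (sketch): call from another thread to stop the loop
public void shutdown() {
    isRunning = false;
    consumer.wakeup();   // interrupts a blocked poll() with WakeupException
}

// run() would then catch the exception before closing:
// try {
//     while (isRunning) { /* poll and process as above */ }
// } catch (org.apache.kafka.common.errors.WakeupException e) {
//     // expected during shutdown, safe to ignore
// } finally {
//     consumer.close();
// }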

Part 2: Producer Code and the Data-Sending Protocol in a Kafka Cluster

1. Producer Properties for connecting to a cluster:

// Producer configuration for a cluster
Properties properties = new Properties();
// Connect to the cluster: list multiple brokers under bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.128:9092");
// Specify the key and value serializer classes
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
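The snippet above only builds the producer. A minimal sketch of sending a few records to the cluster and closing the producer follows; the topic name "clusterTopic" is a placeholder assumption, and the ProducerRecord import from Part 1 is assumed to be present.

// Continue the snippet above: send a few records to the cluster and close
for (int i = 0; i < 5; i++) {
    // "clusterTopic" is a placeholder topic name
    kafkaProducer.send(new ProducerRecord<>("clusterTopic", "hello kafka " + i));
}
// close() flushes buffered records before releasing the broker connections
kafkaProducer.close();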

2. The protocol the producer uses to send data

Kafka uses its own custom-designed binary protocol built on top of TCP.

Part 3: Differences Between Kafka and RocketMQ

Kafka: single-machine write throughput around 1,000,000 messages/s; no scheduled (delayed) messages; no distributed transactions; consumers use short polling (slightly worse real-time latency).
RocketMQ: single-machine write throughput around 70,000 messages/s; supports scheduled (delayed) messages and distributed transactions; uses long polling.

C (Consistency): strong consistency
A (Availability): availability
P (Partition Tolerance): partition tolerance
