当前位置:   article > 正文

java Kafka生产者推送数据与消费者接收数据(参数配置以及案例)_java实现kafka消息发送和接收

java实现kafka消息发送和接收

1.生产者推送数据

常用参数

bootstrap.servers:Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。

acks:Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认消息;all表示生产者会等待所有副本都确认消息。确认模式越高,可靠性越高,但延迟也越大。

retries:消息发送失败时的重试次数。默认值为0,表示不进行重试。可以将其设置为大于0的值,例如3,表示最多重试3次。

batch.size:消息批量发送的大小。当生产者累积到一定数量的消息时,会将其打包成一个批次一次性发送给Broker。默认值为16384字节,即16KB。

linger.ms:消息发送的延迟时间。生产者会等待一定的时间,以便将更多的消息打包成一个批次一次性发送给Broker。默认值为0,表示立即发送。设置较大的值可以提高吞吐量,但可能会增加消息的延迟。

buffer.memory:生产者可用于缓存消息的内存大小。默认值为33554432字节,即32MB。如果生产者生产消息的速度快于发送消息的速度,可能会导致缓存溢出。可以调整该参数来适应生产者的生产速度。

key.serializer:Key的序列化器。Kafka消息可以包含Key和Value,Key和Value都需要进行序列化。该参数指定Key的序列化器。

value.serializer:Value的序列化器。该参数指定Value的序列化器。

max.block.ms:生产者在发送消息之前等待Broker元数据信息的最长时间。如果在该时间内无法获取到Broker元数据信息,则会抛出TimeoutException异常。默认值为60000毫秒,即60秒。

compression.type:消息压缩类型。可选值为none、gzip、snappy、lz4。默认值为none,表示不进行压缩。压缩可以减少消息的传输大小,提高网络带宽的利用率,但会增加CPU的消耗。

interceptor.classes:消息拦截器列表。可以指定多个消息拦截器对消息进行加工处理。例如,可以在消息中添加时间戳、添加消息来源等信息。
以上参数只是一部分,Kafka生产者还有更多参数可以进行配置。需要根据实际情况选择合适的参数进行配置。

例子

下面是一个单例模式配置 kafka生产者的例子(避免多次创建实例,减少资源的消耗)

public class SingletonKafkaProducerExample {
    private static SingletonKafkaProducerExample instance;
    private static Producer<String, String> producer;
    private SingletonKafkaProducerExample() {
    //参数设置
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:端口");
        props.put("acks", "all");
        props.put("max.block.ms",120000);//默认60s
        props.put("retries", 3)//默认0;
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("request.timeout.ms",60\*1000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
//sasl认证 (根据实际情况看是否配置)
		props.put("security.protocol", "SASL\_PLAINTEXT");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';");
		producer = new KafkaProducer<>(props);
		logger.info("kafka连接成功");
    }
    public static SingletonKafkaProducerExample getInstance() {
        if (instance == null) {
            synchronized (SingletonKafkaProducerExample.class) {
                if (instance == null) {
                    instance = new SingletonKafkaProducerExample();
                }
            }
        }
        return instance;
    }
    public void sendMessage(String topic, String key, String value) {
        try {
        //这里也可以不用设置key和partition,例如不设置分区 系统会使用轮询算法自动匹配partition
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            
            Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("发送消息到" + metadata.topic() + "失败:" + exception.getMessage());
                } else {
                    System.out.println("发送消息到" + metadata.topic() + "成功:partition=" + metadata.partition() + ", offset=" + metadata.offset());
                }
            });
            future.get(); // 等待返回数据
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("发送消息失败:" + e.getMessage());
        } 
    }
    public void closeProducer() {
        producer.close();
    }
}



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

以上参数配置只是案例,实际参数配置需要根据业务情况自己设置
下面是生产的方法介绍:

close(): 关闭生产者,释放相关资源。
close(Duration timeout): 在指定的超时时间内关闭生产者,释放相关资源。
initTransactions(): 初始化事务,启用事务支持。
beginTransaction(): 开始事务。
send(ProducerRecord<K, V> record): 发送一条消息记录到指定的主题。
send(ProducerRecord<K, V> record, Callback callback): 发送一条消息记录,并附带一个回调函数用于异步处理发送结果。
send(ProducerRecord<K, V> record, ProducerCallback<T> callback): 发送一条消息记录,并使用自定义的回调函数处理发送结果。
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId): 将消费者组的偏移量提交给事务。
partitionsFor(String topic): 获取指定主题的分区信息。
metrics(): 获取生产者的度量指标信息。
flush(): 将所有已挂起的消息立即发送到Kafka服务器,等待服务器确认后再返回。
commitTransaction(): 提交当前事务。
abortTransaction(): 中止当前事务。
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata): 将消费者组的偏移量和消费者组元数据提交给事务。

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
可能遇见的问题

1.多个topic发送消息的时候总有1.2发送失败 报Failed to update metadata after 60000ms
这种情况出现的原因可能是Kafka集群中Broker的元数据信息还没有被更新到Kafka客户端中,导致Kafka客户端无法连接到指定的Broker。

解决

增加等待时间:可以通过设置max.block.ms属性来增加等待时间
提高重试次数:可以通过设置retries属性来提高重试次数
检查Broker配置
检查网络连接
检查Kafka版本

如果下面3个都没问题,就增加等待时间和重试次数。本人遇到这样的问题解决了

消费者 推送数据

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 配置消费者参数
		Properties props = new Properties();
		/\*
 bootstrap.servers
 Kafka集群中Broker的地址列表,格式为"hostname:port",例如:"localhost:9092"。可以配置多个Broker,用逗号分隔。
 \*/
		props.put("bootstrap.servers", "ip:port");
		/\*
 group.id
 消费者组的名称,同一个消费者组中的消费者会共享消费消息的责任。例如:"test"。
 \*/
		props.put("group.id", "test");
		/\*
 enable.auto.commit
 是否自动提交偏移量,默认为true。如果为false,则需要手动提交偏移量。
 \*/
		props.put("enable.auto.commit", "true");
		/\*
 session.timeout.ms
 消费者会话超时时间(毫秒),如果消费者在该时间内没有向Kafka Broker发送心跳,则会被认为已经失效。默认10000毫秒。
 \*/
		props.put("session.timeout.ms", "30000");
		/\*
 auto.offset.reset
 如果消费者在初始化时没有指定偏移量或指定的偏移量不存在,则从哪个位置开始消费,默认latest,即从最新的消息开始消费。其他可选值为earliest和none。
 \*/
		props.put("auto.offset.reset", "earliest");
		/\*
 
 key.deserializer
 key的反序列化方式,例如:"org.apache.kafka.common.serialization.StringDeserializer"。
 \*/
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		/\*
 value.deserializer
 value的反序列化方式,例如:"org.apache.kafka.common.serialization.StringDeserializer"。
 \*/
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		/\*
 max.poll.records
 每次拉取消息的最大记录数,默认500条。
 \*/
		props.put("max.poll.records", "10000");
		/\*
 fetch.min.bytes
 每次拉取的最小字节数,默认1字节。
 
 fetch.max.bytes
 每次拉取的最大字节数,默认52428800字节,即50MB。
 
 fetch.max.wait.ms
 最长等待时间(毫秒),如果在该时间内没有拉取到任何消息,则返回空结果。默认500毫秒。
 \*/
		props.put("fetch.min.bytes", "1024");
		props.put("fetch.max.bytes", "1048576");
		props.put("fetch.max.wait.ms", "500");
		
		/\*
 max.partition.fetch.bytes
 每个分区最大拉取字节数,默认1048576字节,即1MB。
 \*/
		props.put("max.partition.fetch.bytes", "1024");
		/\*
 connections.max.idle.ms
 最大空闲连接时间(毫秒),超过该时间则连接被认为已经过期并关闭。默认540000毫秒,即9分钟。
 \*/
		props.put("connections.max.idle.ms", "540000");
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/839398
推荐阅读
相关标签
  

闽ICP备14008679号