赞
踩
// 创建消费者的配置信息
Properties props = new Properties();
bootstrap.servers
:连接的集群,broker集群地址,格式:ip1:port,ip2:port…,不需要设定全部的集群地址,设置两个或者两个以上即可。
props.put("bootstrap.servers", "192.168.1.129:9092,192.168.1.133:9092,192.168.1.134:9092");// 连接的集群
group.id
:消费者组,消费者隶属的消费者组名称,如果为空会报异常,一般而言,这个参数要有一定的业务意义。
props.put("group.id", "test");// 消费者组
enable.auto.commit
:是否自动提交,boolean 类型,配置是否开启自动提交消费位移的功能,默认开启
props.put("enable.auto.commit", "false");// 不自动提交
auto.commit.interval.ms
:自动提交的延迟,当enbale.auto.commit参数设置为 true 时才生效,表示开启自动提交消费位移功能时自动提交消费位移的时间间隔
props.put("auto.commit.interval.ms", "1000");// 自动提交的延迟
key.deserializer和value.deserializer
:反序列化 ,broker接收消息必须以字节数组byte[]形式存在,KafkaProducer<K,V>和ProducerRecord<K,V>中的泛型就是key和value的类型。key.serializer和value.serializer分别用来指定key和value序列化操作的序列化器,无默认值。类的全限定名。
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化
fetch.min.bytes
:该参数用来配置 Consumer 在一次拉取请求(调用 poll() 方法)中能从 Kafka 中拉取的最小数据量,默认值为1(B)。Kafka 在收到 Consumer 的拉取请求时,如果返回给 Consumer 的数据量小于这个参数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取了。
fetch.max.bytes
:该参数与 fetch.min.bytes 参数对应,它用来配置 Consumer 在一次拉取请求中从Kafka中拉取的最大数据量,默认值为52428800(B),也就是50MB。
如果这个参数设置的值比任何一条写入 Kafka 中的消息要小,那么会不会造成无法消费呢?该参数设定的不是绝对的最大值,如果在第一个非空分区中拉取的第一条消息大于该值,那么该消息将仍然返回,以确保消费者继续工作。Kafka 中所能接收的最大消息的大小通过服务端参数 message.max.bytes(对应于主题端参数 max.message.bytes)来设置。
fetch.max.wait.ms
:这个参数也和 fetch.min.bytes 参数有关,如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指定 Kafka 的等待时间,默认值为500(ms)。如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待500ms。这个参数的设定和 Consumer 与 Kafka 之间的延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。
max.partition.fetch.bytes
:这个参数用来配置从每个分区里返回给 Consumer 的最大数据量,默认值为1048576(B),即1MB。这个参数与 fetch.max.bytes 参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者用来限制一次拉取中整体消息的大小。同样,如果这个参数设定的值比消息的大小要小,那么也不会造成无法消费,Kafka 为了保持消费逻辑的正常运转不会对此做强硬的限制。
max.poll.records
:一次性拉取的条数,这个参数用来配置 Consumer 在一次拉取请求中拉取的最大消息数,默认值为500(条)。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。
props.put("max.poll.records",10);// 一次性最大拉取的条数
connections.max.idle.ms
:这个参数用来指定在多久之后关闭闲置的连接,默认值是540000(ms),即9分钟。
exclude.internal.topics
:Kafka 中有两个内部的主题: __consumer_offsets 和 __transaction_state。exclude.internal.topics 用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true。如果设置为 true,那么只能使用 subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式来订阅内部主题,设置为 false 则没有这个限制。
receive.buffer.bytes
:这个参数用来设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为65536(B),即64KB。如果设置为-1,则使用操作系统的默认值。如果 Consumer 与 Kafka 处于不同的机房,则可以适当调大这个参数值。
send.buffer.bytes
:这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。
request.timeout.ms
:这个参数用来配置 Consumer 等待请求响应的最长时间,默认值为30000(ms)。
metadata.max.age.ms
:这个参数用来配置元数据的过期时间,默认值为300000(ms),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的 broker 加入
reconnect.backoff.ms
:这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50(ms)。这种机制适用于消费者向 broker 发送的所有请求。
auto.offset.reset
:在 Kafka 中,每当消费者组内的消费者查找不到所记录的消费位移或发生位移越界时,就会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费,这个参数的默认值为 “latest” 。
值可以为 earliest、latest 和 none 。关于 earliest 和 latest 的解释,官方描述的太简单,各含义在真实情况如下所示:
earliest :当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费。
latest :当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费该分区下新产生的数据。
none :topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
partition.assignment.strategy
:消费者的分区分配策略
interceptor.class
:用来配置消费者客户端的拦截器
session.timeout.ms
:超时时间
props.put("session.timeout.ms", "30000");
public static void main(String[] args) { // 创建消费者的配置信息 Properties props = new Properties(); // 给配置信息赋值session.timeout.ms props.put("bootstrap.servers", "192.168.1.129:9092,192.168.1.133:9092,192.168.1.134:9092");// 连接的集群 props.put("group.id", "XXXX");// 消费者组 集群中配置的 props.put("enable.auto.commit", "false");// 自动提交 props.put("auto.commit.interval.ms", "1000");// 自动提交的延迟 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化 // 创建消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅主题--可以定义多个主题 String[] arr = { "test" }; consumer.subscribe(Arrays.asList(arr)); // 获取数据拉取 ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.println("offset =" + record.offset() + " key =" + record.key() + " value"+ record.value()); // 数据处理完之后手动提交 consumer.commitAsync(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。