赞
踩
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
② 在IDEA中执行代码,观察hadoop102上的消费者消费情况
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu0
atguigu1
kafka2
……
③ 观察IDEA中控制台输出
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
3)案例2:没有指明partition但是有key的情况下的消费者分区分配
package com.atguigu.kafka.producer; import org.apache.kafka.clients.producer.\*; import java.util.Properties; public class CustomProducerCallbackKey { public static void main(String[] args) { // 1. 创建配置对象 Properties properties = new Properties(); // 2. 配置属性 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 3. 创建生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); // 4. 造数据 for (int i = 1; i < 11; i++) { // 创建producerRecord对象 final ProducerRecord<String, String> producerRecord = new ProducerRecord<>( "first", i + "",// 依次指定key值为i "atguigu " + i); kafkaProducer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e == null){ System.out.println("消息:"+producerRecord.value()+", 主题:" + metadata.topic() + "->" + "分区:" + metadata.partition() ); }else { e.printStackTrace(); } } }); } kafkaProducer.close(); } }
4)测试
观察IDEA中控制台输出
消息:atguigu 1, 主题:first->分区:0
消息:atguigu 5, 主题:first->分区:0
消息:atguigu 7, 主题:first->分区:0
消息:atguigu 8, 主题:first->分区:0
消息:atguigu 2, 主题:first->分区:2
消息:atguigu 3, 主题:first->分区:2
消息:atguigu 9, 主题:first->分区:2
消息:atguigu 4, 主题:first->分区:1
消息:atguigu 6, 主题:first->分区:1
消息:atguigu 10, 主题:first->分区:1
package com.atguigu.kafka.partitioner; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /\*\* \* @author leon \* @create 2020-12-11 10:43 \* 1. 实现接口Partitioner \* 2. 实现3个方法:partition,close,configure \* 3. 编写partition方法,返回分区号 \*/ public class MyPartitioner implements Partitioner { /\*\* \* 分区方法 \*\*/ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 1. 获取key String keyStr = key.toString(); // 2. 创建分区号,返回的结果 int partNum; // 3. 计算key的hash值 int keyStrHash = keyStr.hashCode(); // 4. 获取topic的分区个数 int partitionNumber = cluster.partitionCountForTopic(topic); // 5. 计算分区号 partNum = Math.abs(keyStrHash) % partitionNumber; // 4. 返回分区号 return partNum; } // 关闭资源 @Override public void close() { } // 配置方法 @Override public void configure(Map<String, ?> configs) { } }
// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG," com.atguigu.kafka.partitioner.MyPartitioner ");
在hadoop102上启动kafka消费者
[atguigu@hadoop102 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
在IDEA中观察回调信息
消息:atguigu 2, 主题:first->分区:2
消息:atguigu 5, 主题:first->分区:2
消息:atguigu 8, 主题:first->分区:2
消息:atguigu 1, 主题:first->分区:1
消息:atguigu 4, 主题:first->分区:1
消息:atguigu 7, 主题:first->分区:1
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
,那么很难做到真正的技术提升。**
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。