赞
踩
前置:熟悉javase,熟悉linux,熟悉idea,熟悉hadoop
前端埋点记录用户(浏览,点赞,收藏,评论)到日志服务器,然后通过Flume(小于100m/s)将大日志文件导入到Hadoop集群,每产生一个日志就发送到hadoop(上传100m/s)中。
秒杀活动:Flume采集速度大于200ms/s,就需要KafKa集群。
Kafka传统定义:一个分布式的基于发布/订阅的消息队列(MessageQueue),主要用于大数据实时处理领域。
发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
Kafka最新定义:一个开源的分布式事件流平台(EventStreamingPlatform),用于高性能数据管道,流分析,数据集成和关键任务应用。
常见消息队列:Kafka,ActiveMQ,RabbitMQ,RocketMQ,大数据场景主要采用Kafka。
缓存/削峰,解耦和异步通信。
缓冲/削峰:控制和优化数据流速度,解决生产和消费消息处理速度不一致的情况。
解耦:独立的扩展或修改两边的处理过程,只要确保他们遵循同样的接口约束。
异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后再需要的时候再去处理。
hadoop102 | hadoop103 | hadoop104 |
---|---|---|
ZK | ZK | ZK |
Kafka | Kafka | Kafka |
注意:(在server.properties中修改)每个kafka在集群中的broker.id一定要唯一,修改log.dirs地址,修改zookeeper.connect地址
注意:一定要先关kafka,再关zookeeper
--kf.sh #!/bin/bash case $1 in "start") for i in hadoop1 hadoop2 hadoop3 do echo "---启动 $i kafka---" ssh $i"/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties" done ;; "stop") for i in hadoop1 hadoop2 hadoop3 do echo "---启动 $i kafka---" ssh $i"/opt/module/kafka/bin/kafka-server-stop.sh" done ;; esac
chomod 777
进入kafka目录下,不用进入bin目录下 -- 查看所有topic .\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list -- 查看指定topic信息 .\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --describe --topic test -- 创建topic信息 .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test -- 创建生产者产生消息,不关闭页面 .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test -- 创建消费者接收消息,不关闭页面 .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning -- 删除topic: .\bin\windows\kafka-topics.bat kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --delete --topic test
生产者发送:bin/kafka-console-producer.sh --bootstrap-server localhost:9092 -topic first
消费者消费:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic first
查看历史数据:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic first-beginning
在消息发送过程中有2线程,main和sender。main中创建一个双端队列RecordAccumulator。main将消息发给队列,Sender线程从中拉取消息发送到Kafka Broker。
DQueue中数据发给Send条件:
public class CustomProducer { public static void main(String[] args) { //0.配置 Properties properties = new Properties(); //连接kafka properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); //指定对应的ke和value序列化类型 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //1.创建kafka生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); //2.发送数据 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i)); } //3.关闭资源 kafkaProducer.close(); } }
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数:元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明发送成功。
只需在send()方法上添加callback参数即可。
kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null){
System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition());
}
}
);
只需在异步发送的基础上,再调用get()方法即可。
kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i)).get();
如果根据企业需求,自己实现分区器。
/** * @author 自定义分区器 */ public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取数据 String msgValue = value.toString(); int partition; if (msgValue.contains("x")) { partition = 0; } else { partition = 1; } return partition; } @Override public void close() {} @Override public void configure(Map<String, ?> map) {} } //使用:关联自定义分区器 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com/xuyu/kafka/producer/config/MyPartitioner.java");
从生产者到kafka集群broker仓库本来一次只发送一个data包数据,修改batch.size(批次大小,默认16k),linger.ms(等待时间,修改为5-100ms)使等待时间和发送数据量增大。同时使用compression.type(压缩snappy)将数据压缩。并将RecordAccumulator(缓冲区大小,修改为64m)调大。
public class CustomProducerParameters { public static void main(String[] args) { //0. 配置 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //连接/集群 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //key序列化 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //value序列化 //优化参数 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); //缓冲区大小,默认32M properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //批次大小,默认16K properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); //linger.ms 等待时间,默认0 properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); //压缩,默认none,可选gzip,snappy,lz4,zstd //1. 创建生产者 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); //2. 发送 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i)); } //3. 关闭 kafkaProducer.close(); } }
//acks
properties.put(ProducerConfig.ACKS_CONFIG, "1"); //ack参数,默认-1 可选0,1,-1
properties.put(ProducerConfig.RETRIES_CONFIG, 3); //重试次数,默认int最大值21亿
数据重复分析:生产者发送过来的数据,Leader和ISR队列中所有节点收到数据后,leader挂了,没有返回ack,选出新的leader重复收到相同的数据。解决见下文。
public class CustomProducerTransactions { public static void main(String[] args) { //0.配置 Properties properties = new Properties(); //连接kafka properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //指定对应的ke和value序列化类型 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_1"); //指定事务id //1.创建kafka生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); //添加事务 kafkaProducer.initTransactions(); kafkaProducer.beginTransaction(); try { //2.发送数据 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i)); } kafkaProducer.commitTransaction(); } catch (Exception e) { kafkaProducer.abortTransaction(); } finally { //3.关闭资源 kafkaProducer.close(); } } }
单分区内有序:有条件,见下文
多分区,分区与分区间无序。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。