赞
踩
一、集群启动脚本
1、集群环境:192.168.2.131 (主机名master),192.168.2.130(主机名slave1)
- #!/bin/bash
- zkPath=/home/master/zookeeper-3.4.14/bin
- kafkaPath=/home/master/kafka_2.11-2.2.1
-
- function sshStartZk(){
- echo "登陆主机"$1
- ssh root@$1 << 'EOF'
- cd /home/master/zookeeper-3.4.14/bin
- ./zkServer.sh stop
- ./zkServer.sh start
- echo $1"启动->zookeeper成功"
- exit
- EOF
- }
-
- function sshStartKafka(){
- echo "登陆主机"$1
- ssh root@$1 << 'EOF'
- cd /home/master/kafka_2.11-2.2.1/bin
- ./kafka-server-stop.sh
- ./kafka-server-start.sh ../config/server.properties
- echo "启动->kafka成功"$1
- exit
- EOF
-
- }
-
- echo "开始启动zk"
- sshStartZk "master";
- sshStartZk "slave1";
- echo "启动zk完成"
- echo "开始启动kafka"
- sshStartKafka "master";
- sshStartKafka "slave1";
- echo "启动kafka完成"
二、java的api操作kafka
1、加入maven依赖
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.12</artifactId>
- <version>2.2.1</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>2.2.1</version>
- </dependency>
2、调用工具类
- public class KafkaConnectUtils {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(KafkaConnectUtils.class);
-
- /**
- * 创建生产者
- *
- * @return
- */
- public static KafkaProducer<String, Object> createProducer() {
- Properties props = new Properties();
- // Kafka服务端的主机名和端口号
- props.put("bootstrap.servers", "master:9092");
- // 等待所有副本节点的应答
- props.put("acks", "all");
- // 消息发送最大尝试次数
- props.put("retries", 0);
- // 一批消息处理大小
- props.put("batch.size", 16384);
- // 增加服务端请求延时
- props.put("linger.ms", 1);
- // 发送缓存区内存大小 32m
- props.put("buffer.memory", 33554432);
- // key序列化
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- // value序列化
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- return new KafkaProducer<>(props);
- }
-
-
- /**
- * 发送消息
- */
- public static void kafkaSendMsg(String topic, String key, Object value) {
- KafkaProducer<String, Object> producer = KafkaConnectUtils.createProducer();
- ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, value);
- producer.send(record, (recordMetadata, ex) -> {
- if (recordMetadata == null) {
- LOGGER.info("kafkaSendMsg->发送消息失败:" + ex.toString());
- } else {
- LOGGER.info("kafkaSendMsg->发送消息成功:topic=" + recordMetadata.topic()
- + ",key=" +recordMetadata.serializedKeySize()+",value="+recordMetadata.serializedValueSize());
- }
- });
- producer.close();
- }
-
- /**
- * 创建消费者
- *
- * @param
- * @return
- */
- public static KafkaConsumer<String, Object> createKafkaConsumer() {
- Properties props = new Properties();
- // 定义kakfa 服务的地址,不需要将所有broker指定上
- props.put("bootstrap.servers", "master:9092");
- //制定consumer group
- props.put("group.id", "AppGroup");
- // 是否自动确认offset
- props.put("enable.auto.commit", "true");
- // 自动确认offset的时间间隔
- props.put("auto.commit.interval.ms", "1000");
- // key的序列化类
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- // value的序列化类
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- return new KafkaConsumer<>(props);
- }
-
- /**
- * 拉取消费信息
- */
- public static void pullKafkaMsg(List<String> topics) {
- KafkaConsumer<String, Object> kafkaConsumer = createKafkaConsumer();
- ///订阅多个主题topic
- kafkaConsumer.subscribe(topics);
- ConsumerRecords<String, Object> consumerRecord = kafkaConsumer.poll(Duration.ofSeconds(10));
- for (ConsumerRecord<String, Object> next : consumerRecord) {
- LOGGER.info("消息消费{}topic=" + next.topic() + ",key=" + next.key()+",value="+next.value());
- }
- }
-
- }
3、测试调用
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class AppTest {
-
- @Test
- public void kafkaSendMsg(){
- KafkaConnectUtils.kafkaSendMsg("testdemo","liucui","刘翠");
- KafkaConnectUtils.kafkaSendMsg("testdemo","liuping","刘萍");
- }
-
- @Test
- public void kafkaPullMsg(){
- List<String> list=new ArrayList<>();
- list.add("testdemo");
- KafkaConnectUtils.pullKafkaMsg(list);
- }
-
- }
三、关于kafka的角色及特性
1、zookeeper:用于协调broker集群的,保存broker的元数据(与rokcetmq的nameserver作用类 似,用于存储broker的信息),选举broker的controller.
2、 broker : mq队列存储的服务实体
3、topic: 主题是mq在broker上的逻辑分区
4、partition物理分区,一个topic可以分成若干个分区,一般与broker集群的个数一致partition使用主从复制,leader负责读写数据,follower并作为备份副本repilca, partition的leader节点选举是从ISR集中产生的,不是通过zookeeper实现的
5、AR 、ISR 、OSR 用于描述主从节点复制状态的
(1) AR: all replicas (所有副本)
(2)ISR: in sync replicas (与leader节点同步的副本集)
(3)OSR: out sync replicas (与leader节点同步不一致的副本集合)
6、生产者写消息的ACK机制
(1) 0 不管消息写入内存落盘是否成功直接返回发送成功
(2)1 leader写入消息落盘成功时返回成功
(3)-1 leader和isr都写入成功后返回
7、 消费组再均衡Rebalance
为了保证大体上partition和consumer的均衡性,提高topic的并发消费能力,就有了Rebalance的协议,规定ConsumerGroup下的所有consumer如何达成一致,来分配topic下的分区。Rebalance触发的条件:
(1)有新的Consumer加入
(2)有consumer挂了
(3)有新的topic增加
(4)有consumer取消对topic的订阅
8、顺序写和零拷贝
(1)磁盘顺序写
磁盘的顺序读写是磁盘使用模式中最有规律的,并且操作系统也对这种模式做了大 量优化,Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断 追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了 显著提升。
(2)零拷贝
网络编程传统的文件传输模型及问题
首先要明确一个概念上下文切换,所谓的上上下文切换是从用户态切换至内核态,等待内核完成任务后再从内核态切换回用户态。 从上图可以看出,一次文件拷贝发生了四次用户态和内核态的上下文切换,以及四次数据拷贝,也就是在这个地方产生了大量不必要的损耗。可以很清楚地看到数据拷贝地过程, 第一次拷贝将磁盘中的数据拷贝到内核的缓冲区中,第二次拷贝内核将数据处理完,接着拷贝到用户缓冲区中,第三次拷贝此时需要通过socket将数据发送出去,将用户缓冲区中的数据拷贝至内核中socket的缓冲区中, 第四次拷贝:把内核中socket缓冲区的数据拷贝到网卡的缓冲区中,通过网卡将数据发送出去。所以要想优化传输性能,就要从减少数据拷贝和用户态内核态的上下文切换下手,这也就是零拷贝技术的由来。
零拷贝就是一种避免CPU将数据从一块存储拷贝到另外一块存储的技术。linux操作系统“零拷贝”机制使用了 sendfile方法,允许操作系统将数据从 Page Cache直接发送到网络,只需要最后一步的copy操作将数据复制到NIC缓冲 区,这样避免重新复制数据.
Linux系统中常用的零拷贝技术有sendfile、mamp等
sendfile的作用是直接在两个文件描述符之间传递数据。由于整个操作完全在内核中(直接从内核缓冲区拷贝到socket缓冲区),从而避免了内核缓冲区和用户缓冲区之间的数据拷贝,是专门为了在网络上传输文件而实现的函数。
mmap用于申请一段内存空间,也就是我们在进程间通信中提到过的共享内存,通过将内核缓冲区的数据映射到用户空间中,两者通过共享缓冲区直接访问统一资源,此时内核与用户空间就不需要再进行任何的数据拷贝操作.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。