赞
踩
Zookeeper从设计模式角度来理解
当ZooKeeper 集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
服务器初始化启动。
服务器运行期间无法和Leader保持连接。
而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
集群中确实不存在Leader。
选举Leader规则
服务器主机名 | ip地址 | 安装服务 |
---|---|---|
node1 | 192.168.10.11 | Zookeeper、kafka |
node2 | 192.168.10.12 | Zookeeper、kafka |
node3 | 192.168.10.13 | Zookeeper、kafka |
systemctl stop firewalld
setenforce 0
#关闭防火墙和核心防护
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
#安装jdk
java -version
#查看版本信息
cd /opt
#切换目录
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
#下载安装包
#或者上传apache-zookeeper-3.5.7-bin.tar.gz安装包
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
#解压
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
#移动到指定目录
cd /usr/local/zookeeper-3.5.7/conf/ #切换目录 cp zoo_sample.cfg zoo.cfg #备份配置文件 #编辑配置文件 vim zoo.cfg tickTime=2000 #2行,通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒 initLimit=10 #5行,Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s syncLimit=5 #8行,Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer dataDir=/usr/local/zookeeper-3.5.7/data #12行,修改,指定保存Zookeeper中的数据的目录,目录需要单独创建 dataLogDir=/usr/local/zookeeper-3.5.7/logs #13行,添加,指定存放日志的目录,目录需要单独创建 clientPort=2181 #15行,客户端连接端口 server.1=192.168.10.11:3188:3288 server.2=192.168.10.12:3188:3288 server.3=192.168.10.13:3188:3288 #末行添加集群信息
server.A=B:C:D
mkdir /usr/local/zookeeper-3.5.7/data
#创建数据目录
mkdir /usr/local/zookeeper-3.5.7/logs
#创建日志目录
echo 1 > /usr/local/zookeeper-3.5.7/data/myid
echo 2 > /usr/local/zookeeper-3.5.7/data/myid
echo 3 > /usr/local/zookeeper-3.5.7/data/myid
#编辑启动脚本 vim /etc/init.d/zookeeper #!/bin/bash #chkconfig:2345 20 90 #运行级别,从0-6 #20表示启动优先级的选项 #90表示停止优先级的选项 #description:Zookeeper Service Control Script ZK_HOME='/usr/local/zookeeper-3.5.7' case $1 in start) echo "---------- zookeeper 启动 ------------" $ZK_HOME/bin/zkServer.sh start ;; stop) echo "---------- zookeeper 停止 ------------" $ZK_HOME/bin/zkServer.sh stop ;; restart) echo "---------- zookeeper 重启 ------------" $ZK_HOME/bin/zkServer.sh restart ;; status) echo "---------- zookeeper 状态 ------------" $ZK_HOME/bin/zkServer.sh status ;; *) echo "Usage: $0 {start|stop|restart|status}" esac
chmod +x /etc/init.d/zookeeper
#添加权限
chkconfig --add zookeeper
#开机自启
service zookeeper start
#启动
service zookeeper status
#查看状态
主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection 错误,引发雪崩效应。
我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队列常应用于异步处理,流量削峰,应用解耦,消息通讯等场景。
当前比较常见的 MQ 中间件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。
消费者组,由多个 consumer 组成。
所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。可为每个消费者指定组名,若不指定组名则属于默认的组。
将多个消费者集中到一起去处理某一个 Topic 的数据,可以更快的提高数据的消费能力。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,防止数据被重复读取。
消费者组之间互不影响。
服务器主机名 | ip地址 |
---|---|
node1 | 192.168.10.11 |
node2 | 192.168.10.12 |
node3 | 192.168.10.13 |
systemctl stop firewalld
setenforce 0
#关闭防火墙和核心防护
cd /opt
#切换目录
#上传kafka_2.13-2.7.1.tgz安装包
tar zxvf kafka_2.13-2.7.1.tgz
#解压
mv kafka_2.13-2.7.1 /usr/local/kafka
#移动目录
cd /usr/local/kafka/config/ #切换目录 cp server.properties{,.bak} #备份配置文件 #修改配置文件 vim server.properties broker.id=0 #21行,修改id号,broker的全局唯一编号,每个broker不能重复,因此要在其他node上配置不同的broker.id,node2为1,node3为2 listeners=PLAINTEXT://192.168.10.11:9092 #31行,修改指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改 num.network.threads=3 #42行,broker 处理网络请求的线程数量,一般情况下不需要去修改 num.io.threads=8 #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数 socket.send.buffer.bytes=102400 #48行,发送套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #51行,接收套接字的缓冲区大小 socket.request.max.bytes=104857600 #54行,请求套接字的缓冲区大小 log.dirs=/usr/local/kafka/logs #60行,修改kafka运行日志存放的路径,也是数据存放的路径 num.partitions=1 #65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖 num.recovery.threads.per.data.dir=1 #69行,用来恢复和清理data下数据的线程数量 log.retention.hours=168 #103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除 log.segment.bytes=1073741824 #110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件 zookeeper.connect=192.168.10.11:2181,192.168.10.12:2181,192.168.10.13:2181 #123行,配置连接Zookeeper集群地址
#修改环境变量
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
#末行添加配置内容
source /etc/profile
#执行
#编辑启动脚本 vim /etc/init.d/kafka #!/bin/bash #chkconfig:2345 22 88 #description:Kafka Service Control Script KAFKA_HOME='/usr/local/kafka' case $1 in start) echo "---------- Kafka 启动 ------------" ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties ;; stop) echo "---------- Kafka 停止 ------------" ${KAFKA_HOME}/bin/kafka-server-stop.sh ;; restart) $0 stop $0 start ;; status) echo "---------- Kafka 状态 ------------" count=$(ps -ef | grep kafka | egrep -cv "grep|$$") if [ "$count" -eq 0 ];then echo "kafka is not running" else echo "kafka is running" fi ;; *) echo "Usage: $0 {start|stop|restart|status}" esac scp /etc/init.d/kafka 192.168.10.12:/etc/init.d/ #将脚本文件拷贝到192.168.10.12 scp /etc/init.d/kafka 192.168.10.13:/etc/init.d/ #将脚本文件拷贝到192.168.10.13
chmod +x /etc/init.d/kafka
#添加权限
chkconfig --add kafka
#开机自启
service kafka start
#启动
service kafka status
#查看状态
netstat -natp | grep 9092
#查看进程
kafka-topics.sh --create --zookeeper 192.168.10.11:2181,192.168.10.12:2181,192.168.10.13:2181 --replication-factor 2 --partitions 3 --topic xxxx
#--zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可
#--replication-factor:定义分区副本数,1 代表单副本,建议为 2
#--partitions:定义分区数(不定义将自动使用配置文件中的设置)
#--topic:定义 topic 名称
kafka-topics.sh --list --zookeeper 192.168.10.11:2181,192.168.10.12:2181,192.168.10.13:2181
#查看当前topic列表
kafka-topics.sh --list --zookeeper 192.168.10.12:2181
#查看当前topic列表
#可以指定三个地址或者任意一个地址查看都可
kafka-topics.sh --describe --zookeeper 192.168.10.11:2181
#查看topic详细信息
kafka-console-producer.sh --broker-list 192.168.10.11:9092 --topic test
#发布消息
kafka-console-consumer.sh --bootstrap-server 192.168.10.12:9092 --topic test --from-beginning
#消费消息
#--from-beginning:会把主题中以往所有的数据都读取出来
kafka-topics.sh --zookeeper 192.168.10.12:2181 --alter --topic test --partitions 6
#修改分区数
kafka-topics.sh --delete --zookeeper 192.168.10.12:2181 --topic test
#删除topic
topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。 消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。
由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件:“.index” 文件和 “.log” 文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,test 这个 topic 有三个分区, 则其对应的文件夹为 test-0、test-1、test-2。
注:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
可靠性级别 | 说明 |
---|---|
0 | 这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。当broker故障时有可能丢失数据。 |
1(默认配置) | 这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果在follower同步成功之前leader故障,那么将会丢失数据。 |
-1(或者是all) | producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是如果在 follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复 三种机制性能依次递减,数据可靠性依次递增。 |
注:在 0.11 版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。在 0.11 及以后版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。