赞
踩
消息队列,英文名:Message Queue,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列。来看一下下面的代码:
// 1. 创建一个保存字符串的队列
Queue<String> stringQueue = new LinkedList<String>();
// 2. 往消息队列中放入消息
stringQueue.offer("hello");
// 3. 从消息队列中取出消息并打印
System.out.println(stringQueue.poll());
上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的。
我们可以简单理解消息队列就是将需要传输的数据存放在队列中。
RabbitMQ
RabbitMQ开始是用在电信业务的可靠通信的,也是少有的几款支持AMQP协议的产品之一。
优点:
轻量级,快速,部署使用方便
支持灵活的路由配置。RabbitMQ中,在生产者和队列之间有一个交换器模块。根据配置的路由规则,生产者发送的消息可以发送到不同的队列中。路由规则很灵活,还可以自己实现。
RabbitMQ的客户端支持大多数的编程语言,支持AMQP协议。
缺点:
如果有大量消息堆积在队列中,性能会急剧下降
每秒处理几万到几十万的消息。如果应用要求高的性能,不要选择RabbitMQ。
RabbitMQ是Erlang开发的,功能扩展和二次开发代价很高。
RocketMQ
借鉴了Kafka的设计并做了很多改进,几乎具备了消息队列应该具备的所有特性和功能。
优点:
RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。
经过了历次的双11考验,性能,稳定性可靠性没的说。
java开发,阅读源代码、扩展、二次开发很方便。
对电商领域的响应延迟做了很多优化。
每秒处理几十万的消息,同时响应在毫秒级。如果应用很关注响应时间,可以使用RocketMQ。
性能比RabbitMQ高一个数量级。
支持死信队列,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。
缺点:
Kafka
高可用,几乎所有相关的开源软件都支持,满足大多数的应用场景,尤其是大数据和流计算领域,
Kafka高效,可伸缩,消息持久化。支持分区、副本和容错。
对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的性能。
每秒处理几十万异步消息消息,如果开启了压缩,最终可以达到每秒处理2000w消息的级别。
但是由于是异步的和批处理的,延迟也会高,不适合电商场景。
Kafka的架构师jay kreps非常喜欢franz kafka(弗兰兹·卡夫卡),并且觉得kafka这个名字很酷,因此取了个和消息传递系统完全不相干的名称kafka,该名字并没有特别的含义。
电商网站中,新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。
大型电商网站(淘宝、京东、国美、苏宁…)、App(抖音、美团、滴滴等)等需要分析用户行为,要根据用户的访问行为来发现用户的喜好以及活跃情况,需要在页面上收集大量的用户访问信息。
我们之前学习过Java的服务器开发,Java服务器端开发的交互模型是这样的:
我们之前也学习过使用Java JDBC来访问操作MySQL数据库,它的交互模型是这样的:
它也是一种请求响应模型,只不过它不再是基于http协议,而是基于MySQL数据库的通信协议。而如果我们基于消息队列来编程,此时的交互模式成为:生产者、消费者模型。
消息发送者生产消息发送到消息队列中,然后消息接收者从消息队列中取出并且消费消息。消息被消费以后,消息队列中不再有存储,所以消息接收者不可能消费到已经被消费的消息。
点对点模式特点:
发布/订阅模式特点:
Kafka官网:http://kafka.apache.org
Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写。Kafka的Apache官网是这样介绍Kakfa的。
Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:
我们重点关键三个部分的关键词:
特性
高吞吐量:单机每秒处理几十上百万的消息量。即使存储了TB及消息,也保持稳定的性能。
零拷贝 减少内核态到用户态的拷贝,磁盘通过sendfile实现DMA 拷贝Socket buffer
顺序读写 充分利用磁盘顺序读写的超高性能
页缓存mmap,将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。
高性能:单节点支持上千个客户端,并保证零停机和零数据丢失。
持久化:将消息持久化到磁盘。通过将数据持久化到硬盘以及replication防止数据丢失。
分布式系统,易扩展。所有的组件均为分布式的,无需停机即可扩展机器。
可靠性 - Kafka是分布式,分区,复制和容错的。
客户端状态维护:消息被处理的状态是在Consumer端维护,当失败时能自动平衡。
技术优势
可伸缩性:Kafka 的两个重要特性造就了它的可伸缩性。
容错性和可靠性:Kafka 的设计方式使某个代理的故障能够被集群中的其他代理检测到。由于每个主题都可以在多个代理上复制,所以集群可以在不中断服务的情况下从此类故障中恢复并继续运行。
吞吐量:代理能够以超快的速度有效地存储和检索数据。
应用场景
日志收集:用Kafka可以收集各种服务的Log,通过大数据平台进行处理;
消息系统:解耦生产者和消费者、缓存消息等;
用户活动跟踪:Kafka经常被用来记录Web用户或者App用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到Kafka的Topic中,然后消费者通过订阅这些Topic来做运营数据的实时的监控分析,也可保存到数据库;
我们通常将Apache Kafka用在两类程序:
上图,我们可以看到:
kafka的诞生,是为了解决linkedin的数据管道问题,起初linkedin采用了ActiveMQ来进行数据交换,大约是在2010年前后,那时的ActiveMQ还远远无法满足linkedin对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,linkedin决定研发自己的消息传递系统,当时linkedin的首席架构师jay kreps便开始组织团队进行消息传递系统的研发。
ActiveMQ | RabbitMQ | Kafka | RocketMQ | |
---|---|---|---|---|
所属社区/公司 | Apache | Mozilla Public License | Apache | Apache/Ali |
成熟度 | 成熟 | 成熟 | 成熟 | 比较成熟 |
生产者-消费者模式 | 支持 | 支持 | 支持 | 支持 |
发布-订阅 | 支持 | 支持 | 支持 | 支持 |
REQUEST-REPLY | 支持 | 支持 | - | 支持 |
API完备性 | 高 | 高 | 高 | 低(静态配置) |
多语言支持 | 支持JAVA优先 | 语言无关 | 支持,JAVA优先 | 支持 |
单机呑吐量 | 万级(最差) | 万级 | 十万级 | 十万级(最高) |
消息延迟 | - | 微秒级 | 毫秒级 | - |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 高 |
消息丢失 | - | 低 | 理论上不会丢失 | - |
消息重复 | - | 可控制 | 理论上会有重复 | - |
事务 | 支持 | 不支持 | 支持 | 支持 |
文档的完备性 | 高 | 高 | 高 | 中 |
提供快速入门 | 有 | 有 | 有 | 无 |
首次部署难度 | - | 低 | 中 | 高 |
在大数据技术领域,一些重要的组件、框架都支持Apache Kafka,不论成成熟度、社区、性能、可靠性,Kafka都是非常有竞争力的一款产品。
Apache Kafka这么多年的发展,目前也有一个较庞大的生态圈。
Kafka生态圈官网地址:https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem
http://kafka.apache.org/downloads 可以查看到每个版本的发布时间。
首先需要安装Java环境,同时配置环境变量,步骤如下:
在/etc/profile文件中配置如下变量
export JAVA_HOME=/java/jdk-12.0.1
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=.:$JAVA_HOME/bin:$JRE_HOME/bin:$KE_HOME/bin:${
MAVEN_HOME}/bin:$PATH
java -version
Zookeeper是安装Kafka集群的必要组件,Kafka通过Zookeeper来实施对元数据信息的管理,包括集群、主题、分区等内容。
同样在官网下载安装包到指定目录解压缩,步骤如下:
# The number of milliseconds of each tick # zk服务器的心跳时间 tickTime=2000 # The number of ticks that the initial # synchronization phase can take # 投票选举新Leader的初始化时间 initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement # do not use /tmp for storage, /tmp here is just # example sakes. # 数据目录 dataDir=temp/zookeeper/data # 日志目录 dataLogDir=temp/zookeeper/log # the port at which the clients will connect # Zookeeper对外服务端口,保持默认 clientPort=2181
angyan@Server-node:/mnt/d/zookeeper-3.4.14$ bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /mnt/d/zookeeper-3.4.14/bin/../conf/zoo.cfg
//启动成功
Starting zookeeper ... STARTED
angyan@Server-node:/mnt/d/zookeeper-3.4.14$
cd /export/software/
tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/
cd /export/server/kafka_2.12-2.4.1/
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/export/server/kafka_2.12-2.4.1/data
# 配置zk的三个节点
zookeeper.connect=node1.angyan.cn:2181,node2.angyan.cn:2181,node3.angyan.cn:2181
cd /export/server
scp -r kafka_2.12-2.4.1/ node2.angyan.cn:$PWD
scp -r kafka_2.12-2.4.1/ node3.angyan.cn:$PWD
修改另外两个节点的broker.id分别为1和2
---------node2.angyan.cn--------------
cd /export/server/kafka_2.12-2.4.1/config
vim erver.properties
broker.id=1
--------node3.angyan.cn--------------
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
broker.id=2
vim /etc/profile
export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
export PATH=:$PATH:${
KAFKA_HOME}
分发到各个节点
scp /etc/profile node2.angyan.cn:$PWD
scp /etc/profile node3.angyan.cn:$PWD
每个节点加载环境变量
source /etc/profile
# 启动ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka
cd /export/server/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties &
# 测试Kafka集群是否启动成功
bin/kafka-topics.sh --bootstrap-server node1.angyan.cn:9092 --list
注意:
目录名称 | 说明 |
---|---|
bin | Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等 |
config | Kafka的所有配置文件 |
libs | 运行Kafka所需要的所有JAR包 |
logs | Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息 |
site-docs | Kafka的网站帮助文件 |
为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。
cd /export/onekey
node1.angyan.cn
node2.angyan.cn
node3.angyan.cn
vim start-kafka.sh
cat /export/onekey/slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & "
}&
wait
done
vim stop-kafka.sh
cat /export/onekey/slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;jps |grep Kafka |cut -d' ' -f1 |xargs kill -s 9"
}&
wait
done
chmod u+x start-kafka.sh
chmod u+x stop-kafka.sh
./start-kafka.sh
./stop-kafka.sh
命令如下:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic angyan --partitions 2 --replication-factor 1
–zookeeper:指定了Kafka所连接的Zookeeper服务地址。多个zookeeper用 ‘,’分开。
–topic:指定了所要创建主题的名称
–partitions:指定了分区个数
–replication-factor:指定了副本因子。每个副本分布在不通节点,不能超过总节点数。如你只有一个节点,但是创建时指定副本数为2,就会报错。
–create:创建主题的动作指令
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic angyan --partitions 2 --replication-factor 1
//主题创建成功
Created topic angyan.
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$
命令:
bin/kafka-topics.sh --zookeeper localhost:2181 --list
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
angyan
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$
命令:
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic angyan
–describe 查看详情动作指令
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic angyan
Topic: angyan PartitionCount:2 ReplicationFactor:1 Configs:
Topic: angyan Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: angyan Partition: 1 Leader: 0 Replicas: 0 Isr: 0
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$
命令:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic angyan
–bootstrap-server 指定了连接Kafka集群的地址
–topic 指定了消费端订阅的主题
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic angyan
Hello,Kafka!
命令:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic angyan
–broker-list 指定了连接的Kafka集群的地址
–topic 指定了发送消息时的主题
angyan@Server-node:/mnt/d/kafka_2.12-2.2.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic angyan
>Hello,Kafka!
>
创建一个topic(主题)。Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。
# 创建名为test的主题
bin/kafka-topics.sh --create --bootstrap-server node1.angyan.cn:9092 --topic test
# 查看目前Kafka中的主题
bin/kafka-topics.sh --list --bootstrap-server node1.angyan.cn:9092
使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中。
bin/kafka-console-producer.sh --broker-list node1.angyan.cn:9092 --topic test
使用下面的命令来消费 test 主题中的消息。
bin/kafka-console-consumer.sh --bootstrap-server node1.angyan.cn:9092 --topic test --from-beginning
基准测试(benchmark testing)是一种测量和评估软件性能指标的活动。我们可以通过基准测试,了解到软件、硬件的性能水平。主要测试负载的执行时间、传输速度、吞吐量、资源占用率等。
测试步骤:
bin/kafka-topics.sh --zookeeper node1.angyan.cn:2181 --create --topic benchmark --partitions 1 --replication-factor 1
在生产环境中,推荐使用生产5000W消息,这样会性能数据会更准确些。为了方便测试,课程上演示测试500W的消息作为基准测试。
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=1
bin/kafka-producer-perf-test.sh
--topic topic的名字
--num-records 总共指定生产数据量(默认5000W)
--throughput 指定吞吐量——限流(-1不指定)
--record-size record数据大小(字节)
--producer-props bootstrap.servers=192.168.1.20:9092,192.168.1.21:9092,192.168.1.22:9092 acks=1 指定Kafka集群地址,ACK模式
bin/kafka-consumer-perf-test.sh --broker-list node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 --topic benchmark --fetch-size 1048576 --messages 5000000
bin/kafka-consumer-perf-test.sh
--broker-list 指定kafka集群地址
--topic 指定topic的名称
--fetch-size 每次拉取的数据大小
--messages 总共要消费的消息个数
接下来,我们将编写Java程序,将1-100的数字消息写入到Kafka中。
<repositories><!-- 代码库 --> <repository> <id>central</id> <url>http://maven.aliyun.com/nexus/content/groups/public//</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> <updatePolicy>always</updatePolicy> <checksumPolicy>fail</checksumPolicy> </snapshots> </repository> </repositories> <dependencies> <!-- kafka客户端工具 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。