赞
踩
1. 当服务器在一次性处理大量的数据时,可能会造成服务器瘫痪,或者数据丢失这种情况
2. 如果将要处理的数据先维护在一个缓存系统中,然后再慢慢的处理,这样就可以避免上述情况。
3. 缓存系统就是用来存储消息的,该系统中至少要维护一个用来存储消息的先后顺序的队列(数据结构)
4. 为什么要使用队列(Queue,Deque,LinkedList),而不是其他的数据结构(Arraylist),因为消息要进行频繁的生产和消费(增删操作)
消息,就是指的网络中传输的数据,比如行为日志,文本、视频,音频、图片
队列,用来存储消息的容器,该容器是一个首尾相接的环形队列,规定的是FIFO
消息队列就是两者的结合,以及提供了各种API,和底层优化设计的应用程序(框架)
主要分为两大类,一类是点对点模式,一类是发布/订阅模式
1)点对点模式
1. 可以叫 peer-to-peer ,也可以叫point-to-point
2. 角色分为:消息队列(Queue)、发送者(Sender)、接收者(Receiver)
3. 发送者发送消息到队列中,该消息只能被一个接收者所接受,即使有多个接收者同时侦听到了这一条消息。
4. 该消息一旦被消费,则不存储在消息队列中,比如打电话
5. 支持异步/同步操作
2)发布/订阅模式
1. 叫 pub/sub模式
2. 角色分为:
-- 消息队列(Queue),
-- 发布者(Publisher、也叫producer)、
-- 订阅者(Subscriber,也叫consumer),
-- 主题(Topic) 用来将消息进行逻辑分类的
3. 一个消息可以被多个消费者消费,互不影响,比如我发布一个微博:关注我的人都能够看到。
4. 消费者在消费数据时,
--可以是push模式(消息队列主动将信息push给消费者),
--也可以使pull模式(消费者主动拉取消息对列中的消息),该模式的优点,消费者可以在自己处理消息的能力范围内,进行消费数据,
5. 支持异步/同步操作
1. 解耦: 消息系统可以作为中间件, 下游的软件和上游的软件无需了解彼此
2. 冗余: 消息可以持久化到磁盘上,或者做备份处理,避免数据丢失
3. 扩展: 消息队列提供了统计的生产者/消费者接口,
任何软件都可以调用生产者接口API,作为生产者
软件软件都可以调用消费者接口API,作为消费者
4. 销(削)峰能力:避免流量高峰期造成的系统瘫痪, 消息队列可以缓存这一时期的数据,慢慢处理
5. 可靠性: 消息队列中部分数据丢失,是有副本策略的,可以恢复数据
RabbitMQ
Redis : 本身是一个KV形式的NoSql数据,但是也有消息队列功能
ZeroMQ
ActiveMQ JMS
Kafka/Jafka : 高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持在线和离线处理
MetaQ/RocketMQ
apache官网:https://kafka.apache.org/documentation/#quickstart
中文官网:https://kafka.apachecn.org/
1. 是一个分布式、用于处理消息的发布/订阅消息系统
2. 是用scala语言编写的(scala编写另外一门比较火的框架是spark)
3. 具有以下特点:
-- 高吞吐量: 可以满足每秒百万级别消息的生产和消费——生产消费。
-- 持久化(保存在磁盘上一定时间,默认7天,区别于永久性)
-- 分布式:基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体
-- 健壮性:稳定,API接口的通用
4. 同时适应在线流处理和离线批处理
--borker: kafka集群中各个节点的名称, 值得注意的是,每个broker都有一个唯一标识符,broker间的标识符不能重复。
--producer: 就是生产消息到Kafka集群的应用程序
--consumer: 就是从Kafka集群中消费消息的应用程序
--consumer-group:为了方便管理消费者,比如设置消费者的一些配置属性,引入了消费者组的概念,进而统一设置。
--zookeeper:作为协调Kafka集群工作的角色,比如多个broker谁是controller,broker的动态上下线。分区的副本冗余策略,leader的选举
--message: 生产者产生的、消费者消费的数据,就是message,也可以称之为event。
--topic: 用于将message进行逻辑划分,即划分成不同的主题。比如有一些消息是关于美食的,一些是关于旅游的,一些是关于宠物等
--partition:是kafka集群中真正缓存数据的地方
1)本质是目录。
2)并发消费:每个主题可以有多个分区(分区多,消费者就可以并发处理消息)
3)可靠性:每个分区可以有多个副本,同一个分区的多个副本不能在同一个节点上。
--leader: 同一个分区的多个副本中,要有一个leader角色。
1)生产者向leader生产消息,
2)消费者从leader上消费消息,
3)follower从leader上同步消息
--follower: 同一个分区的多个副本中,除了leader角色,剩下的都是follower角色
作用:就是提高消息的安全可靠性,副本冗余
步骤1)上传、解压、更名、配置环境变量
[root@qianfeng01 ~]# tar -zxvf kafka_2.11-1.1.1.tgz -C /usr/local/
[root@qianfeng01 ~]# cd /usr/local/
[root@qianfeng01 ~]# mv kafka_2.11-1.1.1/ kafka
[root@qianfeng01 ~]# vim /etc/profile
.....省略....
#kafka environment
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@qianfeng01 ~]# source /etc/profile
步骤2)修改server.properties
[root@qianfeng01 kafka]# cd config [root@qianfeng01 config]# vim server.properties # 每个节点的唯一标识符的配置 broker.id=0 # 设置消息的存储位置,如果有多个目录,可以用逗号隔开 log.dirs=/usr/local/kafka/data # 设置zookeeper的集群地址,同时指定kafka在zookeeper上的各个节点的父znode zookeeper.connect=qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka # 下面属性可以改可不改 # 发送消息的缓存大小,100K socket.send.buffer.bytes=102400 # 接收消息的缓存大小,100K socket.receive.buffer.bytes=102400 # 服务端处理发送过来的数据的最大字节数 100M socket.request.max.bytes=104857600 # 消息对应的文件保留的时间,默认使7天 log.retention.hours=168 # 消息对应的文件的最大字节数,1G log.segment.bytes=1073741824 # 用来检查消息对应的文件是否过期或者是大于1G的时间周期,默认是300秒一检查 log.retention.check.interval.ms=300000
步骤3)同步到其他节点上
[root@qianfeng01 local]# scp -r kafka/ qianfeng02:/usr/local/
[root@qianfeng01 local]# scp -r kafka/ qianfeng03:/usr/local/
[root@qianfeng01 local]# scp /etc/profile qianfeng02:/etc/
[root@qianfeng01 local]# scp /etc/profile qianfeng03:/etc/
步骤4)修改其他节点上的brokerId
qianfeng02的broker.id 为 1
qianfeng03的broker.id 为 2
步骤1)先启动zookeeper
可以使用myzkServer.sh脚本,同时启动三台机器的zookeeper
步骤2)启动三台机器的kafka
方式1: 正常启动, 注意,带上配置文件,进行后台启动
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
关闭:
kafka-server-stop.sh -daemon /usr/local/kafka/config/server.properties
方式2: 自己编写一个启动三台机器kafka的脚本
比如 mykafka.sh
kafka在zookeeper上维护了多个znode节点,分别用于存储不同的信息
--cluster/id : 用于存储kafka集群的唯一标识 { "version":"1","id":"mO77ods8Q-ek99Y7lTOwUg"} --controller : 用于记录多个broker中谁是控制角色 { "version":1,"brokerid":0,"timestamp":"1644464244261"} --controller_epoch : 记录的是第几次选举controller角色 3 --brokers/ids: 以子znode的形式记录所有的broker唯一标识符 brokers/ids/0 : 记录着自己的信息 { ..."endpoints":["PLAINTEXT://qianfeng01:9092"],"host":"qianfeng01","port":9092} brokers/ids/1 : 记录着自己的信息 { ..."endpoints":["PLAINTEXT://qianfeng02:9092"],"host":"qianfeng02","port":9092} brokers/ids/2 : 记录着自己的信息 { ..."endpoints":["PLAINTEXT://qianfeng03:9092"],"host":"qianfeng03","port":9092} --brokers/topics : 以子znode的形式记录kafka集群中的所有主题名 --consumers : 旧版本用来记录消费者消费消息的偏移量,以便下次继续消费,但是新版本不用该znode, 而是以一个主题"__consumer_offsets"来记录各个消费者的偏移量
对应的脚本是:kafka-topics.sh, 用法直接输入脚本名称回车
[root@qianfeng01 data]# kafka-topics.sh Create, delete, describe, or change a topic. Option Description ------ ----------- --alter 修改一个主题的分区数量,副本,以及配置等 --config <String: name=value> 修改主题的一个配置。 --create 创建一个新的主题 --delete 删除一个主题 --delete-config <String: name> 移除一个配置 --describe 列出指定的主题的详情信息 --disable-rack-aware 禁用机架感知副本分配 --force 抑制控制台提示 --help 打印帮助信息 --if-exists 如果在更改或删除主题时设置该操作,则该操作只会在主题存在时执行 --if-not-exists 如果在创建主题时设置,则只在主题不存在时执行该操作 --list 列出所有的主题名称 --partitions <number> `必需属性`,创建或者修改时的分区数量 --replica-assignment <.....> 正在创建或更改的主题的手动分区到代理分配列表。 --replication-factor <Integer> `必需属性`,创建主题时的副本因子 --topic <String: topic> 要创建,修改或者描述的主题名称 --topics-with-overrides 如果在描述主题时设置,则只显示覆盖了配置的主题 --unavailable-partitions 如果在描述主题时设置,只显示leader不可用的分区 --under-replicated-partitions 如果在描述主题时设置,则只显示除了leader的副本 --zookeeper <String: hosts> `必需属性`,用于zookeeper连接的连接字符串,格式为host:port。可以指定多个主机以允许故障转移。
案例1:
[root@qianfeng01 data]# kafka-topics.sh \
--zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka \
--create \
--topic food \
--partitions 4 \
--replication-factor 2
案例2:
[root@qianfeng01 data]# kafka-topics.sh \
--zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka \
--create \
--topic pet \
--partitions 2 \
--replication-factor 3
小贴士:
1) 副本因子不能大于broker的数量, 否则报以下错误:
ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException Replication factor: 4 larger than available brokers: 3
2) zookeeper的路径,必需写到server.properties里指定的kafka的根znode.
3) 创建时,必需指定分区数量,副本因子,zookeeper路径
[root@qianfeng01 data]# kafka-topics.sh \
--zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka \
--list
原理:
列出所有主题名称的逻辑:其实就是访问zookeeper的/kafka/brokers/topics/ 子节点的名字
[root@qianfeng01 data]# kafka-topics.sh \
--zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka \
--describe \
--topic food
获取的描述信息如下:
Topic:food PartitionCount:4 ReplicationFactor:2 Configs: Topic: food Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: food Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: food Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: food Partition: 3 Leader: 0 Replicas: 0,2 Isr: 0,2 可以获取四个信息: Topic: 主题名 PartitionCount: 该主题的分区数量 ReplicationFactor: 该主题的每个分区的副本因子 Configs: 列出的是每个分区的详情信息 摘抄一条解析分区的详情信息: Topic: food Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 ^ ^ ^ ^ ^ | | | | | 主题名称 分区号码 该分区所有副本的leader的所在broker 副本所在的broker 当前可用的副本的broker位置
[root@qianfeng01 ~]# kafka-topics.sh \
--zookeeper qianfeng01,qianfeng02,qianfeng03/kafka \
--alter \
--topic food \
--partitions 5
小贴士
1. 修改主题的分区时,数量只能增加,不能减少
2. 副本因子不能被修改
[root@qianfeng01 ~]# kafka-topics.sh \
--zookeeper qianfeng01,qianfeng02,qianfeng03/kafka \
--delete \
--topic pet
删除的原理:
1. 将zookeeper里对应的znode删除
2. 将kafka的存储目录下的该主题的分区目录打标记,过一会,再删除.
使用 kafka-console-producer.sh脚本,回车,即可显示帮助信息。
[root@qianfeng01 data]# kafka-console-producer.sh Read data from standard input and publish it to Kafka. Option Description ------ ----------- --batch-size <Integer: size> 如果没有同步发送消息,则在单个批处理中发送的消息数。(默认:200) --broker-list <String: broker-list> `REQUIRED`: 形式为HOST1:PORT1,HOST2:PORT2的代理列表字符串。 --compression-codec [compression-codec] 'none','gzip', 'snappy', or 'lz4'. 默认值为'gzip' --key-serializer <className> 用于序列化key的消息编码器实现的类名。(默认:kafka.serializer.DefaultEncoder) --line-reader <String: reader_class> 用于从标准中读取行的类的类名。默认情况下,每行读取为单独的消息。 (default: kafka.tools. ConsoleProducer$LineMessageReader) --max-block-ms <Long> 生产者在发送请求期间阻塞的最大时间 (default: 60000) --max-memory-bytes <Long> 生产者用来缓冲等待发送到服务器的记录的总内存。(default: 33554432) --max-partition-memory-bytes <Long> 分配给分区的缓冲区大小。当接收到小于这个大小的记录时, 生产者将尝试乐观地将它们组合在一起,直到达到这个大小。 (default: 16384) --message-send-max-retries <Integer> 重试次数(default: 3) --producer-property <String> 将用户定义的属性以key=value形式传递给生产者的机制。 --producer.config <String: config file> 引用生产者的配置文件 --property <String: prop> 自定义属性 --queue-enqueuetimeout-ms <Integer: 消息排队超时时间 (default:2147483647) --queue-size <Integer: queue_size> 如果设置并且生产者以异步模式运行,这将给出等待足够批处理 大小的消息队列的最大数量。(default: 10000) --request-required-acks <String: 生产者请求所需的ack (default: 1) --request-timeout-ms <Integer: request 生产者请求的ack超时。值必须是非负且非零 (default: 1500) --retry-backoff-ms <Integer> 在每次重试之前,生产者会刷新相关主题的元数据。由于leader选举需要一些时间, 这个属性指定生产者在刷新元数据之前等待的时间。 (default: 100) --socket-buffer-size <Integer: size> tcp协议的缓存大小。(default: 102400) --sync 如果设置消息,发送到代理的请求是同步的,每次一个。 --timeout <Integer: timeout_ms> 如果设置并且生产者在异步模式下运行,这将给出消息队列等待足够大的批处理的最大时间。 该值的单位是ms。(default: 1000) --topic <String: topic> `REQUIRED`: 生产的消息去往的主题名称. --value-serializer <String: kafka的消息对应的value的序列化实现类 (default: kafka. serializer.DefaultEncoder)
该脚本的作用是读取控制台的数据,发送到kafka集群中
[root@qianfeng01 data]# kafka-console-producer.sh \
--broker-list qianfeng01:9092,qianfeng02:9092,qianfeng03:9092 \
--topic food
使用 kafka-console-consumer.sh脚本,回车,即可显示帮助信息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。