赞
踩
消息队列
优势:解耦和异步调用
异步调用的场景:
用户注册(异步):注册信息存到数据库,数据库向消息队列插入发送短信请求,MQ负责发送短信
秒杀系统(消峰):多个用户参与秒杀的请求全部放到MQ,秒杀系统从MQ中取出前几个请求参与秒杀
消息队列的两种模式
点对点(一对一):一个生产者只能对应一个消费者,而且消息不具有持久化,一旦消费,消费者就不会在消费已经消费过的数据了
一对多(发布/订阅模式):具有持久化,一个生产者可以对应多个消费者
kafka 分布式消息系统
kafka的基本架构
broker相当于kafka的一个节点,topic的消费通常是以组为单位,每个follewer都是负责同步leader的数据,topic的每个分区副本都是存在不同的节点上,防止挂掉
1)Producer :消息生产者,就是向kafka broker发消息的客户端;
2)Consumer :消息消费者,向kafka broker取消息的客户端;
3)Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
4)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个topic;
6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;
7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
9)follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的follower。
kafka安装
(1)解压安装包
(2)修改配置文件(server.properties)
broker.id=0 (唯一,每个节点的值需要不同)
delete.topic.enable=true (能否删除topic,默认为false)
log.dirs=/opt/module/kafka_2.11-0.11.0.2/log (日志存放位置,需手动创建logs目录)
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181 (zookeeper连接地址)
(3)分发到其他节点,修改brokerId
启动kafka需要先启动zookeeper
kafka启动命令:bin/kafka-server-start.sh -daemon config/server.properties (daemon后台启动,选择配置文件)
kafka命令介绍
显示kafka下的topic(需要指定zookeeper,指定一个zookeeper即可)
bin/kafka-topics.sh --zookeeper hadoop01:2181 --list
创建topic(指定副本数,分区数,zookeeper)(需要指定zookeeper下kafka的目录)
bin/kafka-topics.sh --zookeeper hadoop01:2181/[kafka目录,可选] --create --topic demo --partitions 3 --replication-factor 2
如果出现异常:replication factor: 2 larger than available brokers: 1 代表有一些kafka节点挂了
查看topic信息:
bin/kafka-topics.sh --zookeeper hadoop01:2181 --describe --topic demo
生产者(指定在哪个topic生产数据,–broker-list指定在哪个kafka节点上,多个用“,”分割)
bin/kafka-console-producer.sh --topic demo --broker-list hadoop01:9092
消费者(指定消费哪个topic,----bootstrap-server指定topic所在的节点,多个用“,”分割)
bin/kafka-console-consumer.sh --topic demo --bootstrap-server hadoop01:9092
从该topic上从头消费
bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic demo --from-beginning
修改topic分区数
bin/kafka-topics.sh --zookeeper hadoop01:2181 --alter --partitions 5 --topic demo
Kafka分区的原因
1、提高并发 (可以以partition为单位读写)
2、方便在集群中扩展 (每个partition可以通过调整,适应它所在的机器,整个集群就能适应任意大小的数据)
分区的原则:
需要将producer发送的数据封装成ProducerRecord对象,consumer消费的数据封装成ConsumerRecord对象
数据可靠性的保证
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据
kafka选择第二种方案,有点在于不会造成太多大量数据冗余,网络延迟对kafka的影响并不高
采用第二种方案,因follower迟迟不能与leader进行同步,kafka内部是这么解决的:
Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。
ack的应答机制:
0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据
1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据
-1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。
消费者分区分配的原则:1、轮询 2、range
kafka消费者消费的方式的是pull类型
不支持事务(要么一起成功,要么一起失败),仿AMQP(高级消息队列协议)
动态扩容,通过zookeeper实现
负载均衡
生产者:向broker发布消息的应用程序
消费者:从消息队列中请求消息的客户端程序
broker: 用于接收生产者发送的消息并将消息放入服务器队列中,便于kafka将这些消息的动态添加到磁盘并给
每个消息一个偏移量。broker就是一个应用实例
topic:消息都有topic(主题管理), 一个业务就相当于一个主题
分区: 一个topic中的消息数据按多个分区组织起来,通常当集群的性能不足时,可以通过增加topic分区提高性能
zookeeper应用:分布式配置管理,分布式状态管理,分布式协调管理,分布式锁服务
zookeeper的个数超过服务器个数的一半,就可以正常工作,否则不行(推荐为奇数个,需要JAVA环境)
命令
开启kafka: (zookeeper先开)
bin/kafka-server-start.sh config/server.properties &
创建topic
bin/kafka-topics.sh --create --zookeeper master:2181 --topic first --replication-factor 1 --partitions 1
查询topic
bin/kafka-topics.sh --list --zookeeper master:2181
开启生产者
bin/kafka-console-producer.sh --broker-list master:9092 --topic first
开启消费者
bin/kafka-console-consumer.sh --topic first --zookeeper master:2181 --from-beginning
集群安装:
1、解压kafka的安装包
2、修改config目录下的server.properties中以下几个参数
broker.id 唯一值
delete.topic.enable=true
log.dirs=/home/hyxy/soft/kafka/logs
zookeeper.connect=master:2181,slave1:2181,slave2:2181
3、分发到所有节点,并修改其中的broke.id
4、结束
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。