赞
踩
(一)
(1):点对点模式(一对一,消费者主动拉取数据,消费收到后消息清除);
优点:速度有消费者决定 (消费数据有消费者主动拉取);
缺点:需要消费者实时监控队列是否有数据,因为是主动拉取;
(2):发布/订阅模式(一对多,数据生产后,推送给所以的订阅者);
优点:发布订阅模式,由队列推送消息给订阅者,不需要消费者实时监控队列。
(二)消息队列优点
(1):解耦(客户端A跟客户端B不会直接连接,而是跟过一个中间件进行交互。
(2):冗余(数据保存和备份的能力。)
(3):扩展性(可以集群,可扩展)
(4):灵活性&峰值处理能力(可拓展,可集群,并发高)
(5):可恢复性(数据是冗余,有备份,丢了可以恢复的)。
(6):顺序保证(队列是先进先出的)
(7):缓冲(来不及消费的数据可以进行缓存)
(8):异步通讯(客户端A发送数据给客户端B,如果客户端B挂了也是没有关系的,消息队列能够进行缓存,没有被消费)
注意:1:同一个消费者组的消费者不能同时消费同一个分区的数据
2:Follower只做备份用的
讲解:消息队列有两种模式,点对点模式(消费者自己拉取数据)和发布/订阅模式(相当于消息推送,例如微信公众号推送消息),kafka只支持消息队列的点对点模式,拉取数据。 1:producer 生产者(生产消息,把消息发送到topic分类) 2:服务器 broker实例 集群 3:topic 实际存数据的目的地, 4:partition 分区(一个topic默认12个分区)负载均衡用的,如果只有一个服务器,默认的12个分区会在一个服务器上,如果三个服务器,12个分区会分布在三个服务上 注:分区可以在一台服务器上,也可以在多台服务器上。 5:Leader(领导者) Follower(追随者) 启到备份作用,每一个分区都有一个Leader,Follower 备用的(备份当前分区的数据到其他服务器,备份到不同服务器)。如果Leader挂了,Follower会晋升成Leader提供服务。 注意:跟zookeeper不同,kafka只有Leader进行数据处理,Followe只做数据备份作用,除非Leader挂了,Follower晋升成Leader才能启用。 6:Consumer消费者,同一个消费者组的消费者,不能同时消费同一个分区的数据,但是可以消费不同的分区数据,就是当一个消费者在消费一个topic下面某一个分区的数据时,消费者组里面的另一个消费者不能同时消费这个分区的数据,但是可以消费这个topic下面没有被当前消费者组消费者消费的分区数据。(就是防止消费者组消费者重复消费分区数据),不同消费者组可以消费同一个分区数据,分区数量应该与消费者组里面的消费者保持同等的关系或者倍数关系,因为消费者组里面的消费者在消费分区数据时,同组其他消费者不能消费该区数据。 (topic消息被同组消费者消费:单播:消息只被消费一次) (topic消息被不同组消费者消费:广播:消息同时被消费者消费) 7:zookeeper 注册消费的,Consumer消费到哪里了,会把数据存储到zookeeper。 搭建kafka集群 1:自行下载kafka压缩包,放到服务器解压,目录自己定 2:进入kafka解压目录首先新建logs文件 3:进入config配置文件,我们能看到 consumer.properties(消费者配置文件) producer.properties(生产者配置文件) server.properties(当前kafka服务配置文件)。
修改配置文件
1:server.properties修改
在broker集群中,这个id必须是唯一的。
找到这一行打开,表示是否可以删除topic,修改为true,这里默认是(false)不能删除topic的,所以这里需要打开,否则topic不能删除。
这里是kafka当前broker日志目录地址,刚才解压时已经新建里一个logs目录,到当前logs目录下,pwd获取下当前目录,然后复制过来就好了,就修改了当前日志目录。
log.retention.hours=168
表示,当前kafka服务的数据可以存储168个小时(7天),超过7天的数据默认删除,可以根据自己需求修改。
log.segment.bytes=1073741824
表示当前kafka数据超过1G默认删除,也可以默认修改。
这里是配置zookeeper集群(kafka集群必须基于zookeeper集群,是在zookeeper集群基础之上才能实现kafka集群)。
这里是集群地址配置,用逗号隔开。
server.properties文件就修改到这里,然后保存就好了。
脚本命令讲解
2:把修改好kafka解压文件分发到我们不同服务器(意思就是把修改好的文件,复制到我们不同的服务器上,服务器必需是zookeeper集群的服务器),server.properties文件是我们修改好的了,让后我们只需修改borker.id,只要设置成集群里唯一的一个id就行(集群服务这个id不能重复),配置文件其他地方不需要动,logs文件根据你安装目录设置(注意:borker.id 是int类型borker.id=1,请别写成border,.id=‘1’)。
3:查看kafka目录下的bin目录,脚本目录,找到下图两个脚本(启动服务,和关闭服务,kafka集群只能一台一台启动,如果想一起启动,需要自己写脚本控制)。
常用命令:kafka-topics.sh,kafka-console-consumer.sh, kafka-console-producer.sh
kafka-topics.sh:这个命令是控制topic的创建和删除,或者查看topic详细信息。
kafka-console-consumer.sh:控制台的消费者,测试的时候用的,生产不用,没意义。
kafka-console-producer.sh:控制台的生产者,测试的时候用,生产不用,没意义。
分布式kafka集群启动
4:可以启动kafka服务,首先先启动zookeeper集群,因为kafka集群是依赖zookeeper集群的。
(注意:启动zookeeper集群时,起来了不一定成功了,可以查看一下 zookeeper/bin/zkServer.sh, 查看命令 zkServer.sh status,如果看到Leader或者follower,表示启动成功)
5:启动kafka服务
每个kafka服务都要单独启动
启动命令
到kafka目录下 bin/kafka-server-start.sh config/server.properties
(表示启动服务时,按照配置好的配置文件(参数)启动。)
分布式kafka服务同样的方法启动。
这个进程会一直挂着,后面会写,如何把进程挂后台。
启动好后就可以进行一下步骤了
新建topic
bin/kafka-topics.sh --create --zookeeper --hadoop102:2181 --partitions 2 --replication-factor 2 --topic first
以上表示(create 新建 那台zookeeper服务 partitions 2 两个分区,replication-factor 2 两个副本 topic 名字叫first)
bin/kafka-topics.sh --list --zookeeper hadoop102:2181
表示查看创建了多少topic
注意:发送消息,生产者,注意是发送到kafka集群(kafka集群启动时的地址,默认端口号:9092),而消费消息,则是从zookeeper消费(默认端口2181)。
kafka集群搭建完成。。。
如何使生产者不丢失数据,设置ACK应答机制
ACK机制设置成ALL
表示 producer 发送数据到topic leader的时候 会把数据同步到副本,副本同步好数据好后,leader会应答producer 数据保存好,然后才能生产其他数据,不过效率比较低。
当ACK机制设置成0时
表示producer发送数据到topic leader 后 leader直接应答producer 数据写入成功,不管副本是否备份成功。
下图是ACK机制设置成ALL的执行流程
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。