赞
踩
第一步: wget https://downloads.apache.org/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz 第二步:tar zxvf解压 第三步:cp zoo_sample.cfg zoo.cfg 第四步:修改 zoo.cfg 配置文件,将 dataDir=/tmp/zookeeper 修改为指定的data目录 第五步:启动: # 可以通过 bin/zkServer.sh 来查看都支持哪些参数 2 # 默认加载配置路径conf/zoo.cfg 3 bin/zkServer.sh start conf/zoo.cfg 4 5 # 查看zookeeper状态 6 bin/zkServer.sh status 第六步:启动zookeeper client连接Zookeeper server bin/zkCli.sh 2 # 连接远程的zookeeper server 3 bin/zkCli.sh ‐server ip:port 第七步:TTL节点: 带过期时间节点,默认禁用,需要在zoo.cfg中添加 extendedTypesEnabled=true 开启。 注意:ttl不能用于临时节点
注册中心 数据发布/订阅(常用于实现配置中心) 负载均衡 命名服务 分布式协调/通知 集群管理 Master选举 分布式锁 分布式队列
第一步:准备三台虚拟机: 192.168.1.60 192.168.1.61 192.168.1.62 第二步: #三台虚拟机 zoo.cfg 文件末尾添加配置 server.1=192.168.1.60:2888:3888 server.2=192.168.1.61:2888:3888 server.3=192.168.1.62:2888:3888 server.A=B:C:D A 是一个数字,表示这个是第几号服务器; 集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据 就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据 与 zoo.cfg 里面的配置信息比较从而判断到底是哪个server。 B 是这个服务器的地址; C 是这个服务器Follower与集群中的Leader服务器交换信息的端口; D 是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而 这个端口就是用来执行选举时服务器相互通信的端口。 第四步:创建 myid 文件,配置服务器编号 在dataDir对应目录下创建 myid 文件,内容为对应ip的zookeeper服务器编号 第五步:启动前需要关闭防火墙(生产环境需要打开对应端口) # 分别启动三个节点的zookeeper server 2 bin/zkServer.sh start 3 # 查看集群状态 4 bin/zkServer.sh status
ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册。 会话超时之后没有实现重连机制。 异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知 道应该如何处理这些抛出的异常。 仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据 处理接口。 创建节点时如果抛出异常,需要自行检查节点是否存在。 无法实现级联删除
存在问题对比 基于数据库实现分布式锁存在什么问题? 并发量受限制 zookeeper分布式锁 优点:ZooKeeper分布式锁(如InterProcessMutex),具备高可用、可重入、阻塞锁特性,可解决失效死锁问题,使用起来也较为简单。 缺点:因为需要频繁的创建和删除节点,性能上不如Redis。在高性能、高并发的应用场景下,不建议使用ZooKeeper的分布式锁。而由于ZooKeeper的高可用性,因此在并发量不是太高的应用场景中,还是推荐使用ZooKeeper的分布式 锁。
第一步:安装erlang,rpm -ivh erlang-23.2.7-1.el7.x86_64.rpm 第二步:查看erlang是否安装成功,erl -version 第三步:安装RabbitMQ:在RabiitMQ安装过程中需要依赖socat插件,首先安装该插件 第四步:rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm 第五步:安装rabbit,rpm -Uvh rabbitmq-server-3.9.15-1.el7.noarch.rpm 第六步:安装完成后,可以查看下他的安装情况 第七步:启动RabbitMQ服务,service rabbitmq-server start 第八步:查看服务状态,service rabbitmq-server status 第十步:配置下打开他的Web管理页面,cd /usr/lib/rabbitmq/bin rabbitmq-plugins enable rabbitmq_management({:query, :rabbit@liuqiang, {:badrpc, :timeout}}报错)解决办法:修改vim/hosts,然后添加主机IP 主机名字 rabbitmqctl stop rabbitmq-server detached 第十一步:192.168.1.60:15672。 第十二步:,可以使用默认的guest/guest用户登录,可以创建一个管理员账户 来登录。rabbitmqctl add_user admin admin rabbitmqctl set_user_tags admin administrator 5672 --client端通信口 15672 -- 管理界面ui端口 25672 -- server间内部通信口 如果采用java等等,连接rabbitmq 应该使用client通信口:5672
在RabbitMQ中,一个节点的服务其实也是作为一个集群来处理的,在web控制台的admin-> cluster 中可以看到集群的名字,并且可以在页面上修改。 而多节点的集群有两种方式: 普通集群:这种模式使用Erlang语言天生具备的集群方式搭建。这种集群模式下,集群的各个节点之间只会有相同的元数据,即队列结构,而消息不会进行冗余,只存在一个节点中。消费时,如果消费的不是存有数据的节点, RabbitMQ会临时在节点之间进行数据传输,将消息从存有数据的节点传输到消费的节点。 镜像模式:这种模式是在普通集群模式基础上的一种增强方案,这也就是RabbitMQ的官HA高可用方案。需要在搭建了普通集群之后再补充搭建。其本质区别在于,这种模式会在镜像节点中间主动进行消息同步,而不是在客户端拉取消息时临时同。 这种模式的消息可靠性更高,因为每个节点上都存着全量的消息。而他的弊端也是明显的,集群内部的网络带宽会被这种同步通讯大量的消耗,进而降低整个集群的性能。这种模式下,队列数量最好不要过多。 步。 第一步:需要同步集群节点中的cookie。首先需要保证worker1上的rabbitmq服务是正常启动的。 第二步:rabbitmqctl stop 第三步:rabbitmqctl join_cluster --ram rabbit@liuqiang61 第四步:service rabbitmq-server start 第四步:--ram 表示以Ram节点加入集群。RabbitMQ的集群节点分为disk和ram。disk节点会将元数据保存到硬盘当中,而ram节点只是在内存中保存元数据。 第五步:搭建镜像集群,首先创建一个/mirror的虚拟主机,然后再添加给对应的镜像策略 rabbitmqctl add_vhost /mirror rabbitmqctl set_policy ha-all --vhost "/mirror" "^" '{"ha-mode":"all"}' 第六步:在做业务集成时,还是需要注意队列数量不宜过多,并且尽量不要让RabbitMQ产生大量的消息堆积。
(1)系统可用性降低 系统引入的外部依赖增多,系统的稳定性就会变差。一旦MQ宕机,对业务会产生影响。这就需要考虑如何保证MQ的高可用。 (2)系统复杂度提高 引入MQ后系统的复杂度会大大提高。以前服务之间可以进行同步的服务调用,引入MQ后,会变为异步调用,数据的链路就会变得更复杂。并且还会带来其他一些问题。比如:如何保证消费不会丢失?不会被重复调用?怎么保证消息的顺序性等问 题。 (3)消息一致性问题 A系统处理完业务,通过MQ发送消息给B、C系统进行后续的业务处理。如果B系统处理成功,C系统处理失败怎么办?这就需要考虑如何保证消息数据处理的一致性。
(1)经点队列classic:单机环境,消息可靠性高,可以选择是否持久化Durability,是否自动删除Auto delete (2)仲裁队列Quorum:Quorum是基于Raft一致性协议实现的一种新型的分布式消息队列,他实现了持久化,多备份的FIFO队列,主要就是针对RabbitMQ的镜像模式设计的。简单理解就是quorum队列中的消息需要有集群中多半节点同意确认后,才会写入到队列中。 Quorum队列更适合于 队列长期存在,并且对容错、数据安全方面的要求比低延迟、不持久等高级队列更能要求更严格的场景。例如 电商系统的订单,引入MQ后,处理速度可以慢一点,但是订单不能丢失。 也对应以下一些不适合使用的场景: 1、一些临时使用的队列:比如transient临时队列,exclusive独占队列,或者经常会修改和删除的队列。 2、对消息低延迟要求高: 一致性算法会影响消息的延迟。 3、对数据安全性要求不高:Quorum队列需要消费者手动通知或者生产者手动确认。 4、队列消息积压严重 : 如果队列中的消息很大,或者积压的消息很多,就不要使用Quorum队列。Quorum队列当前会将所有消息始终保存在内存中,直到达到内存使用极限。 (3)stream队列:这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。Stream队列的核心是以append-only只添加的日志来记录消息,整体来说,就是消息将以append-only的方式持久化到日志文件中,然后通过调整每个消费者的消费进度offset,来实现消息的多次分发。
参数详解: channel.exchangeDeclare(): 1、type:有direct、fanout、topic三种 2、durable:true、false true:服务器重启会保留下来Exchange。警告:仅设置此选项,不代表消息持久化。即不保证重启后消息还在 3、autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange chanel.basicQos(): 1、prefetchSize:0 2、prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack 3、global:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别 void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException; channel.basicPublish(): 1、routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用 2、mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉 3、mmediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。 4、BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留 简单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。 void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException; channel.basicAck(): 1、deliveryTag:该消息的index 2、multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。 void basicAck(long deliveryTag, boolean multiple) throws IOException; channel.queueDeclare(QUEUE_NAME, false, false, false, null) 1、durable:true、false true:在服务器重启时,能够存活 2、exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。 3、autodelete:当没有任何消费者使用时,自动删除该队列。 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; chanel.exchangeBind() 1、channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); 用于通过绑定bindingKey将queue到Exchange,之后便可以进行消息接收 Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
(1)如何保证消息不丢失 rabbit就是针对企业内部使用的,消息可靠性高 消息丢失的情况:(1)生产者发送给rabbitmq(2)持久化到系统(3)rabbitmq发送给消费者 通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。 《1》生产者保证消息能正确发送到MQ 对于单个数据,可以使用生产者确认机制。通过多次确认的方式,保证生产者的消息能够正确的发送到RabbitMQ中。 在RabbitMQ中,另外还有一种手动事务的方式,可以保证消息正确发送。手动事务机制主要有几个关键的方法: channel.txSelect() 开启事务;channel.txCommit() 提交事务; channel.txRollback() 回滚事务; 用这几个方法来进行事务管理。但是这种方式需要手动控制事务逻辑,并且手动事务会对channel产生阻塞,造成吞吐量下降 《2》RabbitMQ消息存盘不丢消息 这个在RabbitMQ中比较好处理,对于Classic经典队列,直接将队列声明成为持久化队列即可。而新增的Quorum队列和Stream队列,都是明显的持久化队列,能更好的保证服务端消息不会丢失。 《3》RabbitMQ 主从消息同步时不丢消息 首先其他的普通集群模式,消息是分散存储的,不会主动进行消息同步了,是有可能丢失消息的。而镜像模式集群,数据会主动在集群各个节点当中同步,这时丢失消息的概率不会太高。另外,启用Federation联邦机制,给包含重要消息的队列建立一个远端备份,也是一个不错的选择。 《4》RabbitMQ消费者不丢失消息 RabbitMQ在消费消息时可以指定是自动应答,还是手动应答。如果是自动应答模式,消费者会在完成业务处理后自动进行应答,而如果消费者的业务逻辑抛出异常,RabbitMQ会将消息进行重试,这样是不会丢失消息的,但是有可能会造成消息一直重复消费。 将RabbitMQ的应答模式设定为手动应答可以提高消息消费的可靠性 (2)如何保证消息幂等? 《1》RabbitMQ的自动重试功能 (3)如何保证消息的顺序? 在RabbitMQ当中,针对消息顺序的设计其实是比较弱的。唯一比较好的策略就是 单队列+单消息推送。即一组有序消息,只发到一个队列中,利用队列的FIFO特性保证消息在队列内顺序不会乱。但是,显然,这是以极度消耗性能作为代价的,在实际适应过程中,应该尽量避免这种场景。spring.rabbitmq.listener.simple.prefetch=1 (4)关于RabbitMQ的数据堆积问题 首先在消息生产者端: 对于生产者端,最明显的方式自然是降低消息生产的速度。但是,生产者端产生消息的速度通常是跟业务息息相关的,一般情况下不太好直接优化。但是可以选择尽量多采用批量消息的方式,降低IO频率。 然后在RabbitMQ服务端: 从前面的分享中也能看出,RabbitMQ本身其实也在着力于提高服务端的消息堆积能力。对于消息堆积严重的队列,可以预先添加懒加载机制,或者创建Sharding分片队列,这些措施都有助于优化服务端的消息堆积能力。另外,尝试使用Stream队列,也能很好的提高服务端的消息堆积能力。 接下来在消息消费者端: 要提升消费速度最直接的方式,就是增加消费者数量了。尤其当消费端的服务出现问题,已经有大量消息堆积时。这时,可以尽量多的申请机器,部署消费端应用,争取在最短的时间内消费掉积压的消息。但是这种方式需要注意对其他组件的性能压力。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。