赞
踩
systemctl status edgecore 查状态
systemctl stop edgecore 关服务
systemctl disable edgecore 让服务不要开机启动
systemctl restart rabbitmq-server
systemctl start|stop|restart rabbitmq-server
1、安装依赖,因为RabbitMQ依赖 erlang,所以我们先安装erlang
sudo apt-get install erlang-nox
2、安装RabbitMQ
安装之前先更新一下,Ubuntu 源
sudo apt-get update
安装MQ
sudo apt-get install rabbitmq-server
3、本地新增配置文件
注:任意位置,我是在 /etc/rabbitmq
# cd /etc/rabbitmq
- # vim /etc/rabbitmq/rabbitmq.conf
- #添加
- #默认端口为5672
- listeners.tcp.default=32703
- #界面管理端口(默认端口为15672)
- management.tcp.port=15672
vim rabbitmq-env.conf
NODE_PORT=32703
4、修改rabbitmq-defaults文件,添加配置文件路径
rabbitmq-defaults 在路径 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.5/sbin/ 下
执行 vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10/sbin/rabbitmq-defaults
- #!/bin/sh -e
- ## The contents of this file are subject to the Mozilla Public License
- ## Version 1.1 (the "License"); you may not use this file except in
- ## compliance with the License. You may obtain a copy of the License
- ## at https://www.mozilla.org/MPL/
- ##
- ## Software distributed under the License is distributed on an "AS IS"
- ## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- ## the License for the specific language governing rights and
- ## limitations under the License.
- ##
- ## The Original Code is RabbitMQ.
- ##
- ## The Initial Developer of the Original Code is GoPivotal, Inc.
- ## Copyright (c) 2012-2019 Pivotal Software, Inc. All rights reserved.
- ##
-
- ### next line potentially updated in package install steps
- SYS_PREFIX=
-
- CLEAN_BOOT_FILE=start_clean
- SASL_BOOT_FILE=start_sasl
- BOOT_MODULE="rabbit"
-
- if test -z "$CONF_ENV_FILE" && test -z "$RABBITMQ_CONF_ENV_FILE"; then
- CONF_ENV_FILE=${SYS_PREFIX}/etc/rabbitmq/rabbitmq-env.conf
- fi
- CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf
在文件中新增 CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf 保存退出.
5、添加超级用户以及权限,此处注意,下面的命令,添加用户admin,密码设置为xxxx这里的xxxx需要自己自行修改
rabbitmqctl add_user edgeprd s5P6w0d4DtCBspWAepph
6、给刚才新创建admin用户,授予管理员权限
rabbitmqctl set_user_tags edgeprd administrator
7、授予Virtual host中资源的配置读写权限
sudo rabbitmqctl set_permissions -p / edgeprd '.*' '.*' '.*'
8、授予Virtual host中资源的配置读写权限
启动管理界面,通过web访问管理界面
我们在安装MQ的时候,已经安装好了rabbitmq_management,只需要执行下面的命令,即可在浏览器访问MQ的,web管理界面
rabbitmq-plugins enable rabbitmq_management
9、MQ简单的启动操作命令
启动:sudo rabbitmq-server start 停止:sudo rabbitmq-server stop 重启:sudo rabbitmq-server restart 状态检查:sudo rabbitmqctl status
- root@VM-0-15-ubuntu:/etc/rabbitmq# service rabbitmq-server restart
- Job for rabbitmq-server.service failed because the control process exited with error code.
- See "systemctl status rabbitmq-server.service" and "journalctl -xe" for details.
执行提示中的命令: journalctl -xe 可查看失败原因
- root@VM-0-15-ubuntu:/etc/rabbitmq# journalctl -xe
- -- Unit openvpn@kangpa.service has begun starting up.
- Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: Stopped OpenVPN connection to neimemggu.
- -- Subject: Unit openvpn@neimemggu.service has finished shutting down
- -- Defined-By: systemd
- -- Support: http://www.ubuntu.com/support
- --
- -- Unit openvpn@neimemggu.service has finished shutting down.
- Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: Starting OpenVPN connection to neimemggu...
- -- Subject: Unit openvpn@neimemggu.service has begun start-up
- -- Defined-By: systemd
- -- Support: http://www.ubuntu.com/support
- --
- -- Unit openvpn@neimemggu.service has begun starting up.
- Mar 11 19:09:33 VM-0-15-ubuntu ovpn-kangpa[25815]: Options error: In [CMD-LINE]:1: Error opening configuration file: /etc/openvpn/kangpa.conf
- Mar 11 19:09:33 VM-0-15-ubuntu ovpn-kangpa[25815]: Use --help for more information.
- Mar 11 19:09:33 VM-0-15-ubuntu ovpn-neimemggu[25816]: Options error: In [CMD-LINE]:1: Error opening configuration file: /etc/openvpn/neimemggu.conf
- Mar 11 19:09:33 VM-0-15-ubuntu ovpn-neimemggu[25816]: Use --help for more information.
- Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: openvpn@kangpa.service: Main process exited, code=exited, status=1/FAILURE
- Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: openvpn@kangpa.service: Failed with result 'exit-code'.
- Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: Failed to start OpenVPN connection to kangpa.
- -- Subject: Unit openvpn@kangpa.service has failed
- -- Defined-By: systemd
- -- Support: http://www.ubuntu.com/support
- --
- -- Unit openvpn@kangpa.service has failed.
- --
- -- The result is RESULT.
- Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: openvpn@neimemggu.service: Main process exited, code=exited, status=1/FAILURE
- Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: openvpn@neimemggu.service: Failed with result 'exit-code'.
- Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: Failed to start OpenVPN connection to neimemggu.
- -- Subject: Unit openvpn@neimemggu.service has failed
cd /var/log/rabbitmq/
如果是报 exception exit: {bad_return,
{{rabbit,start,[normal,[]]},
{'EXIT',
{{badmatch,
{error,
{{{badmatch,
{error,
{not_a_dets_file,
"/var/lib/rabbitmq/mnesia/rabbit@aicar-48b02d6781a7/recovery.dets"}}},
删除该文件,重启就可以 service rabbitmq-server restart
在本阶段我们会学习到众多的分布式解决方案。比如:分布式缓存,分布式会话,分布式搜索引擎,分布式消息队列,分布式锁,读写分离、分库分表,分布式全局唯一主键ID、分布式事务和数据一致性,分布式接口幂等性,分布式限流。
本周首先会讲解业界主流消息队列技术选型,提升大家的技术判断能力,之后是SpringBoot与RabbitMQ整合,最后带大家进行RabbitMQ基础组件封装。一起感受代码设计魅力吧。
小伙伴们大家好,现在我们和大家一起了解一下古老而又神秘的消息中间件"ActiveMQ"。首先,说起ActiveMQ,就必须先聊聊JMS(Java Message Service)规范,也就是Java消息服务,它定义了Java中访问消息中间件的接口的规范。在这里注意哦,JMS只是接口,并没有给予实现,实现JMS接口的消息中间件称为 “JMS Provider”,目前知名的开源 MOM (Message Oriented Middleware,也就是消息中间件)系统包括Apache的ActiveMQ、RocketMQ、Kafka,以及RabbitMQ,可以说他们都 “基本遵循” 或 “参考” JMS规范,都有自己的特点和优势。
专业术语
JMS(Java Message Service):实现JMS 接口的消息中间件;
Provider(MessageProvider):消息的生产者;
Consumer(MessageConsumer):消息的消费者;
PTP(Point to Point):即点对点的消息模型,这也是非常经典的模型;
Pub / Sub(Publish/Subscribe):,即发布/订阅的消息模型;
Queue:队列目标,也就是我们常说的消息队列,一般都是会真正的进行物理存储;
Topic:主题目标;
ConnectionFactory:连接工厂,JMS 用它创建连接;
Connection:JMS 客户端到JMS Provider 的连接;
Destination:消息的目的地;
Session:会话,一个发送或接收消息的线程(这里Session可以类比Mybatis的Session);
JMS 消息格式定义:
StreamMessage 原始值的数据流
MapMessage 一套名称/值对
TextMessage 一个字符串对象
BytesMessage 一个未解释字节的数据流
ObjectMessage 一个序列化的Java对象
ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在早些年的 “J2EE应用” 时期扮演着特殊的地位,可以说那个年代ActiveMQ在业界应用最广泛,当然如果现在想要有更强大的性能和海量数据处理能力,ActiveMQ还需要不断的升级版本,不断的提升性能和架构设计的重构。
就算现在我们 80% 以上的业务我们使用ActiveMQ已经足够满足需求,其丰富的API、多种集群构建模式使得他成为业界老牌消息中间件,在中小型企业中应用广泛!
当然如果你想针对大规模、高并发应用服务做消息中间件技术选型,譬如淘宝、京东这种大型的电商网站,尤其是双11这种特殊时间,ActiveMQ可能就显得力不从心了,当然我们这里后续还会和大家介绍其他非常优秀的MOM咯。
废话不多说,我们首先要了解JMS规范里最经典的两种消息投递模式,即 “点对点” 与 “发布订阅”。
点对点:生产者向队列投递一条消息,只有一个消费者能够监听得到这条消息(PTP),下图所示:
发布订阅:生产者向队列投递一条消息,所有监听该队列的消费者都能够监听得到这条消息(P/S),下图所示:
衡量一个MOM,我们主要从三方面考虑即可,即服务性能、存储堆积能力、可扩展性。
服务性能
ActiveMQ的性能一般,在早期传统行业为王的时代还是比较流行的,但现如今面对高并发、大数据的业务场景,往往力不从心!
数据存储
默认采用kahadb存储(索引文件形式存储),也可以使用高性能的google leveldb(内存数据库存储), 或者可以使用MySql、Oracle进程消息存储(关系型数据库存储)。
集群架构
ActiveMQ 可以与zookeeper进行构建 主备集群模型,并且多套的主备模型直接可以采用Network的方式构建分布式集群。
ActiveMQ最经典的两种集群架构模式,Master-Slave 、Network 集群模式!
Master-Slave:
Master-Slave:顾名思义,就是主从方式,当然这里要理解为主备的方式,也就是双机热备机制;Master Slave 背后的想法是,消息被复制到slave broker,因此即使master broker遇到了像硬件故障之类的错误,你也可以立即切换到slave broker而不丢失任何消息。 Master Slave是目前ActiveMQ推荐的高可靠性和容错的解决方案。
架构思考:Master-Slave集群模型的关键点:
上图(Master-Slave)绿色的为主节点,灰色的则为备份节点,这两个节点都是运行状态的。
zookeeper的作用就是为了当绿色的主节点宕机时,进行及时切换到备份的灰色节点上去,使其进行主从角色的互换,用于实现高可用性的方案。
Master-Slave集群模型的缺点也显而易见,就是不能做到分布式的topic、queue,当消息量巨大时,我们的MQ集群压力过大,没办法满足分布式的需求。
Network:
Network:这里可以理解为网络通信方式,也可以说叫Network of brokers。这种方式真正解决了分布式消息存储和故障转移、broker切换的问题。可以理解消息会进行均衡;从ActiveMQ1.1版本起,ActiveMQ支持networks of brokers。它支持分布式的queues和topics。一个broker会相同对待所有的订阅(subscription):不管他们是来自本地的客户连接,还是来自远程broker,它都会递送有关的消息拷贝到每个订阅。远程broker得到这个消息拷贝后,会依次把它递送到其内部的本地连接上。
架构思考:Network集群模型的关键点:
首先,这种方案需要两套或多套(Master-Slave)的集群模型才可以搞定,部署非常麻烦,需要两套或多套集群直接相互交叉配置,相互间能够感知到彼此的存在。下面我给出一段XML配置,简单来说就是在ActiveMQ的配置文件里要进行多套(Master-Slave)之间的 networkConnector配置工作:
<broker brokerName="receiver" persistent="false" useJmx="false">
<transportConnectors>
<transportConnector uri="tcp://localhost:62002"/>
</transportConnectors>
<networkConnectors>
<networkConnector
uri="static:( tcp://localhost:61616,tcp://remotehost:61616)"/>
</networkConnectors>
</broker>
其次,Network虽然解决了分布式消息队列这个难题,但是还有很多潜在的问题,最典型的就是资源浪费问题,并且也可能达不到所预期的效果;通常采用Master-Slave模型是传统型互联网公司的首选,作为互联网公司往往会选择开箱即用的消息中间件,从运维、部署、使用各个方面都要优于ActiveMQ,当然ActiveMQ毕竟是 “老牌传统强Q”,Apache的顶级项目之一,目前正在进行新版本的重构(对于5.X版本)与落地,下一代 “Artemis代理”,也可以理解为 “6.X”;有兴趣的小伙伴可以关注一下官网,传送门如下:ActiveMQ
Hi,小伙伴们,本节课我们通过简要的图文学习,带大家快速的过了一下ActiveMQ,那么小伙伴们记住一定要在脑海里建立知识的结构体系,并串联起来!无论是现在,还是说未来,本神都希望小伙伴要按照下面的步骤进行回忆和复习:
什么是JMS?
JMS的规范有哪些,分别代表什么含义?
ActiveMQ的历史背景
关于消息的投递模式(PTP、P/S)
ActiveMQ的各项指标
ActiveMQ的集群架构模型(Master-Slave、Network)
为了方便爱学习的小伙伴,本神特意加餐一波,提供官方文档手册、还有相关部署软件包,以及私人珍藏代码(DEMO),用来辅助小伙伴们对ActiveMQ有一个更深入的认知哦!不谢
官方JMS文档
ActiveMQ(5.x)服务包(Windows平台)
ActiveMQ代码示例
RocketMQ是一款分布式、队列模型的消息中间件,由阿里巴巴自主研发的一款适用于高并发、高可靠性、海量数据场景的消息中间件。早期开源2.x版本名为MetaQ;15年迭代3.x版本,更名为RocketMQ,16年开始贡献到Apache,经过1年多的孵化,最终成为Apache顶级的开源项目,更新非常频繁,社区活跃度也非常高;目前最新版本为4.5.1-release版本(2019-7-20日前)。RocketMQ参考借鉴了优秀的开源消息中间件Apache Kafka(这也是我们后面课程中重点要讲解的内容哦),其消息的路由、存储、集群划分都借鉴了Kafka优秀的设计思路,并结合自身的 “双十一” 场景进行了合理的扩展和API丰富。
接下来我们一起来看一下RocketMQ优秀的能力吧 ~
支持集群模型、负载均衡、水平扩展能力
亿级别的消息堆积能力
采用零拷贝的原理、顺序写盘、随机读(索引文件)
丰富的API使用
代码优秀,底层通信框架采用Netty NIO框架
NameServer 代替 Zookeeper
强调集群无单点,可扩展,任意一点高可用,水平可扩展
消息失败重试机制、消息可查询
开源社区活跃度、是否足够成熟(经过双十一考验)
任何一种技术框架,都有 “她” 的专有名词,在你刚开始接触 “她” 的时候,一定要了解 “她” 的专业术语,这样能够更快速、更高效的和 “她” 愉快的玩耍…
Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息。
Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。
Push Consumer:Consumer的一种,需要向Consumer对象注册监听。
Pull Consumer:Consumer的一种,需要主动请求Broker拉取消息。
Producer Group:生产者集合,一般用于发送一类消息。
Consumer Group:消费者集合,一般用于接受一类消息进行消费。
Broker : MQ消息服务(中转角色,用于消息存储与生产消费转发)。
如下图所示,我们要带小伙伴们一起来看看RocketMQ源码包的组成,这样更方便我们日后对他有一个深入的学习。
rocketmq-broker 主要的业务逻辑,消息收发,主从同步, pagecache
rocketmq-client 客户端接口,比如生产者和消费者
rocketmq-common 公用数据结构等等
rocketmq-distribution 编译模块,编译输出等
rocketmq-example 示例,比如生产者和消费者
rocketmq-fliter 进行Broker过滤的不感兴趣的消息传输,减小带宽压力
rocketmq-logappender、rocketmq-logging日志相关
rocketmq-namesrv Namesrv服务,用于服务协调
rocketmq-openmessaging 对外提供服务
rocketmq-remoting 远程调用接口,封装Netty底层通信
rocketmq-srvutil 提供一些公用的工具方法,比如解析命令行参数
rocketmq-store 消息存储核心包
rocketmq-test 提供一些测试代码包
rocketmq-tools 管理工具,比如有名的mqadmin工具
RocketMQ为我们提供了丰富的集群架构模型,包括单点模式、主从模式、双主模式、以及生产上使用最多的双主双从模式(或者说多主多从模式),在这里我们仅介绍一下经典的双主双从集群模型,如下图所示:
Producer集群就是生产者集群(他们在同一个生产者组 Producer Group)
Consumer集群就是消费者集群(他们在同一个消费者组 Consumer Group)
NameServer集群作为超轻量级的配置中心,只做集群元数据存储和心跳工作,不必保障节点间数据强一致性,也就是说NameServer集群是一个多机热备的概念。
对于Broker而言,通常Master与Slave为一组服务,他们互为主从节点,通过NameServer与外部的Client端暴露统一的集群入口。Broker就是消息存储的核心MQ服务了。
RocketMQ作为国内顶级的消息中间件,其性能主要依赖于天然的分布式Topic/Queue,并且其内存与磁盘都会存储消息数据,借鉴了Kafka的 “空中接力” 概念(这个我们后面学习Kafka的时候会详细的说明),所谓 “空中接力” 就是指数据不一定要落地,RocketMQ提供了同步/异步双写、同步/异步复制的特性。在真正的生产环境中应该选择符合自己业务的配置。下面针对于RocketMQ的高性能及其瓶颈在这里加以说明:
架构思考:
RocketMQ目前本人在公司内部实际生产环境采用8M-8S的集群架构(8主8从)硬件单点Master为32C,96G内存,500G的SSD
其主要瓶颈最终会落在IOPS上面,当高峰期来临的时候,磁盘读写能力是主要的性能瓶颈,每秒收发消息IOPS达到10W+ 消息,这也是公司内部主要的可靠性消息中间件。
在很多时候,我们的业务会有一些非核心的消息投递,后续会进行消息中间件的业务拆分,把不重要的消息(可以允许消息丢失、非可靠性投递的消息)采用Kafka的异步发送机制,借助Kafka强大的吞吐量和消息堆积能力来做业务的分流(当然RocketMQ的性能也足够好)。
为什么瓶颈在IOPS? 根本原因还是因为云环境导致的问题,云环境的SSD物理存储显然和自建机房SSD会有不小的差距,这一点我们无论是从数据库的磁盘性能、还是搜索服务(ElasticSearch)的磁盘性能,都能给出准确的瓶颈点,单机IOPS达到1万左右就是云存储SSD的性能瓶颈,这个也解释了 “木桶短板原理” 的效应,在真正的生产中,CPU的工作主要在等待IO操作,高并发下 CPU资源接近极限,但是IOPS还是达不到我们想要的效果。
Hi,小伙伴们,本节课我们通过简要的图文学习,带大家快速的过了一下RocketMQ,那么小伙伴们记住一定要在脑海里建立知识的结构体系,并串联起来!无论是现在,还是说未来,本神都希望小伙伴要按照下面的步骤进行回忆和复习:
RocketMQ的前世今生 ?
RocketMQ的专业术语?
RocketMQ源码包的组成?
RocketMQ的集群架构模型
RocketMQ在真正生产环境中面临的瓶颈点以及解决方案
为了方便爱学习的小伙伴们,本神特意加餐一波,提供RocketMQ的官方文档和相关软件包,以及对应的代码示例,辅助小伙伴们更好掌握RocketMQ。另外本神还在慕课网上的一门课程详细的讲解了RocketMQ的各个知识点以及实战应用,有想深入学习的小伙伴可以学习共勉。
官方文档
RocketMQ4.3.0版本(Linux平台)
RocketMQ代码示例
慕课网实战课程《RocketMQ核心技术精讲与高并发抗压实战》传送门:RocketMQ核心技术精讲与高并发抗压实战-慕课网
Hi,ALL!
善于总结、善于思考,你才会进步!
三个软件包
参照急速入门文档。
急速入门,在这里我们使用RabbitMQ 3.6.5 版本进行操作:
环境搭建:
环境描述:Linux(centos7 Redhat7)
1. 首先在Linux上进行一些软件的准备工作,yum下来一些基础的软件包
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
配置好主机名称:/etc/hosts /etc/hostname
2. 下载RabbitMQ所需软件包(本神在这里使用的是 RabbitMQ3.6.5 稳定版本)
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
3. 安装服务命令
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
三台服务器分别为:www.itjim.cn itjim2 itjim3
4. 修改用户登录与连接心跳检测,注意修改
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
修改点1:loopback_users 中的 <<"guest">>,只保留guest (用于用户登录)
修改点2:heartbeat 为10(用于心跳连接)
5. 安装管理插件
5.1 首先启动服务(后面 | 包含了停止、查看状态以及重启的命令)
/etc/init.d/rabbitmq-server start | stop | status | restart
//如果失败看看是不是端口没有开启,开启端口重启服务器
5.2 查看服务有没有启动: lsof -i:5672 (5672是Rabbit的默认端口)
rabbitmq-plugins enable rabbitmq_management
5.3 可查看管理端口有没有启动:
lsof -i:15672 或者 netstat -tnlp | grep 15672
6. 一切OK 我们访问地址,输入用户名密码均为 guest :
http://你的ip地址:15672/
7. 如果一切顺利,那么到此为止,我们的环境已经安装完啦
- 1
8.问题总结
1.starting rabbitmq-server (via systemctl): Job for rabbitmq-server.service failed because the control process exited with error code. See "systemctl status rabbitmq-server.service" and "journalctl -xe" for details.
1.首先运行systemctl status rabbitmq-server.service(查看当前rabbitmq-server.service的服务状态)和journalctl -xe从结尾看日志
修改了hostname和hosts后面记得保存source还有就是重启服务器。
上面下载是升级的时候备份数据
添加队列
看api项目helloworld
看api中exchange包中direct
需要先启动接收信息,然后在发送信息。
看api中exchange包中topic,模糊匹配。多对多,一种消息匹配一种规则。
如果收到三条信息,说明没有解绑,下图解绑掉users.#就可以。
启动接收端,在启动发送端,发现一人五条消息。
需要return的话就需要mandatory为truef
确认消息,返回消息,如果return消息那么需要设置mandatory设置为true。
https://zhuanlan.zhihu.com/p/70200202
在新版本的 springboot rabbitmq 中 spring.rabbitmq.publisher-confirms 已经失效。需要使用 publisher-confirm-type 替代。
publisher-confirm-type 新版发布确认属性有三种确认类型:
NONE值是禁用发布确认模式,是默认值
CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
/**
* 设置生产者消息publish-confirm回调函数
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if(!ack){
LoggerUtil.error(RabbitConfig.class, StringUtils.join("publishConfirm消息发送到交换器被退回,Id:", correlationData.getId(), ";退回原因是:", cause));
} else {
LoggerUtil.info(RabbitConfig.class, "发送消息到交换器成功,MessageId:"+correlationData.getId());
}
});
MANUAL:手动签收 最小线程数1,最大线程数5。
- spring:
- rabbitmq:
- virtual-host:/
- connection-timeout: 15000
- publisher-confirm-type: correlated
- publisher-returns: true #注意和mandatory一起使用
- template:
- mandatory: true
- jackson:
- date-format: yyyy-MM-dd HH:mm:ss
- time-zone: GMT+8
- default-property-inclusion: NON_NULL
注意改服务器地址
- package com.bfxy.rabbit.producer.component;
-
- import java.util.Map;
- import java.util.UUID;
-
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.core.MessagePostProcessor;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.MessageHeaders;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.stereotype.Component;
-
-
- @Component
- public class RabbitSender {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- /**
- * 这里就是确认消息的回调监听接口,用于确认消息是否被broker所收到
- */
- final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
- /**
- * @param correlationData 作为一个唯一的标识
- * @param ack broker 是否落盘成功
- * @param cause 失败的一些异常信息
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.err.println("消息ACK结果:" + ack + ", correlationData: " + correlationData.getId());
- }
- };
-
- /**
- * 对外发送消息的方法
- * @param message 具体的消息内容
- * @param properties 额外的附加属性
- * @throws Exception
- */
- public void send(Object message, Map<String, Object> properties) throws Exception {
-
- MessageHeaders mhs = new MessageHeaders(properties);
- Message<?> msg = MessageBuilder.createMessage(message, mhs);
-
- rabbitTemplate.setConfirmCallback(confirmCallback);
- //指定业务唯一的iD
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- //发送消息要做什么
- MessagePostProcessor mpp = new MessagePostProcessor() {
- @Override
- public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message)
- throws AmqpException {
- System.err.println("---> post to do: " + message);
- return message;
- }
- };
-
- rabbitTemplate.convertAndSend("exchange-1",
- "springboot.rabbit",
- msg, mpp, correlationData);
- }
- }
rabbitTemplate.send():是一条一条发送。
convertAndSend(…):使用此方法,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程。
convertSendAndReceive(…):可以同步消费者。使用此方法,当确认了所有的消费者都接收成功之后,才触发另一个convertSendAndReceive(…),也就是才会接收下一条消息。RPC调用方式。
- rabbitmq:
- listener:
- simple:
- acknowledge-mode: manual
- concurrency: 5 #线程数为5
- prefetch: 1 #批量消费,一条一条消费
- max-concurrency: 10 #最大线程数为10
先启动消费端在启动客户端,然后可以看到消息发过去。
先启动客户端再注释掉
会发现能收到消息。但是下面圈是1。停止消费端,不注释最后一行代码,运行消费端发现还是可以接收消息,而且没有1了。
就是分包
创建这五个类讲解
不同的topic,不同的rabbitTemplate
就讲一个类
service主要是用来存储
高并发不会用zk做分布式锁,会用redis
接下来本神带着小伙伴们简单了解一下Apache-Zookeeper,并进行Zookeeper集群的环境搭建;
Zookeeper基础知识、体系结构、数据模型
Zookeeper是一个类似linux、hdfs的树形文件结构,zookeeper可以用来保证数据在(Zookeeper)集群之间的数据的事务性一致性,zookeeper也是我们常说的CAP理论中的CP(强一致性);
Zookeeper有一个概念叫watch(也称之为事件),是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client端,即watcher实例对象(用于改变节点的变化而做出相应的行为)
关于Zookeeper其他相关内容,小伙伴可以参考一些相关的资料,在这里我们仅仅使用Zookeeper做注册中心;
Zookeeper有三个角色:Leader,Follower,Observer
Leader:数据总控节点,用于接收客户端连接请求,分发给所有的Follower节点后,各个Follower节点进行更新数据操作并返回给Leader节点,如果满足半数以上(所以Zookeeper集群一般是奇数个节点)更新成功则此次操作成功;
Follower:相当于跟随者的角色,Zookeeper的Leader宕机(挂掉)时,所有的Follower角色内部会产生选举机制,选举出新的Leader用于总控;
Observer:顾名思义,就是我们的客户端,用于观察Zookeeper集群的数据发送变化,如果产生变化则zookeeper会主动推送watch事件给Observer(客户端),用于对数据变化的后续处理;当然Observer(客户端)也可以发送数据变更请求;
Zookeeper应用场景:
统一命名服务(Name Service)
配置管理(Configuration Management)
集群管理(Group Membership)
共享锁(Locks)
队列管理
Zookeeper集群环境搭建与配置:
1. 准备工作:
## 准备3个节点,要求配置好主机名称,服务器之间系统时间保持一致
## 注意 /etc/hostname 和 /etc/hosts 配置主机名称(在这个里我准备bhz125,bhz126,bhz127三节点)
## 特别注意 以下操作3个节点要同时进行操作哦!
2. 上传zk到三台服务器节点
## 注意我这里解压到/usr/local下
2.1 进行解压: tar zookeeper-3.4.6.tar.gz
2.2 重命名: mv zookeeper-3.4.6 zookeeper
2.3 修改环境变量: vim /etc/profile
## 这里要添加zookeeper的全局变量
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=.:$ZOOKEEPER_HOME/bin
2.4 刷新环境变量: source /etc/profile
2.5 到zookeeper下修改配置文件:
2.5.1 首先到指定目录: cd /usr/local/zookeeper/conf
2.5.2 然后复制zoo_sample.cfg文件,复制后为zoo.cfg: mv zoo_sample.cfg zoo.cfg
2.5.3 然后修改两处地方, 最后保存退出:
(1) 修改数据的dir
dataDir=/usr/local/zookeeper/data
(2) 修改集群地址
server.0=bhz125:2888:3888
server.1=bhz126:2888:3888
server.2=bhz127:2888:3888
2.6 增加服务器标识配置,需要2步骤,第一是创建文件夹和文件,第二是添加配置内容:
(1) 创建文件夹: mkdir /usr/local/zookeeper/data
(2) 创建文件myid 路径应该创建在/usr/local/zookeeper/data下面,如下:
vim /usr/local/zookeeper/data/myid
注意这里每一台服务器的myid文件内容不同,分别修改里面的值为0,1,2;与我们之前的zoo.cfg配置文 件里:server.0,server.1,server.2 顺序相对应,然后保存退出;
2.7 到此为止,Zookeeper集群环境大功告成!启动zookeeper命令
启动路径:/usr/local/zookeeper/bin(也可在任意目录,因为配置了环境变量)
执行命令:zkServer.sh start (注意这里3台机器都要进行启动,启动之后可以查看状态)
查看状态:zkServer.sh status (在三个节点上检验zk的mode, 会看到一个leader和俩个follower)
Zookeeper客户端操作:
zkCli.sh 进入zookeeper客户端
根据提示命令进行操作:
查找:ls / ls /zookeeper
创建并赋值: create /imooc zookeeper
获取: get /imooc
设值: set /imooc zookeeper1314
PS1: 任意节点都可以看到zookeeper集群的数据一致性
PS2: 创建节点有俩种类型:短暂(ephemeral) 持久(persistent), 这些小伙伴们可以查找相关资料,我们这里作为入门不做过多赘述!
Zookeeper核心配置详解:(zoo.cfg配置文件,扩展内容)
(1)tickTime:基本事件单元,以毫秒为单位。这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每隔 tickTime时间就会发送一个心跳。
(2)initLimit:这个配置项是用来配置 Zookeeper 接受客户端初始化连接时最长能忍受多少个心跳时间间隔数,当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 10*2000=20 秒。
(3)syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5*2000=10 秒
(4)dataDir:存储内存中数据库快照的位置,顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
(5)clientPort: 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
(6)至于最后的配置项:server.A = B:C:D:
A表示这个是第几号服务器,
B 是这个服务器的 ip 地址;
C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader
优秀博文推荐:zookeeper 集群搭建 - YSOcean - 博客园
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。