当前位置:   article > 正文

阶段三:第12周分布式消息队列-RabbitMQ_rabbitmq hostname

rabbitmq hostname

systemctl status edgecore 查状态
systemctl stop edgecore 关服务
systemctl disable edgecore 让服务不要开机启动


systemctl restart rabbitmq-server
systemctl start|stop|restart rabbitmq-server

1、安装依赖,因为RabbitMQ依赖 erlang,所以我们先安装erlang

sudo apt-get install erlang-nox 

2、安装RabbitMQ

安装之前先更新一下,Ubuntu 源

sudo apt-get update

安装MQ

sudo apt-get install rabbitmq-server

3、本地新增配置文件

注:任意位置,我是在 /etc/rabbitmq

#  cd /etc/rabbitmq
  1. # vim /etc/rabbitmq/rabbitmq.conf
  2. #添加
  3. #默认端口为5672
  4. listeners.tcp.default=32703
  5. #界面管理端口(默认端口为15672
  6. management.tcp.port=15672
vim rabbitmq-env.conf

NODE_PORT=32703

4、修改rabbitmq-defaults文件,添加配置文件路径

rabbitmq-defaults 在路径 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.5/sbin/ 下

执行 vim  /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10/sbin/rabbitmq-defaults
  1. #!/bin/sh -e
  2. ## The contents of this file are subject to the Mozilla Public License
  3. ## Version 1.1 (the "License"); you may not use this file except in
  4. ## compliance with the License. You may obtain a copy of the License
  5. ## at https://www.mozilla.org/MPL/
  6. ##
  7. ## Software distributed under the License is distributed on an "AS IS"
  8. ## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  9. ## the License for the specific language governing rights and
  10. ## limitations under the License.
  11. ##
  12. ## The Original Code is RabbitMQ.
  13. ##
  14. ## The Initial Developer of the Original Code is GoPivotal, Inc.
  15. ## Copyright (c) 2012-2019 Pivotal Software, Inc. All rights reserved.
  16. ##
  17. ### next line potentially updated in package install steps
  18. SYS_PREFIX=
  19. CLEAN_BOOT_FILE=start_clean
  20. SASL_BOOT_FILE=start_sasl
  21. BOOT_MODULE="rabbit"
  22. if test -z "$CONF_ENV_FILE" && test -z "$RABBITMQ_CONF_ENV_FILE"; then
  23. CONF_ENV_FILE=${SYS_PREFIX}/etc/rabbitmq/rabbitmq-env.conf
  24. fi
  25. CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf

在文件中新增 CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf 保存退出.

5、添加超级用户以及权限,此处注意,下面的命令,添加用户admin,密码设置为xxxx这里的xxxx需要自己自行修改

rabbitmqctl add_user edgeprd s5P6w0d4DtCBspWAepph

6、给刚才新创建admin用户,授予管理员权限

rabbitmqctl set_user_tags edgeprd administrator

7、授予Virtual host中资源的配置读写权限

sudo rabbitmqctl  set_permissions -p / edgeprd '.*' '.*' '.*'

8、授予Virtual host中资源的配置读写权限

启动管理界面,通过web访问管理界面

我们在安装MQ的时候,已经安装好了rabbitmq_management,只需要执行下面的命令,即可在浏览器访问MQ的,web管理界面

rabbitmq-plugins enable rabbitmq_management

9、MQ简单的启动操作命令

启动:sudo rabbitmq-server start 
 
停止:sudo rabbitmq-server stop
 
重启:sudo rabbitmq-server restart
 
状态检查:sudo rabbitmqctl status

注: 如果报异常:

  1. root@VM-0-15-ubuntu:/etc/rabbitmq# service rabbitmq-server restart
  2. Job for rabbitmq-server.service failed because the control process exited with error code.
  3. See "systemctl status rabbitmq-server.service" and "journalctl -xe" for details.

执行提示中的命令: journalctl -xe 可查看失败原因

  1. root@VM-0-15-ubuntu:/etc/rabbitmq# journalctl -xe
  2. -- Unit openvpn@kangpa.service has begun starting up.
  3. Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: Stopped OpenVPN connection to neimemggu.
  4. -- Subject: Unit openvpn@neimemggu.service has finished shutting down
  5. -- Defined-By: systemd
  6. -- Support: http://www.ubuntu.com/support
  7. --
  8. -- Unit openvpn@neimemggu.service has finished shutting down.
  9. Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: Starting OpenVPN connection to neimemggu...
  10. -- Subject: Unit openvpn@neimemggu.service has begun start-up
  11. -- Defined-By: systemd
  12. -- Support: http://www.ubuntu.com/support
  13. --
  14. -- Unit openvpn@neimemggu.service has begun starting up.
  15. Mar 11 19:09:33 VM-0-15-ubuntu ovpn-kangpa[25815]: Options error: In [CMD-LINE]:1: Error opening configuration file: /etc/openvpn/kangpa.conf
  16. Mar 11 19:09:33 VM-0-15-ubuntu ovpn-kangpa[25815]: Use --help for more information.
  17. Mar 11 19:09:33 VM-0-15-ubuntu ovpn-neimemggu[25816]: Options error: In [CMD-LINE]:1: Error opening configuration file: /etc/openvpn/neimemggu.conf
  18. Mar 11 19:09:33 VM-0-15-ubuntu ovpn-neimemggu[25816]: Use --help for more information.
  19. Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: openvpn@kangpa.service: Main process exited, code=exited, status=1/FAILURE
  20. Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: openvpn@kangpa.service: Failed with result 'exit-code'.
  21. Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: Failed to start OpenVPN connection to kangpa.
  22. -- Subject: Unit openvpn@kangpa.service has failed
  23. -- Defined-By: systemd
  24. -- Support: http://www.ubuntu.com/support
  25. --
  26. -- Unit openvpn@kangpa.service has failed.
  27. --
  28. -- The result is RESULT.
  29. Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: openvpn@neimemggu.service: Main process exited, code=exited, status=1/FAILURE
  30. Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: openvpn@neimemggu.service: Failed with result 'exit-code'.
  31. Mar 11 19:09:33 VM-0-15-ubuntu systemd[1]: Failed to start OpenVPN connection to neimemggu.
  32. -- Subject: Unit openvpn@neimemggu.service has failed

查询错误日志

 cd /var/log/rabbitmq/

如果是报 exception exit: {bad_return,
                     {{rabbit,start,[normal,[]]},
                      {'EXIT',
                       {{badmatch,
                         {error,
                          {{{badmatch,
                             {error,
                              {not_a_dets_file,
                               "/var/lib/rabbitmq/mnesia/rabbit@aicar-48b02d6781a7/recovery.dets"}}},

删除该文件,重启就可以 service rabbitmq-server restart

阶段三:分布式架构-逐个击破分布式核心问题(9-17周)

在本阶段我们会学习到众多的分布式解决方案。比如:分布式缓存,分布式会话,分布式搜索引擎,分布式消息队列,分布式锁,读写分离、分库分表,分布式全局唯一主键ID、分布式事务和数据一致性,分布式接口幂等性,分布式限流。

第12周分布式消息队列-RabbitMQ

本周首先会讲解业界主流消息队列技术选型,提升大家的技术判断能力,之后是SpringBoot与RabbitMQ整合,最后带大家进行RabbitMQ基础组件封装。一起感受代码设计魅力吧。

第1章 分布式消息队列认知提升

· 1-1 学习指南(05:13)

· 1-3 MQ的应用场景与MQ性能衡量指标(07:48)

· 1-4 MQ的技术选型关注点(05:58)

· 1-5 ActiveMQ集群架构与原理解析

初识 JMS 与其专业术语

小伙伴们大家好,现在我们和大家一起了解一下古老而又神秘的消息中间件"ActiveMQ"。首先,说起ActiveMQ,就必须先聊聊JMS(Java Message Service)规范,也就是Java消息服务,它定义了Java中访问消息中间件的接口的规范。在这里注意哦,JMS只是接口,并没有给予实现,实现JMS接口的消息中间件称为 “JMS Provider”,目前知名的开源 MOM (Message Oriented Middleware,也就是消息中间件)系统包括Apache的ActiveMQ、RocketMQ、Kafka,以及RabbitMQ,可以说他们都 “基本遵循” 或 “参考” JMS规范,都有自己的特点和优势。

  • 专业术语

    • JMS(Java Message Service):实现JMS 接口的消息中间件;

    • Provider(MessageProvider):消息的生产者;

    • Consumer(MessageConsumer):消息的消费者;

    • PTP(Point to Point):即点对点的消息模型,这也是非常经典的模型;

    • Pub / Sub(Publish/Subscribe):,即发布/订阅的消息模型;

    • Queue:队列目标,也就是我们常说的消息队列,一般都是会真正的进行物理存储;

    • Topic:主题目标;

    • ConnectionFactory:连接工厂,JMS 用它创建连接;

    • Connection:JMS 客户端到JMS Provider 的连接;

    • Destination:消息的目的地;

    • Session:会话,一个发送或接收消息的线程(这里Session可以类比Mybatis的Session);

  • JMS 消息格式定义:

    • StreamMessage 原始值的数据流

    • MapMessage 一套名称/值对

    • TextMessage 一个字符串对象

    • BytesMessage 一个未解释字节的数据流

    • ObjectMessage 一个序列化的Java对象

了解ActiveMQ

ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在早些年的 “J2EE应用” 时期扮演着特殊的地位,可以说那个年代ActiveMQ在业界应用最广泛,当然如果现在想要有更强大的性能和海量数据处理能力,ActiveMQ还需要不断的升级版本,不断的提升性能和架构设计的重构。

就算现在我们 80% 以上的业务我们使用ActiveMQ已经足够满足需求,其丰富的API、多种集群构建模式使得他成为业界老牌消息中间件,在中小型企业中应用广泛!

当然如果你想针对大规模、高并发应用服务做消息中间件技术选型,譬如淘宝、京东这种大型的电商网站,尤其是双11这种特殊时间,ActiveMQ可能就显得力不从心了,当然我们这里后续还会和大家介绍其他非常优秀的MOM咯。

消息投递模式

废话不多说,我们首先要了解JMS规范里最经典的两种消息投递模式,即 “点对点” 与 “发布订阅”。

点对点:生产者向队列投递一条消息,只有一个消费者能够监听得到这条消息(PTP),下图所示:

发布订阅:生产者向队列投递一条消息,所有监听该队列的消费者都能够监听得到这条消息(P/S),下图所示:

ActiveMQ各项指标

衡量一个MOM,我们主要从三方面考虑即可,即服务性能、存储堆积能力、可扩展性。

  • 服务性能

    • ActiveMQ的性能一般,在早期传统行业为王的时代还是比较流行的,但现如今面对高并发、大数据的业务场景,往往力不从心!

  • 数据存储

    • 默认采用kahadb存储(索引文件形式存储),也可以使用高性能的google leveldb(内存数据库存储), 或者可以使用MySql、Oracle进程消息存储(关系型数据库存储)。

  • 集群架构

    • ActiveMQ 可以与zookeeper进行构建 主备集群模型,并且多套的主备模型直接可以采用Network的方式构建分布式集群。

ActiveMQ集群架构模式

ActiveMQ最经典的两种集群架构模式,Master-Slave 、Network 集群模式!

Master-Slave:

  • Master-Slave:顾名思义,就是主从方式,当然这里要理解为主备的方式,也就是双机热备机制;Master Slave 背后的想法是,消息被复制到slave broker,因此即使master broker遇到了像硬件故障之类的错误,你也可以立即切换到slave broker而不丢失任何消息。 Master Slave是目前ActiveMQ推荐的高可靠性和容错的解决方案。

  • 架构思考:Master-Slave集群模型的关键点:

    • 上图(Master-Slave)绿色的为主节点,灰色的则为备份节点,这两个节点都是运行状态的。

    • zookeeper的作用就是为了当绿色的主节点宕机时,进行及时切换到备份的灰色节点上去,使其进行主从角色的互换,用于实现高可用性的方案。

    • Master-Slave集群模型的缺点也显而易见,就是不能做到分布式的topic、queue,当消息量巨大时,我们的MQ集群压力过大,没办法满足分布式的需求。

Network:

  • Network:这里可以理解为网络通信方式,也可以说叫Network of brokers。这种方式真正解决了分布式消息存储和故障转移、broker切换的问题。可以理解消息会进行均衡;从ActiveMQ1.1版本起,ActiveMQ支持networks of brokers。它支持分布式的queues和topics。一个broker会相同对待所有的订阅(subscription):不管他们是来自本地的客户连接,还是来自远程broker,它都会递送有关的消息拷贝到每个订阅。远程broker得到这个消息拷贝后,会依次把它递送到其内部的本地连接上。

  • 架构思考:Network集群模型的关键点:

    • 首先,这种方案需要两套或多套(Master-Slave)的集群模型才可以搞定,部署非常麻烦,需要两套或多套集群直接相互交叉配置,相互间能够感知到彼此的存在。下面我给出一段XML配置,简单来说就是在ActiveMQ的配置文件里要进行多套(Master-Slave)之间的 networkConnector配置工作:

<broker brokerName="receiver" persistent="false" useJmx="false">

<transportConnectors>

<transportConnector uri="tcp://localhost:62002"/>

</transportConnectors>

<networkConnectors>

<networkConnector

uri="static:( tcp://localhost:61616,tcp://remotehost:61616)"/>

</networkConnectors>

    • </broker>

    • 其次,Network虽然解决了分布式消息队列这个难题,但是还有很多潜在的问题,最典型的就是资源浪费问题,并且也可能达不到所预期的效果;通常采用Master-Slave模型是传统型互联网公司的首选,作为互联网公司往往会选择开箱即用的消息中间件,从运维、部署、使用各个方面都要优于ActiveMQ,当然ActiveMQ毕竟是 “老牌传统强Q”,Apache的顶级项目之一,目前正在进行新版本的重构(对于5.X版本)与落地,下一代 “Artemis代理”,也可以理解为 “6.X”;有兴趣的小伙伴可以关注一下官网,传送门如下:ActiveMQ

本节知识点回顾

Hi,小伙伴们,本节课我们通过简要的图文学习,带大家快速的过了一下ActiveMQ,那么小伙伴们记住一定要在脑海里建立知识的结构体系,并串联起来!无论是现在,还是说未来,本神都希望小伙伴要按照下面的步骤进行回忆和复习:

  1. 什么是JMS?

  2. JMS的规范有哪些,分别代表什么含义?

  3. ActiveMQ的历史背景

  4. 关于消息的投递模式(PTP、P/S)

  5. ActiveMQ的各项指标

  6. ActiveMQ的集群架构模型(Master-Slave、Network)

补充课外资料

为了方便爱学习的小伙伴,本神特意加餐一波,提供官方文档手册、还有相关部署软件包,以及私人珍藏代码(DEMO),用来辅助小伙伴们对ActiveMQ有一个更深入的认知哦!不谢

  1. 官方JMS文档

  2. ActiveMQ(5.x)服务包(Windows平台)

  3. ActiveMQ代码示例

· 1-6 RabbitMQ集群架构模型与原理解析(20:30)

· 1-7 RocketMQ集群架构与原理解析

初识 RocketMQ

RocketMQ是一款分布式、队列模型的消息中间件,由阿里巴巴自主研发的一款适用于高并发、高可靠性、海量数据场景的消息中间件。早期开源2.x版本名为MetaQ;15年迭代3.x版本,更名为RocketMQ,16年开始贡献到Apache,经过1年多的孵化,最终成为Apache顶级的开源项目,更新非常频繁,社区活跃度也非常高;目前最新版本为4.5.1-release版本(2019-7-20日前)。RocketMQ参考借鉴了优秀的开源消息中间件Apache Kafka(这也是我们后面课程中重点要讲解的内容哦),其消息的路由、存储、集群划分都借鉴了Kafka优秀的设计思路,并结合自身的 “双十一” 场景进行了合理的扩展和API丰富。

优秀的能力与支持

接下来我们一起来看一下RocketMQ优秀的能力吧 ~

  • 支持集群模型、负载均衡、水平扩展能力

  • 亿级别的消息堆积能力

  • 采用零拷贝的原理、顺序写盘、随机读(索引文件)

  • 丰富的API使用

  • 代码优秀,底层通信框架采用Netty NIO框架

  • NameServer 代替 Zookeeper

  • 强调集群无单点,可扩展,任意一点高可用,水平可扩展

  • 消息失败重试机制、消息可查询

  • 开源社区活跃度、是否足够成熟(经过双十一考验)

专业术语

任何一种技术框架,都有 “她” 的专有名词,在你刚开始接触 “她” 的时候,一定要了解 “她” 的专业术语,这样能够更快速、更高效的和 “她” 愉快的玩耍…

  • Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息。

  • Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。

  • Push Consumer:Consumer的一种,需要向Consumer对象注册监听。

  • Pull Consumer:Consumer的一种,需要主动请求Broker拉取消息。

  • Producer Group:生产者集合,一般用于发送一类消息。

  • Consumer Group:消费者集合,一般用于接受一类消息进行消费。

  • Broker : MQ消息服务(中转角色,用于消息存储与生产消费转发)。

RocketMQ核心源码包及功能说明

如下图所示,我们要带小伙伴们一起来看看RocketMQ源码包的组成,这样更方便我们日后对他有一个深入的学习。

  • rocketmq-broker 主要的业务逻辑,消息收发,主从同步, pagecache

  • rocketmq-client 客户端接口,比如生产者和消费者

  • rocketmq-common 公用数据结构等等

  • rocketmq-distribution 编译模块,编译输出等

  • rocketmq-example 示例,比如生产者和消费者

  • rocketmq-fliter 进行Broker过滤的不感兴趣的消息传输,减小带宽压力

  • rocketmq-logappender、rocketmq-logging日志相关

  • rocketmq-namesrv Namesrv服务,用于服务协调

  • rocketmq-openmessaging 对外提供服务

  • rocketmq-remoting 远程调用接口,封装Netty底层通信

  • rocketmq-srvutil 提供一些公用的工具方法,比如解析命令行参数

  • rocketmq-store 消息存储核心包

  • rocketmq-test 提供一些测试代码包

  • rocketmq-tools 管理工具,比如有名的mqadmin工具

集群架构模型

RocketMQ为我们提供了丰富的集群架构模型,包括单点模式、主从模式、双主模式、以及生产上使用最多的双主双从模式(或者说多主多从模式),在这里我们仅介绍一下经典的双主双从集群模型,如下图所示:

  • Producer集群就是生产者集群(他们在同一个生产者组 Producer Group)

  • Consumer集群就是消费者集群(他们在同一个消费者组 Consumer Group)

  • NameServer集群作为超轻量级的配置中心,只做集群元数据存储和心跳工作,不必保障节点间数据强一致性,也就是说NameServer集群是一个多机热备的概念。

  • 对于Broker而言,通常Master与Slave为一组服务,他们互为主从节点,通过NameServer与外部的Client端暴露统一的集群入口。Broker就是消息存储的核心MQ服务了。

集群架构思考

RocketMQ作为国内顶级的消息中间件,其性能主要依赖于天然的分布式Topic/Queue,并且其内存与磁盘都会存储消息数据,借鉴了Kafka的 “空中接力” 概念(这个我们后面学习Kafka的时候会详细的说明),所谓 “空中接力” 就是指数据不一定要落地,RocketMQ提供了同步/异步双写、同步/异步复制的特性。在真正的生产环境中应该选择符合自己业务的配置。下面针对于RocketMQ的高性能及其瓶颈在这里加以说明:

  • 架构思考:

    • RocketMQ目前本人在公司内部实际生产环境采用8M-8S的集群架构(8主8从)硬件单点Master为32C,96G内存,500G的SSD

    • 其主要瓶颈最终会落在IOPS上面,当高峰期来临的时候,磁盘读写能力是主要的性能瓶颈,每秒收发消息IOPS达到10W+ 消息,这也是公司内部主要的可靠性消息中间件。

    • 在很多时候,我们的业务会有一些非核心的消息投递,后续会进行消息中间件的业务拆分,把不重要的消息(可以允许消息丢失、非可靠性投递的消息)采用Kafka的异步发送机制,借助Kafka强大的吞吐量和消息堆积能力来做业务的分流(当然RocketMQ的性能也足够好)。

    • 为什么瓶颈在IOPS? 根本原因还是因为云环境导致的问题,云环境的SSD物理存储显然和自建机房SSD会有不小的差距,这一点我们无论是从数据库的磁盘性能、还是搜索服务(ElasticSearch)的磁盘性能,都能给出准确的瓶颈点,单机IOPS达到1万左右就是云存储SSD的性能瓶颈,这个也解释了 “木桶短板原理” 的效应,在真正的生产中,CPU的工作主要在等待IO操作,高并发下 CPU资源接近极限,但是IOPS还是达不到我们想要的效果。

本节知识点回顾

Hi,小伙伴们,本节课我们通过简要的图文学习,带大家快速的过了一下RocketMQ,那么小伙伴们记住一定要在脑海里建立知识的结构体系,并串联起来!无论是现在,还是说未来,本神都希望小伙伴要按照下面的步骤进行回忆和复习:

  1. RocketMQ的前世今生 ?

  2. RocketMQ的专业术语?

  3. RocketMQ源码包的组成?

  4. RocketMQ的集群架构模型

  5. RocketMQ在真正生产环境中面临的瓶颈点以及解决方案

补充课外资料

为了方便爱学习的小伙伴们,本神特意加餐一波,提供RocketMQ的官方文档和相关软件包,以及对应的代码示例,辅助小伙伴们更好掌握RocketMQ。另外本神还在慕课网上的一门课程详细的讲解了RocketMQ的各个知识点以及实战应用,有想深入学习的小伙伴可以学习共勉。

  1. 官方文档

  2. RocketMQ4.3.0版本(Linux平台)

  3. RocketMQ代码示例

  4. 慕课网实战课程《RocketMQ核心技术精讲与高并发抗压实战》传送门:RocketMQ核心技术精讲与高并发抗压实战-慕课网

· 1-8 Kafka介绍与高性能原因分析(11:15)

· 1-9 Kafka高性能核心pageCache与zeroCopy原理解析(12:06)

· 1-10 Kafka集群模型讲解(02:16)

· 1-11 本章总结

Hi,ALL!

善于总结、善于思考,你才会进步!

第2章 RabbitMQ进阶与实战

· 2-1 RabbitMQ学习指南(02:20)

· 2-2 初识RabbitMQ核心概念-1(19:52)

· 2-3 初识RabbitMQ核心概念-2(12:03)

· 2-4 RabbitMQ环境搭建与控制台详解(52:18)

三个软件包

参照急速入门文档。

急速入门,在这里我们使用RabbitMQ 3.6.5 版本进行操作:

1. 首先在Linux上进行一些软件的准备工作,yum下来一些基础的软件包

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

配置好主机名称:/etc/hosts /etc/hostname

2. 下载RabbitMQ所需软件包(本神在这里使用的是 RabbitMQ3.6.5 稳定版本)

wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm

wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

3. 安装服务命令

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

三台服务器分别为:www.itjim.cn itjim2 itjim3

4. 修改用户登录与连接心跳检测,注意修改

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

修改点1:loopback_users 中的 <<"guest">>,只保留guest (用于用户登录)

修改点2:heartbeat 为10(用于心跳连接)

5. 安装管理插件

5.1 首先启动服务(后面 | 包含了停止、查看状态以及重启的命令)

/etc/init.d/rabbitmq-server start | stop | status | restart

//如果失败看看是不是端口没有开启,开启端口重启服务器

5.2 查看服务有没有启动: lsof -i:5672 (5672是Rabbit的默认端口)

rabbitmq-plugins enable rabbitmq_management

5.3 可查看管理端口有没有启动:

lsof -i:15672 或者 netstat -tnlp | grep 15672

6. 一切OK 我们访问地址,输入用户名密码均为 guest :

http://你的ip地址:15672/

7. 如果一切顺利,那么到此为止,我们的环境已经安装完啦

- 1

8.问题总结

1.starting rabbitmq-server (via systemctl): Job for rabbitmq-server.service failed because the control process exited with error code. See "systemctl status rabbitmq-server.service" and "journalctl -xe" for details.

1.首先运行systemctl status rabbitmq-server.service(查看当前rabbitmq-server.service的服务状态)和journalctl -xe从结尾看日志

修改了hostname和hosts后面记得保存source还有就是重启服务器。

· 2-5 RabbitMQ环境搭建与控制台详解-2(20:43)

上面下载是升级的时候备份数据

添加队列

· 2-6 RabbitMQ急速入门HelloWorld(24:23)

看api项目helloworld

· 2-7 Rabbitmq核心API-Exchange之Direct(16:03)

看api中exchange包中direct

需要先启动接收信息,然后在发送信息。

· 2-8 Rabbitmq核心API-Exchange之Topic(16:23)

看api中exchange包中topic,模糊匹配。多对多,一种消息匹配一种规则。

如果收到三条信息,说明没有解绑,下图解绑掉users.#就可以。

· 2-9 Rabbitmq核心API-Exchange之Fanout(06:51)

启动接收端,在启动发送端,发现一人五条消息。

· 2-10 Rabbitmq核心API-其他关键概念讲解(05:21)

· 2-11 Rabbitmq高级特性-生产端可靠性投递与消费端幂等性(22:34)

· 2-12 Rabbitmq高级特性-生产端特性讲解_确认机制和返回机制(13:49)

· 2-13 Rabbitmq高级特性-消费端特性讲解_流控服务和ACK重回队列(18:49)

· 2-14 Rabbitmq高级特性-TTL消息与死信队列详解(20:06)

· 2-15 Rabbitmq集群搭建-镜像队列集群环境搭建实操(23:12)

· 2-16 RabbitMQ与SpringBoot整合_生产端-1(17:25)

需要return的话就需要mandatory为truef

确认消息,返回消息,如果return消息那么需要设置mandatory设置为true。

https://zhuanlan.zhihu.com/p/70200202

在新版本的 springboot rabbitmq 中 spring.rabbitmq.publisher-confirms 已经失效。需要使用 publisher-confirm-type 替代。

publisher-confirm-type 新版发布确认属性有三种确认类型:

  • NONE值是禁用发布确认模式,是默认值

  • CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例

  • SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;

/**

* 设置生产者消息publish-confirm回调函数

*/

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

if(!ack){

LoggerUtil.error(RabbitConfig.class, StringUtils.join("publishConfirm消息发送到交换器被退回,Id:", correlationData.getId(), ";退回原因是:", cause));

} else {

LoggerUtil.info(RabbitConfig.class, "发送消息到交换器成功,MessageId:"+correlationData.getId());

}

});

MANUAL:手动签收 最小线程数1,最大线程数5。

  1. spring:
  2. rabbitmq:
  3. virtual-host:/
  4. connection-timeout: 15000
  5. publisher-confirm-type: correlated
  6. publisher-returns: true #注意和mandatory一起使用
  7. template:
  8. mandatory: true
  9. jackson:
  10. date-format: yyyy-MM-dd HH:mm:ss
  11. time-zone: GMT+8
  12. default-property-inclusion: NON_NULL

注意改服务器地址

· 2-17 RabbitMQ与SpringBoot整合_生产端-2(13:16)

  1. package com.bfxy.rabbit.producer.component;
  2. import java.util.Map;
  3. import java.util.UUID;
  4. import org.springframework.amqp.AmqpException;
  5. import org.springframework.amqp.core.MessagePostProcessor;
  6. import org.springframework.amqp.rabbit.connection.CorrelationData;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.messaging.Message;
  11. import org.springframework.messaging.MessageHeaders;
  12. import org.springframework.messaging.support.MessageBuilder;
  13. import org.springframework.stereotype.Component;
  14. @Component
  15. public class RabbitSender {
  16. @Autowired
  17. private RabbitTemplate rabbitTemplate;
  18. /**
  19. * 这里就是确认消息的回调监听接口,用于确认消息是否被broker所收到
  20. */
  21. final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
  22. /**
  23. * @param correlationData 作为一个唯一的标识
  24. * @param ack broker 是否落盘成功
  25. * @param cause 失败的一些异常信息
  26. */
  27. @Override
  28. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  29. System.err.println("消息ACK结果:" + ack + ", correlationData: " + correlationData.getId());
  30. }
  31. };
  32. /**
  33. * 对外发送消息的方法
  34. * @param message 具体的消息内容
  35. * @param properties 额外的附加属性
  36. * @throws Exception
  37. */
  38. public void send(Object message, Map<String, Object> properties) throws Exception {
  39. MessageHeaders mhs = new MessageHeaders(properties);
  40. Message<?> msg = MessageBuilder.createMessage(message, mhs);
  41. rabbitTemplate.setConfirmCallback(confirmCallback);
  42. //指定业务唯一的iD
  43. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  44. //发送消息要做什么
  45. MessagePostProcessor mpp = new MessagePostProcessor() {
  46. @Override
  47. public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message)
  48. throws AmqpException {
  49. System.err.println("---> post to do: " + message);
  50. return message;
  51. }
  52. };
  53. rabbitTemplate.convertAndSend("exchange-1",
  54. "springboot.rabbit",
  55. msg, mpp, correlationData);
  56. }
  57. }

rabbitTemplate.send():是一条一条发送。

convertAndSend(…):使用此方法,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程。

convertSendAndReceive(…):可以同步消费者。使用此方法,当确认了所有的消费者都接收成功之后,才触发另一个convertSendAndReceive(…),也就是才会接收下一条消息。RPC调用方式。

· 2-18 RabbitMQ与SpringBoot整合_消费端-1(14:56)

  1. rabbitmq:
  2. listener:
  3. simple:
  4. acknowledge-mode: manual
  5. concurrency: 5 #线程数为5
  6. prefetch: 1 #批量消费,一条一条消费
  7. max-concurrency: 10 #最大线程数为10

· 2-19 RabbitMQ与SpringBoot整合_消费端-2(13:01)

先启动消费端在启动客户端,然后可以看到消息发过去。

先启动客户端再注释掉

会发现能收到消息。但是下面圈是1。停止消费端,不注释最后一行代码,运行消费端发现还是可以接收消息,而且没有1了。

第3章 RabbitMQ基础组件封装实战

· 3-1 RabbitMQ基础组件整体功能概述(03:27)

· 3-2 RabbitMQ基础组件模块划分(09:05)

就是分包

· 3-3 RabbitMQ基础组件API封装-1(23:06)

创建这五个类讲解

· 3-4 RabbitMQ基础组件API封装-2(04:29)

· 3-5 自动装配与架构接口定义(10:35)

· 3-6 发送迅速异步消息(18:49)

· 3-7 RabbitTemplate池化封装(25:30)

不同的topic,不同的rabbitTemplate

就讲一个类

· 3-8 序列化与反序列化转换封装(22:52)

· 3-9 确认消息实现(03:28)

· 3-10 从架构的视角分析可靠性消息投递(14:33)

· 3-11 可靠性投递落地-集成数据源(21:40)

· 3-12 可靠性投递落地-可靠性消息业务实现落地(17:29)

service主要是用来存储

· 3-13 可靠性投递落地-ESJOB定时任务讲解-1(29:58)

高并发不会用zk做分布式锁,会用redis

· 3-14 补充:Zookeeper简介与环境搭建

接下来本神带着小伙伴们简单了解一下Apache-Zookeeper,并进行Zookeeper集群的环境搭建;

  • Zookeeper基础知识、体系结构、数据模型

    • Zookeeper是一个类似linux、hdfs的树形文件结构,zookeeper可以用来保证数据在(Zookeeper)集群之间的数据的事务性一致性,zookeeper也是我们常说的CAP理论中的CP(强一致性);

    • Zookeeper有一个概念叫watch(也称之为事件),是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client端,即watcher实例对象(用于改变节点的变化而做出相应的行为)

    • 关于Zookeeper其他相关内容,小伙伴可以参考一些相关的资料,在这里我们仅仅使用Zookeeper做注册中心;

  • Zookeeper有三个角色:Leader,Follower,Observer

    • Leader:数据总控节点,用于接收客户端连接请求,分发给所有的Follower节点后,各个Follower节点进行更新数据操作并返回给Leader节点,如果满足半数以上(所以Zookeeper集群一般是奇数个节点)更新成功则此次操作成功;

    • Follower:相当于跟随者的角色,Zookeeper的Leader宕机(挂掉)时,所有的Follower角色内部会产生选举机制,选举出新的Leader用于总控;

    • Observer:顾名思义,就是我们的客户端,用于观察Zookeeper集群的数据发送变化,如果产生变化则zookeeper会主动推送watch事件给Observer(客户端),用于对数据变化的后续处理;当然Observer(客户端)也可以发送数据变更请求;

  • Zookeeper应用场景:

    • 统一命名服务(Name Service)

    • 配置管理(Configuration Management)

    • 集群管理(Group Membership)

    • 共享锁(Locks)

    • 队列管理

  • Zookeeper集群环境搭建与配置:

1. 准备工作:

## 准备3个节点,要求配置好主机名称,服务器之间系统时间保持一致

## 注意 /etc/hostname 和 /etc/hosts 配置主机名称(在这个里我准备bhz125,bhz126,bhz127三节点)

## 特别注意 以下操作3个节点要同时进行操作哦!

2. 上传zk到三台服务器节点

## 注意我这里解压到/usr/local下

2.1 进行解压: tar zookeeper-3.4.6.tar.gz

2.2 重命名: mv zookeeper-3.4.6 zookeeper

2.3 修改环境变量: vim /etc/profile

## 这里要添加zookeeper的全局变量

export ZOOKEEPER_HOME=/usr/local/zookeeper

export PATH=.:$ZOOKEEPER_HOME/bin

2.4 刷新环境变量: source /etc/profile

2.5 到zookeeper下修改配置文件:

2.5.1 首先到指定目录: cd /usr/local/zookeeper/conf

2.5.2 然后复制zoo_sample.cfg文件,复制后为zoo.cfg: mv zoo_sample.cfg zoo.cfg

2.5.3 然后修改两处地方, 最后保存退出:

(1) 修改数据的dir

dataDir=/usr/local/zookeeper/data

(2) 修改集群地址

server.0=bhz125:2888:3888

server.1=bhz126:2888:3888

server.2=bhz127:2888:3888

2.6 增加服务器标识配置,需要2步骤,第一是创建文件夹和文件,第二是添加配置内容:

(1) 创建文件夹: mkdir /usr/local/zookeeper/data

(2) 创建文件myid 路径应该创建在/usr/local/zookeeper/data下面,如下:

vim /usr/local/zookeeper/data/myid

注意这里每一台服务器的myid文件内容不同,分别修改里面的值为0,1,2;与我们之前的zoo.cfg配置文 件里:server.0,server.1,server.2 顺序相对应,然后保存退出;

2.7 到此为止,Zookeeper集群环境大功告成!启动zookeeper命令

启动路径:/usr/local/zookeeper/bin(也可在任意目录,因为配置了环境变量)

执行命令:zkServer.sh start (注意这里3台机器都要进行启动,启动之后可以查看状态)

  • 查看状态:zkServer.sh status (在三个节点上检验zk的mode, 会看到一个leader和俩个follower)

Zookeeper客户端操作:

zkCli.sh 进入zookeeper客户端

根据提示命令进行操作:

查找:ls / ls /zookeeper

创建并赋值: create /imooc zookeeper

获取: get /imooc

设值: set /imooc zookeeper1314

PS1: 任意节点都可以看到zookeeper集群的数据一致性

PS2: 创建节点有俩种类型:短暂(ephemeral) 持久(persistent), 这些小伙伴们可以查找相关资料,我们这里作为入门不做过多赘述!

Zookeeper核心配置详解:(zoo.cfg配置文件,扩展内容)

(1)tickTime:基本事件单元,以毫秒为单位。这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每隔 tickTime时间就会发送一个心跳。

(2)initLimit:这个配置项是用来配置 Zookeeper 接受客户端初始化连接时最长能忍受多少个心跳时间间隔数,当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 10*2000=20 秒。

(3)syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5*2000=10 秒

(4)dataDir:存储内存中数据库快照的位置,顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。

(5)clientPort: 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

(6)至于最后的配置项:server.A = B:C:D:

A表示这个是第几号服务器,

B 是这个服务器的 ip 地址;

C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;

D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader

优秀博文推荐:zookeeper 集群搭建 - YSOcean - 博客园

· 3-15 可靠性投递落地-ESJOB定时任务讲解-2(13:13)

· 3-16 可靠性投递落地-ESJOB定时任务讲解-3(10:32)

· 3-17 定时任务通用组件封装-1(22:02)

· 3-18 定时任务通用组件封装-2(10:39)

· 3-19 定时任务通用组件封装-3(19:23)

· 3-20 定时任务通用组件封装-4(15:45)

· 3-21 定时任务通用组件封装-5(14:38)

· 3-22 可靠性消息重试实现集成定时任务组件-1(16:37)

· 3-23 可靠性消息重试实现集成定时任务组件-2(22:42)

· 3-24 可靠性消息最终演示(10:22)

· 3-25 批量消息发送封装(09:43)

· 3-26 延迟消息应用与封装(15:33)

· 3-27 总结与复习

· 3-28 本周作业练习

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/631240
推荐阅读
相关标签
  

闽ICP备14008679号