赞
踩
下载镜像:
docker pull rabbitmq:management
创建实例并启动:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -p 25672:25672 rabbitmq:managementThe container name "/rabbitmq" is already in use by container "d1932f0cfb1b355a0df768e86a5c175eb02810f6d0ab42ff0945ba5bccbc144c"
如果报错上面问题,运行命令docker ps -a 检查一下是否存在容器中,可以运行命令docker rm 加容器id删除后重新运行启动命令
注: 4369 -- erlang发现口 5672 --client端通信口
15672 -- 管理界面ui端口 25672 -- server间内部通信口
安装Erlang、Socat、RabbitMQ(提前准备好安装包到指定解压目录)
执行安装命令:
rpm -ivh erlang-21.3.8.9-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.1-1.el7.noarch.rpm
#如果rabbitmq安装报错,在线安装socat
yum install -y socat
安装成功后rabbitmq命令存放在:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.1/sbin
启用管理插件
rabbitmq-plugins enable rabbitmq_management
RabbitMQ启停命令
# 第一次启动时会比较慢
systemctl start rabbitmq-server.servicesystemctl status rabbitmq-server.service
systemctl restart rabbitmq-server.service
systemctl stop rabbitmq-server.service
查看进程
ps -ef | grep rabbitmq
rabbitmq启动时会监听三个端口号:
5672: 其他程序和rabbitmq交互的端口[比如:java程序需要使用rabbitmq,就和此端口建立连接]
15672:rabbitmq的后台管理系统的端口号[rabbitmq的客户端]
25672:集群环境搭建的端口号
测试
在web浏览器中输入地址:http://虚拟机ip:15672/
输入默认账号: guest : guest,默认不允许远程连接
解决:
增加管理员账号:
adminrabbitmqctl add_user admin admin
如果新增慢,然后报错新增失败,解决方案: //查询当前主机的名称
cat /etc/hostname
//将主机名称和本机的ip 127.0.0.1绑定映射存到hosts文件中
vim /etc/hosts
127.0.0.1 查询到的主机名称
//然后再次添加用户
rabbitmqctl add_user admin admin
给账号分配角色: # 角色:
# 1、超级管理员(administrator):所有权限
# 2、监控者(monitoring): 登录控制台、查看mq信息权限
# 3、策略制定者(policymaker):登录控制台、管理policy权限
# 4、普通管理者(management):只能登录控制台
rabbitmqctl set_user_tags admin administrator
修改角色密码: rabbitmqctl change_password admin 123456
查看用户列表:rabbitmqctl list_users
测试,使用新建账号登录
overview:概览
connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
Exchanges:交换机,用来实现消息的路由
Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
端口:
5672: rabbitMq的编程语言客户端连接端口
15672:rabbitMq管理界面端口
25672:rabbitMq集群的端口
卸载过程步骤
1、停止rabbitmq服务
systemctl stop rabbitmq-server2、卸载erlang 查看erlang安装的相关列表
yum list|grep erlang卸载erlang所有内容
yum -y remove erlang-*删除erlang目录
rm -rf /usr/lib64/erlang3、卸载rabbitmq 查看rabbitmq安装的相关列表
yum list|grep rabbitmq卸载rabbitmq所有内容
yum -y remove rabbitmq-server.noarch查找并删除rabbitmq相关目录
find / -name rabbit*依次删除对应目录:rm -rf 路径 例如:
rm -rvf /var/lib/rabbitmq rm -rvf /usr/lib/rabbitmq rm -rvf /var/log/rabbitmq
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- spring:
- rabbitmq: #mq配置
- host: 192.168.36.131
- port: 5672
- username: wangbaomin
- password: abcd1234
- virtual-host: /srb
- # 生产者确认回调:确保消息可以到达交换机/队列
- publisher-confirm-type: simple
- publisher-returns: true
- @Configuration
- @Slf4j
- public class MqConfig {
- //生产者回调
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void init(){
- //setReturnCallback点进去,ReturnCallback点进去,查看实现类中有且仅有一个抽象方法,称为函数式接口,隐式转换为lambda表达式让代码看起来比较简洁
-
- //消息未到达队列的回调
- rabbitTemplate.setReturnCallback((Message message, int replyCode,
- String replyText, String exchange, String routingKey)->{
- //未到达队列可以记录日志分析错误原因
- // mq的消息可以是对象、集合、字符串:可以将字节数组转为base64的字符串传递
- log.error("消息未到达队列: replyCode={} ,replyText={} , " +
- "exchange={} , routingKey={}, message={}",replyCode,replyText,exchange,routingKey, new String(Base64.getEncoder().encode(message.getBody())));
-
- });
- //消息是否到达交换机的回调
- rabbitTemplate.setConfirmCallback((@NonNull CorrelationData correlationData, boolean ack, @Nullable String cause)->{
-
- if(!ack){
- //消息未到达交换机
- log.error("消息未到达交换机:{}",cause);
- }
- });
-
- }
-
- //配置交换机
- @Bean
- public Exchange smsExchange(){
- return ExchangeBuilder.topicExchange(SrbConst.SMS_EXCHANGE)//交换机的名字,配合常量使用
- .ignoreDeclarationExceptions()
- .durable(true)
- .build();
- }
-
- @Bean
- public MessageConverter messageConverter(){
- //json字符串转换器
- return new Jackson2JsonMessageConverter();
- }
- }
- public interface SrbConst {
- String SMS_EXCHANGE = "sms.exchange";
- String SMS_QUEUE = "sms.queue";
- String SMS_ROUTING_KEY = "sms.send";
- String SMS_DEAD_EXCHANGE = "sms.dead.exchange";
- String SMS_DEAD_QUEUE = "sms.dead.queue";
- String SMS_DEAD_ROUTING_KEY = "sms.dead.msg";
- }
-
-
- @Data
- public class SmsDto {
- @ApiModelProperty("手机号码")
- private String mobile;
- @ApiModelProperty("短信类型:1注册、2登录、3充值成功、4提款成功")
- private Integer type;
- @ApiModelProperty("短信模板需要的参数列表")
- private List<String> params;
-
- }
- @Configuration
- public class MqConfig {
-
- //交换机,队列,死信交换机,死信队列的配置类
-
- @Bean
- public Exchange smsExchange(){
- return ExchangeBuilder.topicExchange(SrbConst.SMS_EXCHANGE)
- .ignoreDeclarationExceptions()
- .build();
- }
-
- @Bean
- public Queue smsQueue(){
- return QueueBuilder.durable(SrbConst.SMS_QUEUE)
- .deadLetterExchange(SrbConst.SMS_DEAD_EXCHANGE)
- .deadLetterRoutingKey(SrbConst.SMS_DEAD_ROUTING_KEY)
- .build();
- }
-
- @Bean
- public Binding smsBinding(Exchange smsExchange, Queue smsQueue){
- return BindingBuilder.bind(smsQueue).to(smsExchange)
- .with(SrbConst.SMS_ROUTING_KEY)
- .noargs();
- }
-
-
- //死信交换机 死信队列
- @Bean
- public Exchange deadExchange(){
- return ExchangeBuilder.topicExchange(SrbConst.SMS_DEAD_EXCHANGE)
- .ignoreDeclarationExceptions()
- .build();
- }
-
- @Bean
- public Queue deadQueue(){
- return QueueBuilder.durable(SrbConst.SMS_DEAD_QUEUE)
- .build();
-
- }
-
- @Bean
- public Binding deadBinding(Exchange deadExchange,Queue deadQueue){
- return BindingBuilder.bind(deadQueue).to(deadExchange)
- .with(SrbConst.SMS_ROUTING_KEY)
- .noargs();
- }
-
- @Bean
- public MessageConverter messageConverter(){
- //json字符串转换器
- return new Jackson2JsonMessageConverter();
- }
- }
- @Component
- public class MqListener {
- @Resource
- private SmsService smsService;
-
- @RabbitListener(queues = {SrbConst.SMS_QUEUE})
- public void sMsSendListener(Message message , Channel channel, SmsDto smsDto) throws IOException {
-
- //发送信息
- //
- try {
- smsService.sendSrbSms(smsDto);
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- } catch (Exception e) {
- if(message.getMessageProperties().isRedelivered()){
- //如果消息是重新投递 再次消费失败 丢弃消息到绑定的死信队列中
- channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
- }else{
- channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
- }
- }
-
- }
- }
- @Override
- public void sendSrbSms(SmsDto smsDto) {
- String mobile = smsDto.getMobile();
- Integer type = smsDto.getType();
- List<String> params = smsDto.getParams();
- String s = params.get(0);
- String message = null;
- switch (type){
- case 1:
- message="zc";
- break;
- case 2:
- message="dl";
- break;
- case 3:
- message="cz";
- break;
- case 4:
- message="tx";
- break;
- default:
- message="qt";
- break;
- }
- sendMessage(message+s,mobile);
- }
-
-
-
- public void sendMessage(String sixBitRandom ,String phoneNum){
- String host = SmsProperties.HOST;
- String path = SmsProperties.PATH;
- String method = SmsProperties.METHOD;
- String appcode = SmsProperties.APPCODE;
- Map<String, String> headers = new HashMap<String, String>();
- //最后在header中的格式(中间是英文空格)为Authorization:APPCODE 83359fd73fe94948385f570e3c139105
- headers.put("Authorization", "APPCODE " + appcode);
- //根据API的要求,定义相对应的Content-Type
- headers.put("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");
- Map<String, String> querys = new HashMap<String, String>();
- Map<String, String> bodys = new HashMap<String, String>();
- String codeMess = "code:"+ sixBitRandom;
- bodys.put("content", codeMess );
- bodys.put("phone_number", phoneNum);
- bodys.put("template_id", "CST_ptdie100");
-
- try {
- /**
- * 重要提示如下:
- * HttpUtils请从
- * https://github.com/aliyun/api-gateway-demo-sign-java/blob/master/src/main/java/com/aliyun/api/gateway/demo/util/HttpUtils.java
- * 下载
- *
- * 相应的依赖请参照
- * https://github.com/aliyun/api-gateway-demo-sign-java/blob/master/pom.xml
- */
- HttpResponse response = HttpUtils.doPost(host, path, method, headers, querys, bodys);
- System.out.println(response.toString());
- //获取response的body
- //System.out.println(EntityUtils.toString(response.getEntity()));
- String responseMessage = EntityUtils.toString(response.getEntity());
- System.out.println(responseMessage);//{"status":"OK","request_id":"TIDb9678a03bb664c648e13dcc0ea4b034a"}
- Map map = JSON.parseObject(responseMessage, Map.class);
- String status = map.get("status").toString();
- System.out.println(status);
- if ("OK".equals(status)){
- System.out.println("短信发送成功!");
- }else {
- System.out.println("短信发送失败!");
- throw new SrbException(ResultEnum.ALIYUN_SMS_ERROR);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- throw new SrbException(ResultEnum.ALIYUN_SMS_ERROR);
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。