当前位置:   article > 正文

RabbitMQ使用入门_rabbitmq 管理端口

rabbitmq 管理端口

 1.环境准备

1.1 docker中配置

下载镜像:docker pull rabbitmq:management

创建实例并启动:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -p 25672:25672 rabbitmq:management

 The container name "/rabbitmq" is already in use by container "d1932f0cfb1b355a0df768e86a5c175eb02810f6d0ab42ff0945ba5bccbc144c"

如果报错上面问题,运行命令docker ps -a 检查一下是否存在容器中,可以运行命令docker rm 加容器id删除后重新运行启动命令

注: 4369 -- erlang发现口 5672 --client端通信口

15672 -- 管理界面ui端口 25672 -- server间内部通信口

1.2 或使用安装包安装并配置

安装Erlang、Socat、RabbitMQ(提前准备好安装包到指定解压目录)

执行安装命令:

rpm -ivh erlang-21.3.8.9-1.el7.x86_64.rpm

rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm 

rpm -ivh  rabbitmq-server-3.8.1-1.el7.noarch.rpm
#如果rabbitmq安装报错,在线安装socat
yum install -y socat

安装成功后rabbitmq命令存放在:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.1/sbin

启用管理插件

rabbitmq-plugins enable rabbitmq_management

RabbitMQ启停命令

# 第一次启动时会比较慢
systemctl start rabbitmq-server.service

systemctl status rabbitmq-server.service

systemctl restart rabbitmq-server.service

systemctl stop rabbitmq-server.service 

查看进程

ps -ef | grep rabbitmq

rabbitmq启动时会监听三个端口号:

5672: 其他程序和rabbitmq交互的端口[比如:java程序需要使用rabbitmq,就和此端口建立连接]

15672:rabbitmq的后台管理系统的端口号[rabbitmq的客户端]

25672:集群环境搭建的端口号

测试

在web浏览器中输入地址:http://虚拟机ip:15672/

输入默认账号: guest : guest,默认不允许远程连接

解决:

增加管理员账号:

adminrabbitmqctl add_user admin admin

 

 

 

如果新增慢,然后报错新增失败,解决方案: //查询当前主机的名称
cat /etc/hostname
//将主机名称和本机的ip 127.0.0.1绑定映射存到hosts文件中
vim /etc/hosts
127.0.0.1    查询到的主机名称
//然后再次添加用户
rabbitmqctl add_user admin admin

给账号分配角色: # 角色:
# 1、超级管理员(administrator):所有权限
# 2、监控者(monitoring): 登录控制台、查看mq信息权限
# 3、策略制定者(policymaker):登录控制台、管理policy权限
# 4、普通管理者(management):只能登录控制台
rabbitmqctl set_user_tags admin administrator

修改角色密码: rabbitmqctl change_password admin 123456

查看用户列表:rabbitmqctl list_users

测试,使用新建账号登录

overview:概览

connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况

channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。

Exchanges:交换机,用来实现消息的路由

Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

端口:

5672: rabbitMq的编程语言客户端连接端口

15672:rabbitMq管理界面端口

25672:rabbitMq集群的端口

卸载过程步骤

1、停止rabbitmq服务

systemctl stop rabbitmq-server

2、卸载erlang 查看erlang安装的相关列表

yum list|grep erlang

卸载erlang所有内容

yum -y remove erlang-*

删除erlang目录

rm -rf /usr/lib64/erlang

3、卸载rabbitmq 查看rabbitmq安装的相关列表

yum list|grep rabbitmq

卸载rabbitmq所有内容

yum -y remove rabbitmq-server.noarch

查找并删除rabbitmq相关目录

find / -name rabbit*

依次删除对应目录:rm -rf 路径 例如:

rm -rvf /var/lib/rabbitmq
rm -rvf /usr/lib/rabbitmq
rm -rvf /var/log/rabbitmq

2.项目模块中

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  1. spring:
  2. rabbitmq: #mq配置
  3. host: 192.168.36.131
  4. port: 5672
  5. username: wangbaomin
  6. password: abcd1234
  7. virtual-host: /srb
  8. # 生产者确认回调:确保消息可以到达交换机/队列
  9. publisher-confirm-type: simple
  10. publisher-returns: true

2.1 业务类新建配置类创建生产者回调 (引入同样的依赖)

  1. @Configuration
  2. @Slf4j
  3. public class MqConfig {
  4. //生产者回调
  5. @Resource
  6. private RabbitTemplate rabbitTemplate;
  7. @PostConstruct
  8. public void init(){
  9. //setReturnCallback点进去,ReturnCallback点进去,查看实现类中有且仅有一个抽象方法,称为函数式接口,隐式转换为lambda表达式让代码看起来比较简洁
  10. //消息未到达队列的回调
  11. rabbitTemplate.setReturnCallback((Message message, int replyCode,
  12. String replyText, String exchange, String routingKey)->{
  13. //未到达队列可以记录日志分析错误原因
  14. // mq的消息可以是对象、集合、字符串:可以将字节数组转为base64的字符串传递
  15. log.error("消息未到达队列: replyCode={} ,replyText={} , " +
  16. "exchange={} , routingKey={}, message={}",replyCode,replyText,exchange,routingKey, new String(Base64.getEncoder().encode(message.getBody())));
  17. });
  18. //消息是否到达交换机的回调
  19. rabbitTemplate.setConfirmCallback((@NonNull CorrelationData correlationData, boolean ack, @Nullable String cause)->{
  20. if(!ack){
  21. //消息未到达交换机
  22. log.error("消息未到达交换机:{}",cause);
  23. }
  24. });
  25. }
  26. //配置交换机
  27. @Bean
  28. public Exchange smsExchange(){
  29. return ExchangeBuilder.topicExchange(SrbConst.SMS_EXCHANGE)//交换机的名字,配合常量使用
  30. .ignoreDeclarationExceptions()
  31. .durable(true)
  32. .build();
  33. }
  34. @Bean
  35. public MessageConverter messageConverter(){
  36. //json字符串转换器
  37. return new Jackson2JsonMessageConverter();
  38. }
  39. }

2.2 配置模块中配置交换机队列名称的常量类

  1. public interface SrbConst {
  2. String SMS_EXCHANGE = "sms.exchange";
  3. String SMS_QUEUE = "sms.queue";
  4. String SMS_ROUTING_KEY = "sms.send";
  5. String SMS_DEAD_EXCHANGE = "sms.dead.exchange";
  6. String SMS_DEAD_QUEUE = "sms.dead.queue";
  7. String SMS_DEAD_ROUTING_KEY = "sms.dead.msg";
  8. }
  9. @Data
  10. public class SmsDto {
  11. @ApiModelProperty("手机号码")
  12. private String mobile;
  13. @ApiModelProperty("短信类型:1注册、2登录、3充值成功、4提款成功")
  14. private Integer type;
  15. @ApiModelProperty("短信模板需要的参数列表")
  16. private List<String> params;
  17. }

2.3 短信模块中配值交换机、队列、死信交换机、死信队列、json字符串转换器

  1. @Configuration
  2. public class MqConfig {
  3. //交换机,队列,死信交换机,死信队列的配置类
  4. @Bean
  5. public Exchange smsExchange(){
  6. return ExchangeBuilder.topicExchange(SrbConst.SMS_EXCHANGE)
  7. .ignoreDeclarationExceptions()
  8. .build();
  9. }
  10. @Bean
  11. public Queue smsQueue(){
  12. return QueueBuilder.durable(SrbConst.SMS_QUEUE)
  13. .deadLetterExchange(SrbConst.SMS_DEAD_EXCHANGE)
  14. .deadLetterRoutingKey(SrbConst.SMS_DEAD_ROUTING_KEY)
  15. .build();
  16. }
  17. @Bean
  18. public Binding smsBinding(Exchange smsExchange, Queue smsQueue){
  19. return BindingBuilder.bind(smsQueue).to(smsExchange)
  20. .with(SrbConst.SMS_ROUTING_KEY)
  21. .noargs();
  22. }
  23. //死信交换机 死信队列
  24. @Bean
  25. public Exchange deadExchange(){
  26. return ExchangeBuilder.topicExchange(SrbConst.SMS_DEAD_EXCHANGE)
  27. .ignoreDeclarationExceptions()
  28. .build();
  29. }
  30. @Bean
  31. public Queue deadQueue(){
  32. return QueueBuilder.durable(SrbConst.SMS_DEAD_QUEUE)
  33. .build();
  34. }
  35. @Bean
  36. public Binding deadBinding(Exchange deadExchange,Queue deadQueue){
  37. return BindingBuilder.bind(deadQueue).to(deadExchange)
  38. .with(SrbConst.SMS_ROUTING_KEY)
  39. .noargs();
  40. }
  41. @Bean
  42. public MessageConverter messageConverter(){
  43. //json字符串转换器
  44. return new Jackson2JsonMessageConverter();
  45. }
  46. }

2.4消息模块中创建消息消费队列监听器

  1. @Component
  2. public class MqListener {
  3. @Resource
  4. private SmsService smsService;
  5. @RabbitListener(queues = {SrbConst.SMS_QUEUE})
  6. public void sMsSendListener(Message message , Channel channel, SmsDto smsDto) throws IOException {
  7. //发送信息
  8. //
  9. try {
  10. smsService.sendSrbSms(smsDto);
  11. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  12. } catch (Exception e) {
  13. if(message.getMessageProperties().isRedelivered()){
  14. //如果消息是重新投递 再次消费失败 丢弃消息到绑定的死信队列中
  15. channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
  16. }else{
  17. channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
  18. }
  19. }
  20. }
  21. }

2.5定义业务层方法实现发送消息

  1. @Override
  2. public void sendSrbSms(SmsDto smsDto) {
  3. String mobile = smsDto.getMobile();
  4. Integer type = smsDto.getType();
  5. List<String> params = smsDto.getParams();
  6. String s = params.get(0);
  7. String message = null;
  8. switch (type){
  9. case 1:
  10. message="zc";
  11. break;
  12. case 2:
  13. message="dl";
  14. break;
  15. case 3:
  16. message="cz";
  17. break;
  18. case 4:
  19. message="tx";
  20. break;
  21. default:
  22. message="qt";
  23. break;
  24. }
  25. sendMessage(message+s,mobile);
  26. }
  27. public void sendMessage(String sixBitRandom ,String phoneNum){
  28. String host = SmsProperties.HOST;
  29. String path = SmsProperties.PATH;
  30. String method = SmsProperties.METHOD;
  31. String appcode = SmsProperties.APPCODE;
  32. Map<String, String> headers = new HashMap<String, String>();
  33. //最后在header中的格式(中间是英文空格)为Authorization:APPCODE 83359fd73fe94948385f570e3c139105
  34. headers.put("Authorization", "APPCODE " + appcode);
  35. //根据API的要求,定义相对应的Content-Type
  36. headers.put("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");
  37. Map<String, String> querys = new HashMap<String, String>();
  38. Map<String, String> bodys = new HashMap<String, String>();
  39. String codeMess = "code:"+ sixBitRandom;
  40. bodys.put("content", codeMess );
  41. bodys.put("phone_number", phoneNum);
  42. bodys.put("template_id", "CST_ptdie100");
  43. try {
  44. /**
  45. * 重要提示如下:
  46. * HttpUtils请从
  47. * https://github.com/aliyun/api-gateway-demo-sign-java/blob/master/src/main/java/com/aliyun/api/gateway/demo/util/HttpUtils.java
  48. * 下载
  49. *
  50. * 相应的依赖请参照
  51. * https://github.com/aliyun/api-gateway-demo-sign-java/blob/master/pom.xml
  52. */
  53. HttpResponse response = HttpUtils.doPost(host, path, method, headers, querys, bodys);
  54. System.out.println(response.toString());
  55. //获取response的body
  56. //System.out.println(EntityUtils.toString(response.getEntity()));
  57. String responseMessage = EntityUtils.toString(response.getEntity());
  58. System.out.println(responseMessage);//{"status":"OK","request_id":"TIDb9678a03bb664c648e13dcc0ea4b034a"}
  59. Map map = JSON.parseObject(responseMessage, Map.class);
  60. String status = map.get("status").toString();
  61. System.out.println(status);
  62. if ("OK".equals(status)){
  63. System.out.println("短信发送成功!");
  64. }else {
  65. System.out.println("短信发送失败!");
  66. throw new SrbException(ResultEnum.ALIYUN_SMS_ERROR);
  67. }
  68. } catch (Exception e) {
  69. e.printStackTrace();
  70. throw new SrbException(ResultEnum.ALIYUN_SMS_ERROR);
  71. }
  72. }
  73. }

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号