当前位置:   article > 正文

多vhost下rabbitmq配置及切换_rabbitmq 配置多个vhost

rabbitmq 配置多个vhost
1.1 yml配置文件
# RabbitMQ connection settings for two virtual hosts on the same broker.
# The top-level spring.rabbitmq.* keys describe the default connection;
# the nested spring.rabbitmq.first.* keys are custom and are only read
# through explicit ${spring.rabbitmq.first.*} placeholders in the Java config.
spring:
  rabbitmq:
    # Alternative hosted-broker endpoint (disabled):
    # host: amqp-cn-9lb31jq7o019.cn-beijing.amqp-14.vpc.mq.amqp.aliyuncs.com
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # NOTE(review): the commented-out values that used to be here looked like
    # real credentials for the hosted broker; replaced with placeholders.
    # Keep secrets out of committed configuration - TODO confirm they were rotated.
    # username: <hosted-broker-username>
    # password: <hosted-broker-password>
    # virtual-host: /lk
    virtual-host: /
    # Second connection, targeting a different virtual host.
    first:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: guest
      virtual-host: /xinaohealth
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
1.2 配置类(需手动创建交换机和队列等)
import com.laikang.cloud.community.server.common.constants.Constant;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class RabbitMqConfigNew {

    /**
     * first 连接工厂
     */
    @Bean(name = "firstConnectionFactory")
    public ConnectionFactory firstConnectionFactory(@Value("${spring.rabbitmq.first.host}") String host,
                                                    @Value("${spring.rabbitmq.first.port}") int port,
                                                    @Value("${spring.rabbitmq.first.username}") String username,
                                                    @Value("${spring.rabbitmq.first.password}") String password,
                                                    @Value("${spring.rabbitmq.first.virtual-host}") String vHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vHost);
        return connectionFactory;
    }

    /**
     * primary 连接工厂
     */
    @Bean(name = "primaryConnectionFactory")
    @Primary //必须声明主连接
    public ConnectionFactory secondConnectionFactory(@Value("${spring.rabbitmq.host}") String host,
                                                     @Value("${spring.rabbitmq.port}") int port,
                                                     @Value("${spring.rabbitmq.username}") String username,
                                                     @Value("${spring.rabbitmq.password}") String password,
                                                     @Value("${spring.rabbitmq.virtual-host}") String vHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vHost);
        return connectionFactory;
    }

    /**
     * primary 消费者
     */
    @Bean(name = "primaryListenerFactory")
    public SimpleRabbitListenerContainerFactory secondListenerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
        listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(listenerContainerFactory, connectionFactory);
        return listenerContainerFactory;
    }
  	 /**
     * first 消费者
     */
    @Bean(name = "firstListenerFactory")
    public SimpleRabbitListenerContainerFactory secondListenerFactory2(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
        listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(listenerContainerFactory, connectionFactory);
        return listenerContainerFactory;
    }

    /**
     *    first生产者的RabbitTemplate,发送消息时使用
     */
    @Bean(name = "firstRabbitTemplate")
    public RabbitTemplate firstRabbitTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
  
  	/**
     *    first对应的rabbitAdmin
     *    不能使用spring自动装配的bean(它无法确定在哪个virtualhost创建交换机),必须手动创建交换机/队列/绑定
     */
    @Bean(name = "firstRabbitAdmin")
    public RabbitAdmin eventMqRabbitAdmin(
            @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        Queue queue= new Queue(Constant.USER_CHECK_QUEUE,true,false,false);
        DirectExchange directExchange= new DirectExchange(Constant.USER_CHECK_EXCHANG,true,false);
        Binding binding = BindingBuilder.bind(queue).to(directExchange).with(Constant.USER_CHECK_ROUTINGKEY);
        //先创建交换机队列,最后创建绑定
      	rabbitAdmin.declareQueue(queue); 
        rabbitAdmin.declareExchange(directExchange);
        rabbitAdmin.declareBinding(binding);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
		
  	/**
     *    primaryRabbitAdmin 
     */
    @Bean(name = "primaryRabbitAdmin")
    public RabbitAdmin primaryRabbitAdmin(
            @Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        Queue queue= new Queue(Constant.COMMUNITY_USER_SYNC_QUEUE,true,false,false);
        DirectExchange directExchange= new DirectExchange(Constant.COMMUNITY_USER_SYNC_EXCHANGE,true,false);
        Binding binding = BindingBuilder.bind(queue).to(directExchange).with(Constant.COMMUNITY_USER_SYNC_ROUTINGKEY);
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareExchange(directExchange);
        rabbitAdmin.declareBinding(binding);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125

注:存在多个 virtual host 时,不能用下面这种方式让 Spring 自动创建交换机/队列(需按 1.2 的方式通过对应的 RabbitAdmin 手动声明)

/**
 * Bean-based declaration of the community-user-sync exchange, queue and bindings.
 *
 * <p>NOTE(review): only {@link #primaryBinding()} is restricted to a specific admin, and the
 * admin it names is {@code firstRabbitAdmin} even though the binding is for the
 * community-user-sync exchange; the exchange, queue and {@code userSyncBinding} beans carry
 * no admin restriction at all.
 */
@Configuration
public class RabbitMqConfig {

    /** Admin injected by name; used to restrict which admin declares {@link #primaryBinding()}. */
    @Resource
    RabbitAdmin firstRabbitAdmin;

    /**
     * Durable direct exchange the sync queue is bound to.
     *
     * @return the exchange named by {@code Constant.COMMUNITY_USER_SYNC_EXCHANGE}
     */
    @Bean
    DirectExchange userSyncDirect() {
        final boolean durable = true;
        return (DirectExchange) ExchangeBuilder
                .directExchange(Constant.COMMUNITY_USER_SYNC_EXCHANGE)
                .durable(durable)
                .build();
    }

    /**
     * The sync queue.
     *
     * @return the queue named by {@code Constant.COMMUNITY_USER_SYNC_QUEUE}
     */
    @Bean
    public Queue userSyncQueue() {
        Queue syncQueue = new Queue(Constant.COMMUNITY_USER_SYNC_QUEUE);
        return syncQueue;
    }

    /**
     * Binds the sync queue to the sync exchange with the sync routing key.
     *
     * @param userSyncDirect the exchange bean
     * @param userSyncQueue  the queue bean
     * @return the binding between the two
     */
    @Bean
    Binding userSyncBinding(DirectExchange userSyncDirect, Queue userSyncQueue) {
        Binding plainBinding = BindingBuilder
                .bind(userSyncQueue)
                .to(userSyncDirect)
                .with(Constant.COMMUNITY_USER_SYNC_ROUTINGKEY);
        return plainBinding;
    }

    /**
     * Same queue-to-exchange binding as {@code userSyncBinding}, additionally restricted so
     * that only {@code firstRabbitAdmin} should declare it.
     *
     * @return the admin-restricted binding
     */
    @Bean
    public Binding primaryBinding() {
        Queue syncQueue = userSyncQueue();
        DirectExchange syncExchange = userSyncDirect();
        Binding restrictedBinding = BindingBuilder
                .bind(syncQueue)
                .to(syncExchange)
                .with(Constant.COMMUNITY_USER_SYNC_ROUTINGKEY);
        restrictedBinding.setAdminsThatShouldDeclare(firstRabbitAdmin);
        return restrictedBinding;
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
1.3 生产者发送消息
/**
 * Publishes user-check messages through the template that is bound to the "first"
 * virtual host.
 */
@Component
public class RechargeSender {

    /** Injected by bean name so that the template for the "first" virtual host is used. */
    @Resource(name="firstRabbitTemplate")
    private RabbitTemplate firstRabbitTemplate;

    /**
     * Sends a message model to the user-check exchange with the user-check routing key.
     *
     * @param messageModel the message body, handed to the template as-is
     */
    public void sendMessage(MessageModel messageModel) {
        final String exchange = Constant.USER_CHECK_EXCHANG;
        final String routingKey = Constant.USER_CHECK_ROUTINGKEY;
        firstRabbitTemplate.convertAndSend(exchange, routingKey, messageModel);
    }

    /**
     * Serializes an arbitrary object to a JSON string and sends that string to the
     * user-check exchange with the user-check routing key.
     *
     * <p>The explicit {@code JSONUtil.toJsonStr} call would be unnecessary if a Jackson
     * message converter did the conversion, but the original author observed that the
     * converter had no effect in the multi-virtual-host setup. NOTE(review): presumably
     * this is because {@code firstRabbitTemplate} is constructed by hand and never has a
     * message converter set on it - confirm.
     *
     * @param object the entity to serialize and send
     */
    public void sendJsonObject(Object object) {
        final String payload = JSONUtil.toJsonStr(object);
        firstRabbitTemplate.convertAndSend(Constant.USER_CHECK_EXCHANG, Constant.USER_CHECK_ROUTINGKEY, payload);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
1.4 消费者接收消息
/**
 * Consumer with one listener per virtual host: {@link #process(String)} reads the
 * community-user-sync queue and {@link #process2(String)} reads the user-check queue.
 */
@Slf4j
@Component
public class UserSyncReceiver {

    /** Value of the {@code proxy.targetAddr} property. */
    @Value("${proxy.targetAddr}")
    private String targetAddr;

    /** Value of the {@code junZhu.companyId} property. */
    @Value("${junZhu.companyId}")
    private String jzCompanyId;

    @Autowired
    private LkMemberBindingService lkMemberBindingService;

    /**
     * Listener for the community-user-sync queue. No {@code containerFactory} is named
     * here, so the default listener container factory applies.
     *
     * @param content the message body as text; it is logged and parsed as a JSON object
     */
    @RabbitListener(queues = Constant.COMMUNITY_USER_SYNC_QUEUE)
    @RabbitHandler
    public void process(String content) {
        log.info("UserSyncReceiver接收到消息:{}", content);
        // The parsed object is not used further in this snippet.
        JSONObject parsedContent = JSON.parseObject(content);
    }

    /**
     * Listener for the user-check queue. That queue belongs to the non-primary connection,
     * so the container factory must be named explicitly
     * ({@code containerFactory = "firstListenerFactory"}).
     *
     * <p>NOTE(review): {@code firstListenerFactory} requests manual acknowledgement, yet this
     * method has no channel or delivery tag to acknowledge with - confirm how messages on
     * this queue are meant to be acked.
     *
     * @param content the message body as text
     */
    @RabbitListener(queues = Constant.USER_CHECK_QUEUE, containerFactory = "firstListenerFactory")
    @RabbitHandler
    public void process2(String content) {
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

参考:
[1]: https://zhuanlan.zhihu.com/p/558247588 “rabbit MQ 实战——Rabbit源码解析及多个Virtual-host如何指定 Exchange, Queue, Binding 所在 Virtual-host(全网资料很少)”

2.RabbitMq 用户管理(命令)

问题:解决RabbitMq登录时报出Not management user(Login failed)错误以及rabbitmqctl(RabbitMq)新增用户和用户角色权限

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YhB2BYG1-1691146210821)(/Users/lds/Library/Application Support/typora-user-images/image-20230612165040368.png)]

解决:rabbitmqctl set_user_tags guest administrator
  • 1

查看用户及其标签(角色)的命令(如需查看某个 vhost 下的权限,可使用 rabbitmqctl list_permissions -p <vhost>)

rabbitmqctl list_users
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/139056
推荐阅读
相关标签
  

闽ICP备14008679号