当前位置:   article > 正文

java实现rabbitmq消息队列_javamq消息队列

javamq消息队列

1、rabbitmq常用的五种模型

在这里插入图片描述

基本消息模型

在这里插入图片描述
基本消息模型就是:
一个生产者丶默认交换机丶一个队列丶一个消费者。

work消息模型

在这里插入图片描述
work消息模型就是:
一个生产者丶默认交换机丶一个队列丶多个消费者。

fanout广播模式/发布/订阅模式

在这里插入图片描述
fanout消息模型就是:
多个消费者,每一个消费这都有自己的队列,每个队列都绑定到交换机
生产者发送消息到交换机-交换机发送到哪个队列

Routing路由模式(direct)

在这里插入图片描述
Routing路由模式模型就是:
在某种场景下,我们希望不同的消息被不同的队列消费
这个时候我们就要用到direct类型的exchange
生产者向交换机发送消息—交换机根据路由key发送给队列-队列的消费者接收消息

Topics(主题模型)

在这里插入图片描述
Routing路由模式模型就是:
Topics模式和direct路由模式类似,
区别在于Topic类型的交换机可以匹配通配符
符号(通配符):#表示匹配一个或者多个词
*表示匹配一个词

2、实现代码(topic主题模型)

2.1添加依赖

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
         <!--工具类-->
         <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.17</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2.2配置rabbitmq信息

server:
  port: 7001
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: username
    password: password
    virtualHost: /
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2.3创建交换机和列队(第一种)方法创建

创建rabbit工具类

package com.example.rabbitmq_topic.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RabbitUtil {

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * 创建交换机
     *
     * @param changeName 交换机名称
     */
    public void createExchange(String changeName) {
        TopicExchange exchange = new TopicExchange(changeName, true, false);
        amqpAdmin.declareExchange(exchange);
    }

    /**
     * 创建队列
     *
     * @param queueName 队列名称
     */
    public void createQueue(String queueName) {
        Queue queue = new Queue(queueName, true, false, false);
        amqpAdmin.declareQueue(queue);
    }

    /**
     * 交换机绑定
     *
     * @param changeName 交换机名称
     * @param routingKey 路由key
     * @param queueName  队列名称
     */
    public void bindExchange(String changeName, String routingKey, String queueName) {
        Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, changeName, routingKey, null);
        amqpAdmin.declareBinding(binding);
    }

    /**
     * 发送信息到交换机
     *
     * @param changeName 交换机名称
     * @param routingKey 路由key
     * @param message    消息
     */
    public void sendMessage(String changeName, String routingKey, String message) {
        rabbitTemplate.convertAndSend(changeName, routingKey, message);
    }

    /**
     * 删除交换机
     *
     * @param changeName 交换机名称
     */
    public void deleteExchange(String changeName) {
        amqpAdmin.deleteExchange(changeName);
    }

    /**
     * 删除队列
     *
     * @param queueName 队列名称
     */
    public void deleteQueue(String queueName) {
        amqpAdmin.deleteQueue(queueName);
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

创建controller

package com.example.rabbitmq_topic.controlller;

import com.example.rabbitmq_topic.util.RabbitUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

    @Autowired
    private RabbitUtil rabbitUtil;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("message") String message){
        rabbitUtil.createExchange("test-exchange");
        rabbitUtil.createQueue("test-queue");
        rabbitUtil.createQueue("hello-queue");
        //#.test采用通配符绑定
        rabbitUtil.bindExchange("test-exchange","#.test","test-queue");
        rabbitUtil.bindExchange("test-exchange","#.hello","hello-queue");
        //只允许hello队列接收信息
        rabbitUtil.sendMessage("test-exchange","hello",message);
        return "发送成功";
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

2.4接收队列信息

package com.example.rabbitmq_topic.rabbitListener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class TestRabbitListener {

	//接收test-queue,hello-queue队列信息,如果每个队列都有信息,那么就会执行多次方法
	//如果只想接收hello-queue队列信息 如: @RabbitListener(queues={"test-queue"})
    @RabbitListener(queues={"test-queue","hello-queue"})
    public void onMessage(String message){
        log.info("收到信息:{}",message);
    }

    
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2.5创建交换机和列队(第二种)注解创建

package com.example.rabbitmq_topic.rabbitListener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class TestRabbitListener {
	
	//QueueBinding: 交换机绑定队列
	//Exchange: 创建交换机
	//key: 路由key,采用通配符设置key
	//queue: 创建队列 
	//如果只创建一个交换机和一个队列可删除一个@QueueBinding对象
    @RabbitListener(
            bindings ={

                    @QueueBinding(exchange = @Exchange(value = "bind-exchange",type = "topic",declare = "true",autoDelete = "false"),
                            key = "#.queue",
                            value = @Queue(value = "bind-queue",autoDelete = "false",declare = "true")
                    ),
                    @QueueBinding(exchange = @Exchange(value = "bind-exchange",type = "topic",declare = "true",autoDelete = "false"),
                            key = "#.queue",
                            value = @Queue(value = "bindTest-queue",autoDelete = "false",declare = "true")
                    )
            }
    )
    public void onBindMessage(String message){
        log.info("收到信息:{}",message);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

发送消息代码

  rabbitUtil.sendMessage("bind-exchange","queue",message);
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/384284
推荐阅读
相关标签
  

闽ICP备14008679号