当前位置:   article > 正文

RabbitMQ基础(1)——生产者消费者模型 & RabbitMQ简介 & Docker版本的安装配置 & RabbitMQ的helloworld + 分模块构建 & 解决大量注册案例_rabbitmq 消费者

rabbitmq 消费者

引出


1.线程的生产者消费者模型synchronized,wait(),notifyAll();
2.RabbitMQ是非常热门的一款消息中间件
3.RabbitMQ的Docker版本的安装以及配置;
4.RabbitMQ基本概念,生产者,消息队列,消费者;
5.基于多模块划分的方式,构建rabbitmq的简单队列;
6.传输对象,转换成json,采用配置类事项;
7.work queue一对多的,多个消费者;
7.采用RabbitMQ的工作队列解决大量注册问题;

https://gitee.com/pet365/spring-rabbitmq

线程的生产者消费者模型

在这里插入图片描述

多个消费者模块

在这里插入图片描述

KFC.java 生产消费汉堡

package com.tianju.auth.product;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 先到先得,队列 Queue
 */
public class KFC {
    private Queue<Hamburger> queue = new LinkedBlockingQueue<>();
    private final int SIZE = 50; // 最多放50个汉堡

    /**
     * 生产
     */
    public synchronized void produce() {
        if (queue.size()==SIZE){
            // 队列已满,就不再生产,处于等待状态
            try {
                System.out.println("*****队列已满,暂停生产");
                wait(); // CPU阻塞这个线程
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else {
            Hamburger hamburger = new Hamburger("鸡肉汉堡", 13.39);
            queue.add(hamburger);
            System.out.println("[生产者] 生产1个汉堡,放入队列,目前的汉堡数量为:"+queue.size());
            notifyAll(); // 唤醒所有wait状态,可以进行消费
        }

    }

    /**
     * 消费
     */
    public synchronized void consume(){
        if (queue.isEmpty()){
            // 队列已空,不能消费
            try {
                System.out.println("*****队列已空,暂停消费");
                wait(); // CPU阻塞这个线程
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else {
            Hamburger h = queue.poll();
            System.out.println("[消费者] 消费1个汉堡"+h.getName()+",目前的汉堡数量为:"+queue.size());
            notifyAll(); // 唤醒所有wait状态,可以进行消费
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

Hamburger.java实体类

package com.tianju.auth.product;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Hamburger {
    private String name;
    private double price;

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

生产者Product消费者Consume

生产者:ProduceTask.java文件

package com.tianju.auth.product;

/**
 * 生产者
 */
public class ProduceTask implements Runnable{
    private KFC kfc;

    public ProduceTask(KFC kfc) {
        this.kfc = kfc;
    }

    @Override
    public void run() {
        while (true){
            kfc.produce();
            try {
                Thread.sleep(250);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

消费者Consume.java文件

package com.tianju.auth.product;

/**
 * 消费者
 */
public class ConsumeTask implements Runnable{
    private KFC kfc;

    public ConsumeTask(KFC kfc) {
        this.kfc = kfc;
    }

    @Override
    public void run() {
        while (true){
            kfc.consume();
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

运行

package com.tianju.auth.product;

public class TestDemo {
    public static void main(String[] args) {
        KFC kfc = new KFC();
        // 生产者4个
        Thread p1 = new Thread(new ProduceTask(kfc));
        Thread p2 = new Thread(new ProduceTask(kfc));
        Thread p3 = new Thread(new ProduceTask(kfc));
        Thread p4 = new Thread(new ProduceTask(kfc));

        // 消费者 6 个
        Thread c1 = new Thread(new ConsumeTask(kfc));
        Thread c2 = new Thread(new ConsumeTask(kfc));
        Thread c3 = new Thread(new ConsumeTask(kfc));
        Thread c4 = new Thread(new ConsumeTask(kfc));
        Thread c5 = new Thread(new ConsumeTask(kfc));
        Thread c6 = new Thread(new ConsumeTask(kfc));

        p1.start();
        p2.start();
        p3.start();
        p4.start();

        c1.start();
        c2.start();
        c3.start();
        c4.start();
        c5.start();
        c6.start();

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

在这里插入图片描述

RabbitMQ简介

什么是RabbitMQ?

https://www.rabbitmq.com/

在这里插入图片描述

RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier.

RabbitMQ是非常热门的一款消息中间件。如英文中解释道: RabbitMQ是一个信息协调者,类似于我们的邮局。

MQ在项目中的作用

redis+redission+lua+mq

  • 解耦
  • 消峰

RabbitMQ安装(docker)

查询RabbitMQ镜像

docker search rabbitmq

[root@localhost ~]# docker search rabbitmq
NAME                                      DESCRIPTION                                      STARS     OFFICIAL   AUTOMATED
rabbitmq                                  RabbitMQ is an open source multi-protocol me…   4798      [OK]       
bitnami/rabbitmq                          Bitnami Docker Image for RabbitMQ                100                  [OK]
bitnami/rabbitmq-exporter                                                                  2                    
circleci/rabbitmq                         This image is for internal use                   0                    
circleci/rabbitmq-delayed                 https://github.com/circleci/rabbitmq-delayed…   1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在这里插入图片描述

拉取镜像

docker pull rabbitmq

[root@localhost ~]# docker pull rabbitmq
Using default tag: latest
latest: Pulling from library/rabbitmq
7b1a6ab2e44d: Pull complete 
37f453d83d8f: Pull complete 
e64e769bc4fd: Pull complete 
c288a913222f: Pull complete 
12addf9c8bf9: Pull complete 
eaeb088e057d: Pull complete 
b63d48599313: Pull complete 
05c99d3d2a57: Pull complete 
43665bfbc3f9: Pull complete 
Digest: sha256:884146137011519524d506a12687127f3d2c7c37c2cc11206dc72c59bedea5e2
Status: Downloaded newer image for rabbitmq:latest
docker.io/library/rabbitmq:latest
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

创建运行rabbitmq容器

docker run -it —name=rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123 -p 15672:15672 -p 5672:5672 rabbitmq

docker run -it --name=rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123 -p 15672:15672 -p 5672:5672 rabbitmq
  • 1
[root@localhost ~]# docker run -it --name=rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123 -p 15672:15672 -p 5672:5672 rabbitmq
2023-06-17 02:35:10.091757+00:00 [info] <0.222.0> Feature flags: list of feature flags found:
2023-06-17 02:35:10.107600+00:00 [info] <0.222.0> Feature flags:   [ ] implicit_default_bindings
2023-06-17 02:35:10.107642+00:00 [info] <0.222.0> Feature flags:   [ ] maintenance_mode_status
2023-06-17 02:35:10.107661+00:00 [info] <0.222.0> Feature flags:   [ ] quorum_queue
2023-06-17 02:35:10.107675+00:00 [info] <0.222.0> Feature flags:   [ ] stream_queue
2023-06-17 02:35:10.107749+00:00 [info] <0.222.0> Feature flags:   [ ] user_limits
2023-06-17 02:35:10.107764+00:00 [info] <0.222.0> Feature flags:   [ ] virtual_host_metadata
2023-06-17 02:35:10.107778+00:00 [info] <0.222.0> Feature flags: feature flag states written to disk: yes
:
:
  ##  ##      RabbitMQ 3.9.11
  ##  ##
  ##########  Copyright (c) 2007-2021 VMware, Inc. or its affiliates.
  ######  ##
  ##########  Licensed under the MPL 2.0. Website: https://rabbitmq.com
  Erlang:      24.2 [jit]
  TLS Library: OpenSSL - OpenSSL 1.1.1m  14 Dec 2021
  Doc guides:  https://rabbitmq.com/documentation.html
  Support:     https://rabbitmq.com/contact.html
  Tutorials:   https://rabbitmq.com/getstarted.html
  Monitoring:  https://rabbitmq.com/monitoring.html
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

开放端口

[root@192 ~]# firewall-cmd --zone=public --add-port=15672/tcp --permanent
success
[root@192 ~]# firewall-cmd --zone=public --add-port=5672/tcp --permanent 
success
[root@192 ~]# firewall-cmd --reload
success
[root@192 ~]# firewall-cmd --zone=public --list-ports
3306/tcp 15672/tcp 5672/tcp
[root@192 ~]# 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

启动管理插件【需要】

启动插件 rabbitmq_management【前端页面】

[root@localhost ~]# docker exec -it rabbitmq bash
  • 1

在这里插入图片描述

进行插件启动

rabbitmq_management

root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@6d2342d51b11:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_prometheus
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@6d2342d51b11...
The following plugins have been enabled:
  rabbitmq_management
started 1 plugins.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在这里插入图片描述

management UI被禁止了。也需要启动一下

在这里插入图片描述

root@14d0aff212b2:/# cd /etc/rabbitmq/conf.d/
root@14d0aff212b2:/etc/rabbitmq/conf.d# ls
10-default-guest-user.conf  management_agent.disable_metrics_collector.conf
root@14d0aff212b2:/etc/rabbitmq/conf.d# cat management_agent.disable_metrics_collector.conf 
management_agent.disable_metrics_collector = true
root@14d0aff212b2:/etc/rabbitmq/conf.d# echo management_agent.disable_metrics_collector=false > management_agent.disable_metrics_collector.conf
root@14d0aff212b2:/etc/rabbitmq/conf.d# cat management_agent.disable_metrics_collector.conf management_agent.disable_metrics_collector=false
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

修改/etc/rabbitmq/conf.d【channels通道】

进入到容器内的/etc/rabbitmq/conf.d文件夹

修改management_agent.disable_metrics_collector

修改management_agent.disable_metrics_collector.conf文件中的management_agent.disable_metrics_collector值为false。

echo management_agent.disable_metrics_collector=false > management_agent.disable_metrics_collector.conf
  • 1

在这里插入图片描述

退出并重启容器

root@6d2342d51b11:/etc/rabbitmq/conf.d# exit
exit
[root@localhost ~]# docker restart rabbitmq 
rabbitmq
  • 1
  • 2
  • 3
  • 4

浏览器测试

http://192.168.111.130:15672/#/

在浏览器中输入 linux的ip地址:15672

在这里插入图片描述

使用默认用户名密码登录

默认用户名、密码是在启动容器时创建的:

-e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123

在这里插入图片描述

在这里插入图片描述

更新时间

在这里插入图片描述

点击Chanels问题

问题描述

management UI被禁止了。

在这里插入图片描述

解决方案

1.进入rabbitmq容器

进入到容器内的/etc/rabbitmq/conf.d文件夹

2.修改management_agent.disable_metrics_collector

修改management_agent.disable_metrics_collector.conf文件中的management_agent.disable_metrics_collector值为false。

echo management_agent.disable_metrics_collector=false > management_agent.disable_metrics_collector.conf
  • 1

在这里插入图片描述

3.退出并重启容器

root@6d2342d51b11:/etc/rabbitmq/conf.d# exitexit[root@localhost ~]# docker restart rabbitmq rabbitmq
  • 1

4.刷新浏览器测试

在这里插入图片描述

能够访问

在这里插入图片描述

rabbitmq+项目整合

RabbitMQ

RabbitMQ基本概念

消息传输机制一种实现。

原理: 生产者-消息队列- 消费者模型

在这里插入图片描述

RabbitMQ工作模型

Broker

RabbitMQ服务。

在这里插入图片描述

Connection

生产者或是服务者都需要与Broker建立的TCP连接。

TCP/UDP

在这里插入图片描述

Channel

保持的TCP长连接里面去创建和释放Channel,从而减少资源的消耗。其中Channel是相互隔离的,不能共享。

在这里插入图片描述

Queue

Queue是生产者与消费者的中间交互队列,生产者发送的消息到达队列,在队列中存储,消费者从队列中消费消息。

在这里插入图片描述

相关的参数

在这里插入图片描述

在这里插入图片描述

Consumer

从队列(Queue)上获取消息。

  • Pull模式
  • Push模式

Exchange

根据具体的绑定规则分发到具体的队列。

在这里插入图片描述

RabbitMQ的helloworld

模块划分

多模块: rabbitmq-common,rabbitmq-producer, rabbitmq-consuer

在这里插入图片描述

common模块

在这里插入图片描述

生产者producer

在这里插入图片描述

在这里插入图片描述

消费者consumer

在这里插入图片描述

在这里插入图片描述

common引入amqp

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4

common模块配置

在这里插入图片描述

producer模块配置

在这里插入图片描述

consumer模块配置

在这里插入图片描述

启动

在这里插入图片描述

rabbitmq-common模块

设计配置类

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

SimpleQueueConfig,java文件

package com.tianju.mq.common.config;

import com.tianju.mq.common.constants.RabbitMqConstants;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SimpleQueueConfig {
    @Bean
    public Queue simpleQueue(){
        // 队列名称,持久化
        return new Queue(RabbitMqConstants.MQ_SIMPLE_QUEUE,
                RabbitMqConstants.isDurable);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

RabbitMqConstants.java接口,提供参数

package com.tianju.mq.common.constants;

public interface RabbitMqConstants {
    String MQ_SIMPLE_QUEUE="mq_simple_queue"; //简单队列名称 点对点消费
    boolean isDurable = true;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

返回值

package com.tianju.mq.common.result;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;

/**
 * 返回给前端的响应
 * @param <T>
 */

@Data
@NoArgsConstructor
@AllArgsConstructor
public class HttpResp<T> implements Serializable {
    private ResultCode resultCode;

    @JsonFormat(pattern = "yyyy-MM-dd hh:mm:ss",timezone = "GMT+8")
    private Date time;

    private T result;

    public static <T> HttpResp <T> results(
            ResultCode resultCode,
            Date time,
            T results){

        HttpResp httpResp = new HttpResp();
        httpResp.setResultCode(resultCode);
        httpResp.setTime(time);
        httpResp.setResult(results);
        return httpResp;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
package com.tianju.mq.common.result;

import com.fasterxml.jackson.annotation.JsonFormat;

import lombok.Getter;


/**
 * 枚举类型,http请求的返回值
 */
// 枚举类型的json化,需要有get方法
@JsonFormat(shape = JsonFormat.Shape.OBJECT)
@Getter
public enum ResultCode {
    BOOK_RUSH_SUCCESS(20010,"图书抢购成功"),
    BOOK_RUSH_ERROR(3001,"图书抢购失败"),
    LUA_SCRIPT_ERROR(3002,"Lua脚本操作失败"),
    USER_FIND_ERROR(40010,"非法请求,布隆过滤器不通过"),
    USER_FIND_SUCCESS(20010,"查询用户名成功"),
    USER_LOGIN_ERROR(40030,"用户登陆失败"),
    USER_LOGIN_SUCCESS(20020,"用户登陆成功"),
    RABBITMQ_SIMPLE_QUEUE_SUCCESS(60006,"简单队列成功"),
    ;

    private Integer code;

    private String msg;

    private ResultCode(Integer code,String msg){
        this.code =code;
        this.msg = msg;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

生产者模块

扫描配置类

在这里插入图片描述

package com.tianju.mq.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = "com.tianju.mq")
public class ProducerApp {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApp.class, args);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

生产者发送信息

在这里插入图片描述

package com.tianju.mq.producer.service.impl;

import com.tianju.mq.common.constants.RabbitMqConstants;
import com.tianju.mq.producer.service.IProducerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ProducerServiceImpl implements IProducerService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    public void simpleQueueSend(String msg) {
        // 往什么队列发送,msg信息
        rabbitTemplate.convertAndSend(RabbitMqConstants.MQ_SIMPLE_QUEUE,msg);
        log.debug("[生产者模块:]向{}发送信息----> {}",RabbitMqConstants.MQ_SIMPLE_QUEUE,msg);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

在这里插入图片描述

package com.tianju.mq.producer.controller;

import com.tianju.mq.common.result.HttpResp;
import com.tianju.mq.common.result.ResultCode;
import com.tianju.mq.producer.service.IProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
@RequestMapping("/api/produce")
public class ProducerController {

    @Autowired
    private IProducerService producerService;

    @RequestMapping("/simpleSend")
    public HttpResp<String> simpleSend(){
        String msg = "produce a msg";
        producerService.simpleQueueSend(msg);
        return HttpResp.results(ResultCode.RABBITMQ_SIMPLE_QUEUE_SUCCESS,
                new Date(),msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

启动进行发送

在这里插入图片描述

打印日志,发送成功

在这里插入图片描述

进入管理端查看

在这里插入图片描述

在这里插入图片描述

消费者模块

@RabbitListener(queues = RabbitMqConstants.MQ_SIMPLE_QUEUE)

package com.tianju.mq.consumer.service.impl;

import com.tianju.mq.common.constants.RabbitMqConstants;
import com.tianju.mq.consumer.service.IConsumerService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class ConsumerServiceImpl implements IConsumerService {
    @RabbitListener(queues = RabbitMqConstants.MQ_SIMPLE_QUEUE)
    @Override
    public void simpleQueueConsume(String msg) {
        log.debug("[消费者模块:] 在{}消费了一条信息---->{}",new Date(),msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

ConsumerApp.java启动类

package com.tianju.mq.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = "com.tianju.mq")
public class ConsumerApp {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApp.class);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

在这里插入图片描述

管理端查看

在这里插入图片描述

如果传输的是一个对象?

报错

SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: com.tianju.mq.producer.entity.User
  • 1

在这里插入图片描述

方案一:对象进行序列化

在这里插入图片描述

package com.tianju.mq.producer.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    private Integer id;
    private String name;
    private String createBy;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

在这里插入图片描述

管理端查看

在这里插入图片描述

进行消费

在这里插入图片描述

    @RabbitListener(queues = RabbitMqConstants.MQ_SIMPLE_QUEUE)
    @Override
    public void simpleQueueUserConsume(User user) {
        log.debug("[消费者模块:] 在{}消费了一个对象信息---->{}",new Date(),user);
    }
  • 1
  • 2
  • 3
  • 4
  • 5

存在问题

对象序列化后,占用大量的空间,建议使用字符串传输。

在这里插入图片描述

方案二:转换为json字符串

SimpleQueueConfig.java进行配置

在这里插入图片描述

package com.tianju.mq.common.config;


import com.tianju.mq.common.constants.RabbitMqConstants;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SimpleQueueConfig {
    @Bean
    public Queue simpleQueue(){
        // 队列名称,持久化
        return new Queue(RabbitMqConstants.MQ_SIMPLE_QUEUE,
                RabbitMqConstants.isDurable);
    }

    /**
     * 将对象转换为json字符串
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return  new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());// 修改转换器
        return rabbitTemplate;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

在这里插入图片描述

Work queues 一对多

1个生产者多个消费者

在这里插入图片描述

在这里插入图片描述

在常量中增加工作模式

package com.tianju.mq.common.constants;

public interface RabbitMqConstants {
    String MQ_SIMPLE_QUEUE="mq_simple_queue"; //简单队列名称 点对点消费

    String MQ_WORK_QUEUE="mq_work_queue"; // 工作队列,1对多,一个生产者,多个消费者
    boolean isDurable = true;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

用工作队列的配置

package com.tianju.mq.common.config;


import com.tianju.mq.common.constants.RabbitMqConstants;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WorkQueueConfig {
    @Bean
    public Queue workQueue(){
        // 队列名称,持久化
        return new Queue(RabbitMqConstants.MQ_WORK_QUEUE,
                RabbitMqConstants.isDurable);
    }

    /**
     * 将对象转换为json字符串
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return  new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

在这里插入图片描述

生产者生产到工作队列

    @Override
    public void workQueueUserSend(User user) {
        rabbitTemplate.convertAndSend(RabbitMqConstants.MQ_WORK_QUEUE,user);
        log.debug("[生产者模块:]向{}发送对象user----> {}",RabbitMqConstants.MQ_WORK_QUEUE,user);
    }
  • 1
  • 2
  • 3
  • 4
  • 5

两个消费者进行消费

package com.tianju.mq.consumer.service.impl;

import com.rabbitmq.client.Channel;
import com.tianju.mq.common.constants.RabbitMqConstants;
import com.tianju.mq.common.entity.User;
import com.tianju.mq.consumer.service.IConsumerService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class ConsumerServiceImpl implements IConsumerService {
//    @RabbitListener(queues = RabbitMqConstants.MQ_SIMPLE_QUEUE)
    @Override
    public void simpleQueueConsume(String msg) {
        log.debug("[消费者模块:] 在{}消费了一条信息---->{}",new Date(),msg);
    }

//    @RabbitListener(queues = RabbitMqConstants.MQ_SIMPLE_QUEUE)
    @Override
    public void simpleQueueUserConsume(User user) {
        log.debug("[消费者模块:] 在{}消费了一个对象信息---->{}",new Date(),user);
    }

    @RabbitListener(queues = RabbitMqConstants.MQ_WORK_QUEUE)
    @Override
    public void workQueueUserConsumeA(User user, Channel channel,Message message) {
        log.debug("[A消费者模块:] 在{}消费了一个对象信息---->{},信息message为{}",
                new Date(),user,message.getMessageProperties());
    }

    @RabbitListener(queues = RabbitMqConstants.MQ_WORK_QUEUE)
    @Override
    public void workQueueUserConsumeB(User user, Channel channel,Message message) {
        log.debug("[B消费者模块:] 在{}消费了一个对象信息---->{},信息message为{}",
                new Date(),user,message.getMessageProperties());

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

在这里插入图片描述

RabbitMQ解决大量注册

https://gitee.com/pet365/spring-rabbitmq

生产者模块

    @PostMapping("/register")
    public HttpResp<User> register(User user){
        producerService.register(user);
        return HttpResp.results(ResultCode.USER_REGISTER_SUCCESS,new Date(),user);

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

消费者模块

    @RabbitListener(queues = RabbitMqConstants.MQ_REGISTER_WORK_QUEUE)
    @Override
    public void registerUserConsumerA(User user, Channel channel, Message message) {
        log.debug("[A消费者注册模块:] 在{}消费了一个对象信息---->{},信息message为{}",
                new Date(),user,message.getMessageProperties());
        consumerDao.insert(user);
        log.info("用户{}注册成功,存入数据库",user);
        // 短信提醒,邮箱提醒
    }

    @RabbitListener(queues = RabbitMqConstants.MQ_REGISTER_WORK_QUEUE)
    @Override
    public void registerUserConsumerB(User user, Channel channel, Message message) {
        log.debug("[B消费者注册模块:] 在{}消费了一个对象信息---->{},信息message为{}",
                new Date(),user,message.getMessageProperties());
        consumerDao.insert(user);
        log.info("用户{}注册成功,存入数据库",user);
        // 短信提醒,邮箱提醒
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

JMeter测试

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述


总结

1.线程的生产者消费者模型synchronized,wait(),notifyAll();
2.RabbitMQ是非常热门的一款消息中间件
3.RabbitMQ的Docker版本的安装以及配置;
4.RabbitMQ基本概念,生产者,消息队列,消费者;
5.基于多模块划分的方式,构建rabbitmq的简单队列;
6.传输对象,转换成json,采用配置类事项;
7.work queue一对多的,多个消费者;
7.采用RabbitMQ的工作队列解决大量注册问题;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/386678
推荐阅读
相关标签
  

闽ICP备14008679号