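1. The thread-based producer-consumer model with synchronized, wait(), and notifyAll();
2. RabbitMQ is a very popular message broker;
3. Installing and configuring the Docker version of RabbitMQ;
4. Basic RabbitMQ concepts: producer, message queue, consumer;
5. Building a RabbitMQ simple queue with a multi-module project layout;
6. Transferring objects as JSON, implemented with a configuration class;
7. Work queue: one producer, multiple consumers;
8. Using a RabbitMQ work queue to handle a large volume of registrations;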
https://gitee.com/pet365/spring-rabbitmq
Multiple consumer modules
package com.tianju.auth.product;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * First come, first served: a FIFO Queue of hamburgers.
 */
public class KFC {
    private Queue<Hamburger> queue = new LinkedBlockingQueue<>();
    private final int SIZE = 50; // holds at most 50 hamburgers

    /**
     * Produce
     */
    public synchronized void produce() {
        if (queue.size() == SIZE) { // queue is full: stop producing and wait
            try {
                System.out.println("*****Queue is full, pausing production");
                wait(); // releases the lock and blocks this thread until it is notified
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            Hamburger hamburger = new Hamburger("Chicken burger", 13.39);
            queue.add(hamburger);
            System.out.println("[Producer] produced 1 hamburger and queued it, current count: " + queue.size());
            notifyAll(); // wake every waiting thread; consumers can now consume
        }
    }

    /**
     * Consume
     */
    public synchronized void consume() {
        if (queue.isEmpty()) { // queue is empty: nothing to consume
            try {
                System.out.println("*****Queue is empty, pausing consumption");
                wait(); // releases the lock and blocks this thread until it is notified
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            Hamburger h = queue.poll();
            System.out.println("[Consumer] consumed 1 hamburger " + h.getName() + ", current count: " + queue.size());
            notifyAll(); // wake every waiting thread; producers can now produce
        }
    }
}
Hamburger.java entity class
package com.tianju.auth.product;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Hamburger {
private String name;
private double price;
}
Producer: ProduceTask.java
package com.tianju.auth.product;

/**
 * Producer
 */
public class ProduceTask implements Runnable {
    private KFC kfc;

    public ProduceTask(KFC kfc) {
        this.kfc = kfc;
    }

    @Override
    public void run() {
        while (true) {
            kfc.produce();
            try {
                Thread.sleep(250);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
Consumer: ConsumeTask.java
package com.tianju.auth.product;

/**
 * Consumer
 */
public class ConsumeTask implements Runnable {
    private KFC kfc;

    public ConsumeTask(KFC kfc) {
        this.kfc = kfc;
    }

    @Override
    public void run() {
        while (true) {
            kfc.consume();
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.tianju.auth.product;

public class TestDemo {
    public static void main(String[] args) {
        KFC kfc = new KFC();

        // 4 producers
        Thread p1 = new Thread(new ProduceTask(kfc));
        Thread p2 = new Thread(new ProduceTask(kfc));
        Thread p3 = new Thread(new ProduceTask(kfc));
        Thread p4 = new Thread(new ProduceTask(kfc));

        // 6 consumers
        Thread c1 = new Thread(new ConsumeTask(kfc));
        Thread c2 = new Thread(new ConsumeTask(kfc));
        Thread c3 = new Thread(new ConsumeTask(kfc));
        Thread c4 = new Thread(new ConsumeTask(kfc));
        Thread c5 = new Thread(new ConsumeTask(kfc));
        Thread c6 = new Thread(new ConsumeTask(kfc));

        p1.start();
        p2.start();
        p3.start();
        p4.start();
        c1.start();
        c2.start();
        c3.start();
        c4.start();
        c5.start();
        c6.start();
    }
}
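For comparison, the same producer-consumer hand-off can be sketched with java.util.concurrent.BlockingQueue, whose put() and take() do the waiting and waking internally. This is only a minimal sketch, not the code from the repository: the class name BlockingQueueDemo, the STOP marker, and the bounded item counts are assumptions added here so that the demo terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingQueueDemo {
    // Hypothetical marker that tells a consumer to exit; not part of the original example.
    private static final String STOP = "STOP";

    /** Runs the producers and consumers to completion and returns how many items were consumed. */
    public static int run(int producers, int consumers, int itemsPerProducer, int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger consumed = new AtomicInteger();
        List<Thread> producerThreads = new ArrayList<>();
        List<Thread> consumerThreads = new ArrayList<>();

        for (int p = 0; p < producers; p++) {
            producerThreads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerProducer; i++) {
                        queue.put("hamburger"); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (int c = 0; c < consumers; c++) {
            consumerThreads.add(new Thread(() -> {
                try {
                    // take() blocks while the queue is empty
                    while (!STOP.equals(queue.take())) {
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        producerThreads.forEach(Thread::start);
        consumerThreads.forEach(Thread::start);
        try {
            for (Thread t : producerThreads) {
                t.join();
            }
            // All real items are already queued, so each consumer sees its STOP marker last.
            for (int c = 0; c < consumers; c++) {
                queue.put(STOP);
            }
            for (Thread t : consumerThreads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed " + run(4, 6, 25, 50));
    }
}
```

The trade-off is that the blocking logic is hidden inside the queue, which is convenient but shows less of the synchronized/wait()/notifyAll() mechanics that the KFC class demonstrates.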
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier.
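RabbitMQ is a very popular message broker. As the English passage above puts it, RabbitMQ accepts and forwards messages, much like a post office.
redis+redisson+lua+mq
- Decoupling
- Peak shaving (smoothing out traffic spikes)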
docker search rabbitmq
[root@localhost ~]# docker search rabbitmq
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
rabbitmq RabbitMQ is an open source multi-protocol me… 4798 [OK]
bitnami/rabbitmq Bitnami Docker Image for RabbitMQ 100 [OK]
bitnami/rabbitmq-exporter 2
circleci/rabbitmq This image is for internal use 0
circleci/rabbitmq-delayed https://github.com/circleci/rabbitmq-delayed… 1
docker pull rabbitmq
[root@localhost ~]# docker pull rabbitmq
Using default tag: latest
latest: Pulling from library/rabbitmq
7b1a6ab2e44d: Pull complete
37f453d83d8f: Pull complete
e64e769bc4fd: Pull complete
c288a913222f: Pull complete
12addf9c8bf9: Pull complete
eaeb088e057d: Pull complete
b63d48599313: Pull complete
05c99d3d2a57: Pull complete
43665bfbc3f9: Pull complete
Digest: sha256:884146137011519524d506a12687127f3d2c7c37c2cc11206dc72c59bedea5e2
Status: Downloaded newer image for rabbitmq:latest
docker.io/library/rabbitmq:latest
docker run -it --name=rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123 -p 15672:15672 -p 5672:5672 rabbitmq
[root@localhost ~]# docker run -it --name=rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123 -p 15672:15672 -p 5672:5672 rabbitmq
2023-06-17 02:35:10.091757+00:00 [info] <0.222.0> Feature flags: list of feature flags found:
2023-06-17 02:35:10.107600+00:00 [info] <0.222.0> Feature flags: [ ] implicit_default_bindings
2023-06-17 02:35:10.107642+00:00 [info] <0.222.0> Feature flags: [ ] maintenance_mode_status
2023-06-17 02:35:10.107661+00:00 [info] <0.222.0> Feature flags: [ ] quorum_queue
2023-06-17 02:35:10.107675+00:00 [info] <0.222.0> Feature flags: [ ] stream_queue
2023-06-17 02:35:10.107749+00:00 [info] <0.222.0> Feature flags: [ ] user_limits
2023-06-17 02:35:10.107764+00:00 [info] <0.222.0> Feature flags: [ ] virtual_host_metadata
2023-06-17 02:35:10.107778+00:00 [info] <0.222.0> Feature flags: feature flag states written to disk: yes
:
:
  ##  ##      RabbitMQ 3.9.11
  ##  ##
  ##########  Copyright (c) 2007-2021 VMware, Inc. or its affiliates.
  ######  ##
  ##########  Licensed under the MPL 2.0. Website: https://rabbitmq.com

  Erlang:      24.2 [jit]
  TLS Library: OpenSSL - OpenSSL 1.1.1m 14 Dec 2021

  Doc guides:  https://rabbitmq.com/documentation.html
  Support:     https://rabbitmq.com/contact.html
  Tutorials:   https://rabbitmq.com/getstarted.html
  Monitoring:  https://rabbitmq.com/monitoring.html
[root@192 ~]# firewall-cmd --zone=public --add-port=15672/tcp --permanent
success
[root@192 ~]# firewall-cmd --zone=public --add-port=5672/tcp --permanent
success
[root@192 ~]# firewall-cmd --reload
success
[root@192 ~]# firewall-cmd --zone=public --list-ports
3306/tcp 15672/tcp 5672/tcp
[root@192 ~]#
[root@localhost ~]# docker exec -it rabbitmq bash
rabbitmq_management
root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@6d2342d51b11:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_prometheus
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@6d2342d51b11...
The following plugins have been enabled:
rabbitmq_management
started 1 plugins.
The management UI is disabled (its metrics collector is turned off), so it also needs to be enabled.
root@14d0aff212b2:/# cd /etc/rabbitmq/conf.d/
root@14d0aff212b2:/etc/rabbitmq/conf.d# ls
10-default-guest-user.conf management_agent.disable_metrics_collector.conf
root@14d0aff212b2:/etc/rabbitmq/conf.d# cat management_agent.disable_metrics_collector.conf
management_agent.disable_metrics_collector = true
root@14d0aff212b2:/etc/rabbitmq/conf.d# echo management_agent.disable_metrics_collector=false > management_agent.disable_metrics_collector.conf
root@14d0aff212b2:/etc/rabbitmq/conf.d# cat management_agent.disable_metrics_collector.conf
management_agent.disable_metrics_collector=false
Go to the /etc/rabbitmq/conf.d folder inside the container.
In the management_agent.disable_metrics_collector.conf file, set management_agent.disable_metrics_collector to false.
echo management_agent.disable_metrics_collector=false > management_agent.disable_metrics_collector.conf
root@6d2342d51b11:/etc/rabbitmq/conf.d# exit
exit
[root@localhost ~]# docker restart rabbitmq
rabbitmq
http://192.168.111.130:15672/#/
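In a browser, enter the Linux host's IP address followed by :15672.
The default username and password are the ones created when the container was started: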
-e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123
The management UI is disabled.
1. Enter the rabbitmq container
Go to the /etc/rabbitmq/conf.d folder inside the container.
2. Modify management_agent.disable_metrics_collector
In the management_agent.disable_metrics_collector.conf file, set management_agent.disable_metrics_collector to false.
echo management_agent.disable_metrics_collector=false > management_agent.disable_metrics_collector.conf
3. Exit and restart the container
root@6d2342d51b11:/etc/rabbitmq/conf.d# exit
exit
[root@localhost ~]# docker restart rabbitmq
rabbitmq
4. Refresh the browser to test
The page is now accessible.
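RabbitMQ + project integration
An implementation of a message-passing mechanism.
Principle: the producer, message queue, consumer model
Broker: the RabbitMQ service.
Connection: both producers and consumers need to establish a TCP connection to the Broker.
TCP/UDP
Channel: channels are created and released inside the long-lived TCP connection, which reduces resource consumption. Channels are isolated from one another and cannot be shared.
Queue: the intermediary between producers and consumers. Messages sent by a producer arrive at the queue and are stored there, and consumers consume messages from the queue.
Related parameters
Consumer: fetches messages from the queue (Queue).
- Pull mode
- Push mode
Exchange: distributes messages to specific queues according to the binding rules.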
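The relationship between publishing, binding rules, queues, and the pull and push styles of consumption can be sketched with a toy in-memory model. This is not the RabbitMQ client API: ToyBroker and all of its methods are hypothetical names invented for illustration, and it only mimics exact-match routing on a routing key.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model of a broker: a routing table in front of named FIFO queues. */
public class ToyBroker {
    // queue name -> stored messages
    private final Map<String, Deque<String>> queues = new HashMap<>();
    // routing key -> names of the queues bound to that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    public void bind(String queueName, String routingKey) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queueName);
    }

    /** Producer side: routes the message by its key and returns how many queues received it. */
    public int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).addLast(message);
        }
        return targets.size();
    }

    /** Pull mode: the consumer asks for the next message; null means the queue is empty. */
    public String pull(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }

    /** Push mode: the broker hands every stored message to the handler; returns the count delivered. */
    public int push(String queueName, Consumer<String> handler) {
        int delivered = 0;
        String message;
        while ((message = pull(queueName)) != null) {
            handler.accept(message);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.bind("mq_simple_queue", "simple");
        broker.publish("simple", "first");
        broker.publish("simple", "second");
        System.out.println("pulled: " + broker.pull("mq_simple_queue"));
        broker.push("mq_simple_queue", m -> System.out.println("pushed: " + m));
    }
}
```

In a real deployment the connection, channel, and acknowledgement handling sit between these steps; the sketch leaves them out on purpose.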
Modules: rabbitmq-common, rabbitmq-producer, rabbitmq-consumer
The common module
The producer module
The consumer module
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
SimpleQueueConfig.java
package com.tianju.mq.common.config;

import com.tianju.mq.common.constants.RabbitMqConstants;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SimpleQueueConfig {
    @Bean
    public Queue simpleQueue() {
        // queue name, durable flag
        return new Queue(RabbitMqConstants.MQ_SIMPLE_QUEUE, RabbitMqConstants.isDurable);
    }
}
RabbitMqConstants.java interface, which supplies the parameters
package com.tianju.mq.common.constants;
public interface RabbitMqConstants {
String MQ_SIMPLE_QUEUE="mq_simple_queue"; // simple queue name, point-to-point consumption
boolean isDurable = true;
}
package com.tianju.mq.common.result;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;

/**
 * Response returned to the front end
 * @param <T> type of the result payload
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class HttpResp<T> implements Serializable {
    private ResultCode resultCode;
    // HH is the 24-hour clock; the original hh is the 12-hour clock and is ambiguous without an AM/PM marker
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date time;
    private T result;

    public static <T> HttpResp<T> results(ResultCode resultCode, Date time, T results) {
        HttpResp<T> httpResp = new HttpResp<>();
        httpResp.setResultCode(resultCode);
        httpResp.setTime(time);
        httpResp.setResult(results);
        return httpResp;
    }
}
package com.tianju.mq.common.result;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Getter;

/**
 * Enum of result codes returned for HTTP requests
 */
// Serializing an enum as a JSON object requires getter methods
@JsonFormat(shape = JsonFormat.Shape.OBJECT)
@Getter
public enum ResultCode {
    BOOK_RUSH_SUCCESS(20010, "Book flash-sale purchase succeeded"),
    BOOK_RUSH_ERROR(3001, "Book flash-sale purchase failed"),
    LUA_SCRIPT_ERROR(3002, "Lua script operation failed"),
    USER_FIND_ERROR(40010, "Illegal request, rejected by the Bloom filter"),
    USER_FIND_SUCCESS(20010, "Username lookup succeeded"),
    USER_LOGIN_ERROR(40030, "User login failed"),
    USER_LOGIN_SUCCESS(20020, "User login succeeded"),
    RABBITMQ_SIMPLE_QUEUE_SUCCESS(60006, "Simple queue succeeded"),
    ;

    private Integer code;
    private String msg;

    private ResultCode(Integer code, String msg) {
        this.code = code;
        this.msg = msg;
    }
}
package com.tianju.mq.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan(basePackages = "com.tianju.mq")
public class ProducerApp {
public static void main(String[] args) {
SpringApplication.run(ProducerApp.class, args);
}
}
package com.tianju.mq.producer.service.impl;

import com.tianju.mq.common.constants.RabbitMqConstants;
import com.tianju.mq.producer.service.IProducerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ProducerServiceImpl implements IProducerService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void simpleQueueSend(String msg) {
        // arguments: the queue to send to, then the message
        rabbitTemplate.convertAndSend(RabbitMqConstants.MQ_SIMPLE_QUEUE, msg);
        log.debug("[Producer module] sent message to {} ----> {}", RabbitMqConstants.MQ_SIMPLE_QUEUE, msg);
    }
}
package com.tianju.mq.producer.controller;

import com.tianju.mq.common.result.HttpResp;
import com.tianju.mq.common.result.ResultCode;
import com.tianju.mq.producer.service.IProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
@RequestMapping("/api/produce")
public class ProducerController {
    @Autowired
    private IProducerService producerService;

    @RequestMapping("/simpleSend")
    public HttpResp<String> simpleSend() {
        String msg = "produce a msg";
        producerService.simpleQueueSend(msg);
        return HttpResp.results(ResultCode.RABBITMQ_SIMPLE_QUEUE_SUCCESS, new Date(), msg);
    }
}
The log is printed and the message is sent successfully.
Check it in the management console.
@RabbitListener(queues = RabbitMqConstants.MQ_SIMPLE_QUEUE)
package com.tianju.mq.consumer.service.impl;

import com.tianju.mq.common.constants.RabbitMqConstants;
import com.tianju.mq.consumer.service.IConsumerService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class ConsumerServiceImpl implements IConsumerService {
    @RabbitListener(queues = RabbitMqConstants.MQ_SIMPLE_QUEUE)
    @Override
    public void simpleQueueConsume(String msg) {
        log.debug("[Consumer module] consumed a message at {} ----> {}", new Date(), msg);
    }
}
ConsumerApp.java startup class
package com.tianju.mq.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan(basePackages = "com.tianju.mq")
public class ConsumerApp {
public static void main(String[] args) {
SpringApplication.run(ConsumerApp.class);
}
}
Check it in the management console.
SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: com.tianju.mq.producer.entity.User
package com.tianju.mq.producer.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    private Integer id;
    private String name;
    private String createBy;
}
Check it in the management console.
@RabbitListener(queues = RabbitMqConstants.MQ_SIMPLE_QUEUE)
@Override
public void simpleQueueUserConsume(User user) {
log.debug("[Consumer module] consumed an object at {} ----> {}", new Date(), user);
}
A Java-serialized object takes up a lot of space, so transferring it as a string (JSON) is recommended.
Configure this in SimpleQueueConfig.java
package com.tianju.mq.common.config;

import com.tianju.mq.common.constants.RabbitMqConstants;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SimpleQueueConfig {
    @Bean
    public Queue simpleQueue() {
        // queue name, durable flag
        return new Queue(RabbitMqConstants.MQ_SIMPLE_QUEUE, RabbitMqConstants.isDurable);
    }

    /**
     * Converts objects to JSON strings
     * @return the JSON message converter
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter()); // replace the default converter
        return rabbitTemplate;
    }
}
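As a rough illustration of why JSON is preferred over Java serialization, the sketch below measures both encodings of a small user object using only the JDK. PayloadSizeDemo and its nested User copy are hypothetical, and the JSON string is concatenated by hand (with no escaping) purely for the size comparison; in the project itself the conversion is left to Jackson2JsonMessageConverter.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class PayloadSizeDemo {
    /** Stand-in for the tutorial's User entity, with the same three fields. */
    static class User implements Serializable {
        private static final long serialVersionUID = 1L;
        final Integer id;
        final String name;
        final String createBy;

        User(Integer id, String name, String createBy) {
            this.id = id;
            this.name = name;
            this.createBy = createBy;
        }
    }

    /** Size in bytes of the object written with standard Java serialization. */
    static int javaSerializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Hand-built JSON for illustration only; it does not escape special characters. */
    static String toJson(User user) {
        return "{\"id\":" + user.id
                + ",\"name\":\"" + user.name + "\""
                + ",\"createBy\":\"" + user.createBy + "\"}";
    }

    static int jsonSize(User user) {
        return toJson(user).getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        User user = new User(1, "tom", "admin");
        System.out.println("Java serialization: " + javaSerializedSize(user) + " bytes");
        System.out.println("JSON:               " + jsonSize(user) + " bytes");
    }
}
```

The serialized form carries class names and field descriptors along with the values, which is where most of the extra bytes come from. JSON is also readable in the management console and by consumers not written in Java.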
One producer, multiple consumers
package com.tianju.mq.common.constants;
public interface RabbitMqConstants {
String MQ_SIMPLE_QUEUE="mq_simple_queue"; // simple queue name, point-to-point consumption
String MQ_WORK_QUEUE="mq_work_queue"; // work queue, one-to-many: one producer, multiple consumers
boolean isDurable = true;
}
package com.tianju.mq.common.config;

import com.tianju.mq.common.constants.RabbitMqConstants;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WorkQueueConfig {
    @Bean
    public Queue workQueue() {
        // queue name, durable flag
        return new Queue(RabbitMqConstants.MQ_WORK_QUEUE, RabbitMqConstants.isDurable);
    }

    // Note: messageConverter and rabbitTemplate use the same bean names as in SimpleQueueConfig.
    // If both configuration classes are scanned, keep these two beans in only one of them.

    /**
     * Converts objects to JSON strings
     * @return the JSON message converter
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }
}
@Override
public void workQueueUserSend(User user) {
rabbitTemplate.convertAndSend(RabbitMqConstants.MQ_WORK_QUEUE,user);
log.debug("[Producer module] sent user object to {} ----> {}", RabbitMqConstants.MQ_WORK_QUEUE, user);
}
package com.tianju.mq.consumer.service.impl;

import com.rabbitmq.client.Channel;
import com.tianju.mq.common.constants.RabbitMqConstants;
import com.tianju.mq.common.entity.User;
import com.tianju.mq.consumer.service.IConsumerService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class ConsumerServiceImpl implements IConsumerService {
    // @RabbitListener(queues = RabbitMqConstants.MQ_SIMPLE_QUEUE)
    @Override
    public void simpleQueueConsume(String msg) {
        log.debug("[Consumer module] consumed a message at {} ----> {}", new Date(), msg);
    }

    // @RabbitListener(queues = RabbitMqConstants.MQ_SIMPLE_QUEUE)
    @Override
    public void simpleQueueUserConsume(User user) {
        log.debug("[Consumer module] consumed an object at {} ----> {}", new Date(), user);
    }

    @RabbitListener(queues = RabbitMqConstants.MQ_WORK_QUEUE)
    @Override
    public void workQueueUserConsumeA(User user, Channel channel, Message message) {
        log.debug("[Consumer A] consumed an object at {} ----> {}, message properties: {}",
                new Date(), user, message.getMessageProperties());
    }

    @RabbitListener(queues = RabbitMqConstants.MQ_WORK_QUEUE)
    @Override
    public void workQueueUserConsumeB(User user, Channel channel, Message message) {
        log.debug("[Consumer B] consumed an object at {} ----> {}, message properties: {}",
                new Date(), user, message.getMessageProperties());
    }
}
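The key property of a work queue is that the listeners on one queue compete for messages, so each message is handled by exactly one of them. The following plain-JDK sketch simulates that behaviour with two worker threads sharing one in-memory queue. It does not use RabbitMQ or reproduce its dispatch order; WorkQueueDemo and the STOP marker are assumptions made for the demo.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class WorkQueueDemo {
    // Hypothetical marker that tells a worker to exit.
    private static final int STOP = -1;

    /** Queues message ids 1..messages for workers A and B; returns the ids each worker handled. */
    public static Map<String, List<Integer>> run(int messages) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Map<String, List<Integer>> handled = new LinkedHashMap<>();
        List<Thread> workers = new ArrayList<>();

        for (String name : new String[] {"A", "B"}) {
            List<Integer> mine = new ArrayList<>(); // written only by this worker until join()
            handled.put(name, mine);
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        int id = queue.take(); // both workers compete for the next message
                        if (id == STOP) {
                            return;
                        }
                        mine.add(id);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers.add(worker);
            worker.start();
        }

        for (int id = 1; id <= messages; id++) {
            queue.add(id);
        }
        for (int i = 0; i < workers.size(); i++) {
            queue.add(STOP); // one marker per worker, queued after all real messages
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        run(10).forEach((name, ids) -> System.out.println("worker " + name + " handled " + ids));
    }
}
```

How the ids split between A and B varies from run to run here, but no id ever appears in both lists, which is the guarantee the registration example relies on.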
https://gitee.com/pet365/spring-rabbitmq
@PostMapping("/register")
public HttpResp<User> register(User user){
producerService.register(user);
return HttpResp.results(ResultCode.USER_REGISTER_SUCCESS,new Date(),user);
}
@RabbitListener(queues = RabbitMqConstants.MQ_REGISTER_WORK_QUEUE)
@Override
public void registerUserConsumerA(User user, Channel channel, Message message) {
    log.debug("[Registration consumer A] consumed an object at {} ----> {}, message properties: {}",
            new Date(), user, message.getMessageProperties());
    consumerDao.insert(user);
    log.info("User {} registered successfully and saved to the database", user);
    // SMS notification, email notification
}

@RabbitListener(queues = RabbitMqConstants.MQ_REGISTER_WORK_QUEUE)
@Override
public void registerUserConsumerB(User user, Channel channel, Message message) {
    log.debug("[Registration consumer B] consumed an object at {} ----> {}, message properties: {}",
            new Date(), user, message.getMessageProperties());
    consumerDao.insert(user);
    log.info("User {} registered successfully and saved to the database", user);
    // SMS notification, email notification
}
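1. The thread-based producer-consumer model with synchronized, wait(), and notifyAll();
2. RabbitMQ is a very popular message broker;
3. Installing and configuring the Docker version of RabbitMQ;
4. Basic RabbitMQ concepts: producer, message queue, consumer;
5. Building a RabbitMQ simple queue with a multi-module project layout;
6. Transferring objects as JSON, implemented with a configuration class;
7. Work queue: one producer, multiple consumers;
8. Using a RabbitMQ work queue to handle a large volume of registrations;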