赞
踩
RabbitMQ是一个由Erlang语言编写的开源消息中间件,以AMQP(Advanced Message Queuing Protocol)作为消息传输协议。它可以在分布式系统中实现可靠的消息交换,并提供额外的功能来增强消息的持久化、路由与可靠性。
RabbitMQ的主要特点包括以下几个方面:
总之,RabbitMQ是一个功能强大、可靠性高并且易于使用的消息中间件。它可以帮助开发人员构建分布式系统、处理异步消息和实现解耦,广泛应用于各种场景中,如微服务架构、数据同步、日志处理、任务队列等。无论是企业级应用还是个人项目,都可以通过RabbitMQ来实现高效、可靠的消息传输。
首先在springboot的pom文件里引入依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
然后application.yml:
ps:里面的虚拟host配置项不是必须的,我自己在rabbitmq服务上创建了自己的虚拟host,所以我配置了;你们不创建,就不用加这个配置项。
spring: application: name: 3d-gis datasource: driver-class-name: org.postgresql.Driver url: jdbc:postgresql://172.16.10.201:5432/3d_gis username: postgres password: postgres rabbitmq: publisher-confirms: true publisher-returns: true host: 172.16.10.201 port: 5672 username: leaniot password: leaniot virtual-host: /3d_gis listener: simple: acknowledge-mode: manual direct: acknowledge-mode: manual
新建RabbitMQ配置类
-
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
-
-
- /**
- * RabbitMQ 配置
- *
- * @author tarzan
- * @version 1.0
- * @date 2021年05月18日 09:15:40
- * @since JDK1.8
- */
- @Configuration
- public class RabbitMQConfig{
-
- /** gis图形交换机 */
- public static final String GIS_GRAPHICS_EXCHANGE = "3d_gis_exchange";
- /** gis图形数据接收序列 */
- public static final String GIS_DATA_RECEIVE_REQUEUE = "gis_data_queue";
- /** gis图形数据返回(发送)序列 */
- public static final String GIS_DATA_SEND_QUEUE = "gis_result_queue";
-
- @Bean
- public Queue gisDataReceiveQueue () {
- return new Queue(GIS_DATA_RECEIVE_REQUEUE);
- }
-
- @Bean
- public Queue gisDataSendQueue () {
- return new Queue(GIS_DATA_SEND_QUEUE);
- }
-
-
- @Bean
- public DirectExchange directExchange() {
- return new DirectExchange(GIS_GRAPHICS_EXCHANGE);
- }
-
- @Bean
- public Binding receiveBinding () {
- return BindingBuilder.bind(gisDataReceiveQueue()).to(directExchange()).withQueueName();
- }
-
- @Bean
- public Binding sendBinding () {
- return BindingBuilder.bind(gisDataSendQueue()).to(directExchange()).withQueueName();
- }
-
-
-
- }
新建RabbitSender发送消息类
-
- import java.util.UUID;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springblade.gis.rabbitmq.confirm.RabbitReturnCallback;
- import org.springblade.gis.rabbitmq.confirm.RabbitConfirmCallback;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
-
- @Component
- @Slf4j
- public class RabbitSender implements InitializingBean {
-
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- /**
- * 对外发送消息的方法
- * @param message 具体的消息内容
- * @throws Exception
- */
- public void send(Object message) {
- CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
- rabbitTemplate.convertAndSend(RabbitMQConfig.GIS_GRAPHICS_EXCHANGE, RabbitMQConfig.GIS_DATA_SEND_QUEUE, message, data);
- }
-
- @Override
- public void afterPropertiesSet () {
- rabbitTemplate.setConfirmCallback(new RabbitConfirmCallback());
- rabbitTemplate.setReturnCallback(new RabbitReturnCallback());
- rabbitTemplate.setMandatory(true);
- }
- }
新建RabbitReceiver接收消息类
-
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import lombok.extern.slf4j.Slf4j;
- import org.springblade.gis.modules.db.service.DbDataService;
- import org.springframework.amqp.rabbit.annotation.*;
- import org.springframework.amqp.core.Message;
- import org.springframework.stereotype.Component;
- import com.rabbitmq.client.Channel;
-
- import javax.annotation.Resource;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
-
- @Component
- @Slf4j
- public class RabbitReceiver {
-
- @Resource
- private DbDataService dbDataService;
- @Resource
- private RabbitSender sender;
-
-
-
- /**
- * 组合使用监听
- * @param message
- * @param channel
- */
- @RabbitListener(queues =RabbitMQConfig.GIS_DATA_RECEIVE_REQUEUE)
- public void onMessage(String msg,Message message, Channel channel) {
- // 1. 收到消息以后进行业务端消费处理
- String body=new String(message.getBody());
- log.info("消费消息:" + body );
- JSONObject json;
- try {
- json=JSONObject.parseObject(body);
- } catch (Exception e) {
- ackOrReject(message,channel,true);
- return;
- }
- String table= json.getString("table");
- JSONArray data= json.getJSONArray("data");
- List<Map<String,Object>> maps=new ArrayList<>();
- data.forEach(e->{
- Map<String,Object> map=(Map<String, Object>) e;
- maps.add(map);
- });
- List<Long> ids=dbDataService.saveBatch(table,maps);
- sender.send(JSONArray.toJSONString(ids));
- // 2. 处理成功之后 获取deliveryTag 并进行手工的ACK操作, 因为我们配置文件里配置的是 手工签收
- ackOrReject(message,channel,true);
- }
-
- private void ackOrReject(Message message, Channel channel, boolean result) {
- try {
- if (result) {
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- } else {
- channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
- }
- }catch (IOException e){
- new IOException();
- }
- }
-
-
- }
新建RabbitMQ测试类
-
-
- import io.swagger.annotations.Api;
- import io.swagger.annotations.ApiOperation;
- import lombok.AllArgsConstructor;
- import org.springblade.core.secure.annotation.NoToken;
- import org.springblade.core.tool.api.R;
- import org.springblade.three.dimension.gis.rabbitmq.RabbitSender;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
-
- @NoToken
- @RestController
- @AllArgsConstructor
- @RequestMapping("rabbit")
- @Api(value = "消息队列", tags = "消息队列")
- public class RabbitController {
- @Resource
- private RabbitSender sender;
-
-
- @ApiOperation(value = "添加 @author Tarzan Liu")
- @GetMapping("test")
- public R test(){
- sender.send("测试666");
- return R.status(true);
- }
- }
测试结果
RabbitMQ是一个流行的开源消息代理(message broker)软件,它实现了高性能、可靠的消息传递机制。以下是一些常见的RabbitMQ应用场景:
异步任务处理:RabbitMQ可以用作异步任务队列,将需要处理的任务发送到消息队列中,然后由消费者进行消费和处理。这样可以有效地解耦任务的产生和执行,提高系统的并发性和可伸缩性。
分布式系统集成:当构建分布式系统时,不同的模块和组件通常需要进行消息交换和协调。RabbitMQ可以作为这些系统之间的消息中间件,实现可靠的消息传递和数据同步,简化系统之间的集成和通信。
日志收集和分发:在分布式系统和微服务架构中,日志管理是一个重要的挑战。RabbitMQ可以用来收集和分发日志消息,将日志从不同的节点发送到中央的日志存储或分析系统,方便进行故障排查和性能监控。
事件驱动架构:RabbitMQ可以用于实现事件驱动架构,通过发布-订阅模式(Publish-Subscribe)来处理系统内部和外部事件的传递。这样可以实现松耦合的系统设计,使得不同的模块可以对事件进行订阅和响应,实现高度灵活和可扩展的系统架构。
消息通知和推送:RabbitMQ可以用于实现消息通知和推送功能。当系统中的某些事件发生时,通过将消息发布到RabbitMQ中,可以及时地将通知推送给相关的用户或订阅者。这在实时通知、即时聊天等场景下非常有用。
应用解耦:通过使用RabbitMQ作为消息中间件,不同的应用程序或服务可以通过消息队列进行解耦。这样,应用程序之间的依赖关系减少,各个模块可以独立演进和扩展,提高系统的灵活性和可维护性。
需要注意的是,具体的应用场景取决于业务需求和系统设计。RabbitMQ提供了丰富的特性和灵活的配置选项,可以根据具体情况来选择合适的使用方式和部署架构。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。