当前位置:   article > 正文

Springboot项目整合Rabbitmq详细教程_springboot rabbitmq

springboot rabbitmq

RabbitMQ介绍

RabbitMQ是一个由Erlang语言编写的开源消息中间件,以AMQP(Advanced Message Queuing Protocol)作为消息传输协议。它可以在分布式系统中实现可靠的消息交换,并提供额外的功能来增强消息的持久化、路由与可靠性。

RabbitMQ的主要特点包括以下几个方面:

  1. 可靠性:RabbitMQ使用可靠的消息传递机制,确保消息在生产者和消费者之间的稳定传输。当从生产者发送消息到RabbitMQ时,RabbitMQ会将其存储在硬盘上,以防止意外的消息丢失。同时,在消息传输过程中,RabbitMQ会跟踪处理消息的状态,并将其发送到正确的消费者,以确保消息的可靠性和顺序传输。
  2. 灵活性:RabbitMQ支持多种消息传输模式,如点对点模式、发布/订阅模式和路由模式等。这使得开发人员可以根据实际需求选择合适的消息架构,从而实现灵活的系统设计和扩展。
  3. 扩展性:RabbitMQ具有良好的扩展性,允许用户在需要的时候动态添加或移除队列,以及增加或减少消息处理能力。这使得系统可以根据负载的变化进行自动平衡,并保持高可用性和性能。
  4. 延迟支持:RabbitMQ提供了延迟队列的支持,允许用户以一定的延迟将消息发送到指定的消费者。这对于需要处理实时数据、有时间敏感性要求的应用来说非常有用。
  5. 高可用性:RabbitMQ支持集群和镜像队列的特性,可以复制队列和消息到多个节点上,从而提高系统的可靠性和可用性。当某个节点出现故障时,其他节点仍然可以接收和处理消息,保证服务的连续性。
  6. 多语言支持:RabbitMQ提供了丰富的客户端库,支持多种编程语言,如Java、Python、Ruby、C#等,使得开发人员可以使用自己熟悉的编程语言进行操作和交互。

总之,RabbitMQ是一个功能强大、可靠性高并且易于使用的消息中间件。它可以帮助开发人员构建分布式系统、处理异步消息和实现解耦,广泛应用于各种场景中,如微服务架构、数据同步、日志处理、任务队列等。无论是企业级应用还是个人项目,都可以通过RabbitMQ来实现高效、可靠的消息传输。

配置教程

首先在springboot的pom文件里引入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

然后application.yml:

ps:里面的虚拟host配置项不是必须的,我自己在rabbitmq服务上创建了自己的虚拟host,所以我配置了;你们不创建,就不用加这个配置项。

  1. spring:
  2. application:
  3. name: 3d-gis
  4. datasource:
  5. driver-class-name: org.postgresql.Driver
  6. url: jdbc:postgresql://172.16.10.201:5432/3d_gis
  7. username: postgres
  8. password: postgres
  9. rabbitmq:
  10. publisher-confirms: true
  11. publisher-returns: true
  12. host: 172.16.10.201
  13. port: 5672
  14. username: leaniot
  15. password: leaniot
  16. virtual-host: /3d_gis
  17. listener:
  18. simple:
  19. acknowledge-mode: manual
  20. direct:
  21. acknowledge-mode: manual

新建RabbitMQ配置类

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. /**
  5. * RabbitMQ 配置
  6. *
  7. * @author tarzan
  8. * @version 1.0
  9. * @date 2021年05月18日 09:15:40
  10. * @since JDK1.8
  11. */
  12. @Configuration
  13. public class RabbitMQConfig{
  14. /** gis图形交换机 */
  15. public static final String GIS_GRAPHICS_EXCHANGE = "3d_gis_exchange";
  16. /** gis图形数据接收序列 */
  17. public static final String GIS_DATA_RECEIVE_REQUEUE = "gis_data_queue";
  18. /** gis图形数据返回(发送)序列 */
  19. public static final String GIS_DATA_SEND_QUEUE = "gis_result_queue";
  20. @Bean
  21. public Queue gisDataReceiveQueue () {
  22. return new Queue(GIS_DATA_RECEIVE_REQUEUE);
  23. }
  24. @Bean
  25. public Queue gisDataSendQueue () {
  26. return new Queue(GIS_DATA_SEND_QUEUE);
  27. }
  28. @Bean
  29. public DirectExchange directExchange() {
  30. return new DirectExchange(GIS_GRAPHICS_EXCHANGE);
  31. }
  32. @Bean
  33. public Binding receiveBinding () {
  34. return BindingBuilder.bind(gisDataReceiveQueue()).to(directExchange()).withQueueName();
  35. }
  36. @Bean
  37. public Binding sendBinding () {
  38. return BindingBuilder.bind(gisDataSendQueue()).to(directExchange()).withQueueName();
  39. }
  40. }
新建RabbitSender发送消息类
  1. import java.util.UUID;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springblade.gis.rabbitmq.confirm.RabbitReturnCallback;
  4. import org.springblade.gis.rabbitmq.confirm.RabbitConfirmCallback;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.InitializingBean;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.Resource;
  10. @Component
  11. @Slf4j
  12. public class RabbitSender implements InitializingBean {
  13. @Resource
  14. private RabbitTemplate rabbitTemplate;
  15. /**
  16. * 对外发送消息的方法
  17. * @param message 具体的消息内容
  18. * @throws Exception
  19. */
  20. public void send(Object message) {
  21. CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
  22. rabbitTemplate.convertAndSend(RabbitMQConfig.GIS_GRAPHICS_EXCHANGE, RabbitMQConfig.GIS_DATA_SEND_QUEUE, message, data);
  23. }
  24. @Override
  25. public void afterPropertiesSet () {
  26. rabbitTemplate.setConfirmCallback(new RabbitConfirmCallback());
  27. rabbitTemplate.setReturnCallback(new RabbitReturnCallback());
  28. rabbitTemplate.setMandatory(true);
  29. }
  30. }
新建RabbitReceiver接收消息类
  1. import com.alibaba.fastjson.JSONArray;
  2. import com.alibaba.fastjson.JSONObject;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springblade.gis.modules.db.service.DbDataService;
  5. import org.springframework.amqp.rabbit.annotation.*;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.stereotype.Component;
  8. import com.rabbitmq.client.Channel;
  9. import javax.annotation.Resource;
  10. import java.io.IOException;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. import java.util.Map;
  14. @Component
  15. @Slf4j
  16. public class RabbitReceiver {
  17. @Resource
  18. private DbDataService dbDataService;
  19. @Resource
  20. private RabbitSender sender;
  21. /**
  22. * 组合使用监听
  23. * @param message
  24. * @param channel
  25. */
  26. @RabbitListener(queues =RabbitMQConfig.GIS_DATA_RECEIVE_REQUEUE)
  27. public void onMessage(String msg,Message message, Channel channel) {
  28. // 1. 收到消息以后进行业务端消费处理
  29. String body=new String(message.getBody());
  30. log.info("消费消息:" + body );
  31. JSONObject json;
  32. try {
  33. json=JSONObject.parseObject(body);
  34. } catch (Exception e) {
  35. ackOrReject(message,channel,true);
  36. return;
  37. }
  38. String table= json.getString("table");
  39. JSONArray data= json.getJSONArray("data");
  40. List<Map<String,Object>> maps=new ArrayList<>();
  41. data.forEach(e->{
  42. Map<String,Object> map=(Map<String, Object>) e;
  43. maps.add(map);
  44. });
  45. List<Long> ids=dbDataService.saveBatch(table,maps);
  46. sender.send(JSONArray.toJSONString(ids));
  47. // 2. 处理成功之后 获取deliveryTag 并进行手工的ACK操作, 因为我们配置文件里配置的是 手工签收
  48. ackOrReject(message,channel,true);
  49. }
  50. private void ackOrReject(Message message, Channel channel, boolean result) {
  51. try {
  52. if (result) {
  53. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  54. } else {
  55. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  56. }
  57. }catch (IOException e){
  58. new IOException();
  59. }
  60. }
  61. }

新建RabbitMQ测试类

  1. import io.swagger.annotations.Api;
  2. import io.swagger.annotations.ApiOperation;
  3. import lombok.AllArgsConstructor;
  4. import org.springblade.core.secure.annotation.NoToken;
  5. import org.springblade.core.tool.api.R;
  6. import org.springblade.three.dimension.gis.rabbitmq.RabbitSender;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import org.springframework.web.bind.annotation.RestController;
  10. import javax.annotation.Resource;
  11. @NoToken
  12. @RestController
  13. @AllArgsConstructor
  14. @RequestMapping("rabbit")
  15. @Api(value = "消息队列", tags = "消息队列")
  16. public class RabbitController {
  17. @Resource
  18. private RabbitSender sender;
  19. @ApiOperation(value = "添加 @author Tarzan Liu")
  20. @GetMapping("test")
  21. public R test(){
  22. sender.send("测试666");
  23. return R.status(true);
  24. }
  25. }

测试结果

 

RabbitMQ应用场景

RabbitMQ是一个流行的开源消息代理(message broker)软件,它实现了高性能、可靠的消息传递机制。以下是一些常见的RabbitMQ应用场景:

  1. 异步任务处理:RabbitMQ可以用作异步任务队列,将需要处理的任务发送到消息队列中,然后由消费者进行消费和处理。这样可以有效地解耦任务的产生和执行,提高系统的并发性和可伸缩性。

  2. 分布式系统集成:当构建分布式系统时,不同的模块和组件通常需要进行消息交换和协调。RabbitMQ可以作为这些系统之间的消息中间件,实现可靠的消息传递和数据同步,简化系统之间的集成和通信。

  3. 日志收集和分发:在分布式系统和微服务架构中,日志管理是一个重要的挑战。RabbitMQ可以用来收集和分发日志消息,将日志从不同的节点发送到中央的日志存储或分析系统,方便进行故障排查和性能监控。

  4. 事件驱动架构:RabbitMQ可以用于实现事件驱动架构,通过发布-订阅模式(Publish-Subscribe)来处理系统内部和外部事件的传递。这样可以实现松耦合的系统设计,使得不同的模块可以对事件进行订阅和响应,实现高度灵活和可扩展的系统架构。

  5. 消息通知和推送:RabbitMQ可以用于实现消息通知和推送功能。当系统中的某些事件发生时,通过将消息发布到RabbitMQ中,可以及时地将通知推送给相关的用户或订阅者。这在实时通知、即时聊天等场景下非常有用。

  6. 应用解耦:通过使用RabbitMQ作为消息中间件,不同的应用程序或服务可以通过消息队列进行解耦。这样,应用程序之间的依赖关系减少,各个模块可以独立演进和扩展,提高系统的灵活性和可维护性。

需要注意的是,具体的应用场景取决于业务需求和系统设计。RabbitMQ提供了丰富的特性和灵活的配置选项,可以根据具体情况来选择合适的使用方式和部署架构。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号