当前位置:   article > 正文

RabbitMQ入门教程:Java项目搭建与消息队列简单模式实战_java rabbitmq最快搭建

java rabbitmq最快搭建

一、项目搭建

1.idea项目搭建java项目

在这里插入图片描述
创建springboot项目,多模块的方式实现消息队列的发布与消息消费。
按照上图方式创建项目及模块:

mq-demo/
	consumer/
		gradle
		src
		.gitignore
		build.gradle
		gradlew
		gradlew.bat
		HELP.md
		settings.gradle
	publisher/
		gradle
		src
		.gitignore
		build.gradle
		gradlew
		gradlew.bat
		HELP.md
		settings.gradle
	build.gradle
	settings.gradle
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

2.引入依赖

    implementation 'org.springframework.boot:spring-boot-starter-amqp'
  • 1

3.配置文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
    virtual-host: /
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

二、MQ简单模式

在这里插入图片描述

1.publisher代码

  • RabbitMQConfig 配置文件
package com.example.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RabbitMQConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfig.class);
    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Autowired
    private RabbitProperties properties;


    // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(rabbitConnectionFactory);

        // 并发消费者数量
        containerFactory.setConcurrentConsumers(1);
        containerFactory.setMaxConcurrentConsumers(20);
        // 预加载消息数量 -- QOS
        containerFactory.setPrefetchCount(1);
        // 应答模式(此处设置为手动)
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //消息序列化方式
        containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        // 设置通知调用链 (这里设置的是重试机制的调用链)
        containerFactory.setAdviceChain(
                RetryInterceptorBuilder
                        .stateless()
                        .recoverer(new RejectAndDontRequeueRecoverer())
                        .retryOperations(rabbitRetryTemplate())
                        .build()
        );
        return containerFactory;
    }

    // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(rabbitConnectionFactory);
        //默认是用jdk序列化
        //数据转换为json存入消息队列,方便可视化界面查看消息数据
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        //此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链
        rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());
        //CorrelationData correlationData, boolean b, String s
        rabbitTemplate.setConfirmCallback(
                (correlationData, b, s) -> {
                    LOGGER.info("ConfirmCallback     相关数据:{}", correlationData);
                    LOGGER.info("ConfirmCallback     确认情况:{}", b);
                    LOGGER.info("ConfirmCallback     原因:{}", s);
                });
        rabbitTemplate.setReturnsCallback((message) -> {
            LOGGER.info("ReturnCallback:     消息:{}", message);
        });

        return rabbitTemplate;
    }

    //重试的Template
    @Bean
    public RetryTemplate rabbitRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        // 设置监听  调用重试处理过程
        retryTemplate.registerListener(new RetryListener() {
            @Override
            public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
                // 执行之前调用 (返回false时会终止执行)
                return true;
            }

            @Override
            public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                // 重试结束的时候调用 (最后一次重试 )
                LOGGER.info("---------------最后一次调用");

                return ;
            }
            @Override
            public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                //  异常 都会调用
                LOGGER.error("-----第{}次调用",retryContext.getRetryCount());
            }
        });
        retryTemplate.setBackOffPolicy(backOffPolicyByProperties());
        retryTemplate.setRetryPolicy(retryPolicyByProperties());
        return retryTemplate;
    }

    @Bean
    public ExponentialBackOffPolicy backOffPolicyByProperties() {
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds();
        long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds();
        double multiplier = properties.getListener().getSimple().getRetry().getMultiplier();
        // 重试间隔
        backOffPolicy.setInitialInterval(initialInterval * 1000);
        // 重试最大间隔
        backOffPolicy.setMaxInterval(maxInterval * 1000);
        // 重试间隔乘法策略
        backOffPolicy.setMultiplier(multiplier);
        return backOffPolicy;
    }

    @Bean
    public SimpleRetryPolicy retryPolicyByProperties() {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts();
        retryPolicy.setMaxAttempts(maxAttempts);
        return retryPolicy;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • PublisherDemo编写发送的消息内容,这个时因为已经在页面创建好队列了RabbitMQ介绍及简单操作,所以没有在代码中生成队列,交换机等。
package com.example.publisher;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class PublisherDemo {

    @Autowired
    private RabbitTemplate template;

    @Scheduled(cron="10 * * * * ? ")
    public void send() {
        String message = "Hello World!";
        this.template.convertAndSend("test-queue", message);
        System.out.println(" PublisherDemo Send '" + message + "'");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 输出
2024-01-31T14:13:10.021+08:00  INFO 26676 --- [   scheduling-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2024-01-31T14:13:10.066+08:00  INFO 26676 --- [   scheduling-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#188598ad:0/SimpleConnection@61416cb3 [delegate=amqp://admin@127.0.0.1:5672/, localPort=64067]
2024-01-31T14:13:10.090+08:00  INFO 26676 --- [   scheduling-1] com.example.publisher.RabbitMQConfig     : ---------------最后一次调用
 PublisherDemo Send 'Hello World!'
2024-01-31T14:14:10.011+08:00  INFO 26676 --- [   scheduling-1] com.example.publisher.RabbitMQConfig     : ---------------最后一次调用
 PublisherDemo Send 'Hello World!'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2.consumer代码

  • ConsumerDemo消费消息,队列要与发送消息的队列保持一致
package com.example.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerDemo {

    @RabbitListener(queues = "test-queue")
    @RabbitHandler
    public void receive(String in) {
        System.out.println(" ConsumerDemo Received '" + in + "'");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 输出:"测试queue"这个是之前在页面当中操作发送的消息;另外两个是上面代码发送的消息,都有收到。
2024-01-31T14:24:34.067+08:00  INFO 22288 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#11d4dbd6:0/SimpleConnection@277bf091 [delegate=amqp://admin@127.0.0.1:5672/, localPort=64963]
2024-01-31T14:24:34.106+08:00  INFO 22288 --- [           main] c.example.consumer.ConsumerApplication   : Started ConsumerApplication in 1.235 seconds (process running for 1.874)
 ConsumerDemo Received '测试queue'
 ConsumerDemo Received '"Hello World!"'
 ConsumerDemo Received '"Hello World!"'
  • 1
  • 2
  • 3
  • 4
  • 5

三、总结

RabbitMQ 是一个开源的消息中间件,它实现了 AMQP(高级消息队列协议)标准。作为一个消息中间件,RabbitMQ 提供了可靠的消息传递机制,用于在分布式系统中进行异步通信。

以下是 RabbitMQ 的一些关键概念和特性:

  1. 消息队列:RabbitMQ 使用消息队列来存储和转发消息。生产者将消息发送到队列,而消费者从队列中接收并处理这些消息。

  2. 发布-订阅模式:RabbitMQ 支持发布-订阅模式,其中一个生产者可以将消息发送到多个消费者。

  3. 交换机:用于接收生产者发送的消息,并根据路由规则将其路由到一个或多个队列中。

  4. 队列:消息在队列中等待被消费。每个消息都有一个关联的路由键,用于将消息路由到特定的队列。

  5. 绑定:用于将交换机和队列关联起来,定义了消息如何从交换机路由到队列。

  6. 消费者确认机制:RabbitMQ 提供了消费者确认机制,确保消息被成功消费后再从队列中移除。

  7. 持久化:RabbitMQ 允许将消息和队列进行持久化,以确保在服务器重启时消息不会丢失。

RabbitMQ 的优点包括高可靠性、灵活的路由规则、可扩展性和丰富的客户端库支持。它被广泛应用于分布式系统、微服务架构和异步任务处理等场景中,用于解耦和提高系统的可靠性和可伸缩性。

构建项目的简单步骤总结如下:

  1. docker构建rabbitmq容器,启动时设置用户名、密码、端口号
  2. 访问rabbitmq,创建交换机、队列
  3. 创建项目,引入依赖
  4. 配置文件增加容器的mq信息
  5. 编写publisher发送消息
  6. 编写consumer消费消息

注意:队列名称一定要保证消费者、生产者发送和消费的队列是同一个。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/688465
推荐阅读
相关标签
  

闽ICP备14008679号