
[Java Flash-Sale (Seckill) Solution] 4. Installing RabbitMQ 3.8.5 (Erlang 23, JDK 1.8, CentOS 7) and Integrating It with Spring Boot: starter, Exchange, RabbitMQConfig

RabbitMQ installation

Official tutorials (Get Started)

https://www.rabbitmq.com/getstarted.html
Routing: Receiving messages selectively
Topics: Receiving messages based on a pattern (topics)
RPC: Request/reply pattern

1. Install the dependencies online:

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

2. Install Erlang

Official download page: https://www.erlang-solutions.com/resources/download.html

2.1 Download esl-erlang_23.0.2-1_centos_7_amd64.rpm

Link: https://pan.baidu.com/s/1w3zpv8iR1E5bs7m9EogYig
Extraction code: orqm

2.2 Install Erlang

yum -y install esl-erlang_23.0.2-1_centos_7_amd64.rpm

2.3 Verify Erlang

erl
[root@seckill rabbitmq]# erl
Erlang/OTP 23 [erts-11.0.2] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [hipe]

Eshell V11.0.2  (abort with ^G)

3. Install RabbitMQ

Official download page: http://www.rabbitmq.com/download.html

3.1 Download

Link: https://pan.baidu.com/s/1MZUispijy1F0Gh-EXJUrVA
Extraction code: 5qdp

3.2 Install RabbitMQ

yum -y install rabbitmq-server-3.8.5-1.el7.noarch.rpm

3.3 Enable the management UI plugin

rabbitmq-plugins enable rabbitmq_management

3.4 Start the RabbitMQ service

systemctl start rabbitmq-server.service

3.5 Fixing an error when enabling the UI plugin

Error:

{:query, :rabbit@seckill, {:badrpc, :timeout}}

Fix: the machine's own hostname must be set and resolvable, so map it to the machine's IP address in /etc/hosts.

[root@seckill rabbitmq]# hostnamectl
   Static hostname: seckill
         Icon name: computer-vm
		 
[root@seckill rabbitmq]# vi /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.25.130 seckill
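
The node name in the error (rabbit@seckill) embeds the hostname, so a quick way to confirm the /etc/hosts entry took effect is to check that the name resolves. The following is only a minimal sketch in plain Java; the class name HostnameCheckSketch and the default host "seckill" are illustrative assumptions, not part of the original project.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: checks whether a host name can be resolved on this machine.
final class HostnameCheckSketch {

    // Returns true if the name (or literal IP address) resolves, false otherwise.
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // "seckill" is the hostname used in this article; pass another name as the first argument.
        String host = args.length > 0 ? args[0] : "seckill";
        System.out.println(host + " resolves: " + resolves(host));
    }
}
```

If the check reports false for the hostname shown by hostnamectl, the /etc/hosts entry is likely missing or misspelled.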

3.6 Check the service

systemctl status rabbitmq-server.service

3.7 Web access

By default, the guest/guest user can only log in from localhost (the local machine).

localhost:15672


3.8 Installation path (whereis rabbitmq)

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.5

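3.9 Default configuration file: allowing remote access for guest/guest

/etc/rabbitmq/rabbitmq.config
Create a rabbitmq.config file in the RabbitMQ configuration directory (default: /etc/rabbitmq).
Add the following configuration to the file (do not forget the trailing "."):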

[{rabbit, [{loopback_users, []}]}].

Restart the RabbitMQ service, then open the management UI again.

4. Spring Boot integration

4.1 Dependency

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.2 RabbitMQConfig.java (package com.example.miaosha.config)

4.2.1 Testing fanoutExchange

Broadcast: every bound queue receives the message.

4.2.2 Testing direct_Exchange with ROUTINGKEY01 (queue.red)

A message is forwarded to the queue whose binding key equals the message's routing key.
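
The difference between the fanout exchange of 4.2.1 and the direct exchange here can be sketched without a broker. The class below is a simplified in-memory model, not Spring AMQP or RabbitMQ code; the class and method names are assumptions made for illustration, while the queue names and routing keys are the ones used in this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of exchange bindings (queue name -> binding key).
final class ExchangeRoutingSketch {

    private final Map<String, String> bindings = new LinkedHashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.put(queue, bindingKey);
    }

    // Fanout: the routing key is ignored, every bound queue gets a copy.
    List<String> routeFanout() {
        return new ArrayList<>(bindings.keySet());
    }

    // Direct: only queues whose binding key equals the routing key get the message.
    List<String> routeDirect(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> e : bindings.entrySet()) {
            if (e.getValue().equals(routingKey)) {
                targets.add(e.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        ExchangeRoutingSketch exchange = new ExchangeRoutingSketch();
        exchange.bind("direct_queue01", "queue.red");
        exchange.bind("direct_queue02", "queue.green");
        System.out.println("direct queue.red -> " + exchange.routeDirect("queue.red"));
        System.out.println("fanout           -> " + exchange.routeFanout());
    }
}
```

In this model a message sent with queue.red reaches only direct_queue01, whereas a fanout delivery reaches both queues regardless of the key.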

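4.2.3 Testing topic_Exchange with TOPICROUTINGKEY01 ("#.queue.#") and TOPICROUTINGKEY02 ("*.queue.*")

Messages are routed by pattern using the wildcards * and #: * matches exactly one word, and # matches zero or more words.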
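
To make the wildcard rules concrete, here is a minimal plain-Java sketch of topic matching, where * stands for exactly one dot-separated word and # for zero or more words. It is an illustration of the rule only, not the broker's implementation; the class name TopicPatternSketch is an assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical matcher for topic binding patterns ('*' = one word, '#' = zero or more words).
final class TopicPatternSketch {

    static boolean matches(String pattern, String routingKey) {
        return match(split(pattern), 0, split(routingKey), 0);
    }

    // Splits "a.b.c" into words; an empty string has no words.
    private static List<String> split(String s) {
        if (s.isEmpty()) {
            return Collections.<String>emptyList();
        }
        return Arrays.asList(s.split("\\.", -1));
    }

    private static boolean match(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            return j == k.size();          // pattern used up: key must be used up too
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' may swallow zero, one, or many words of the key
            for (int next = j; next <= k.size(); next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false;                  // pattern still needs a word, key has none left
        }
        if (word.equals("*") || word.equals(k.get(j))) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"aaa.queue.bbb", "queue.red.message"};
        for (String key : keys) {
            System.out.println(key + " vs #.queue.#: " + matches("#.queue.#", key));
            System.out.println(key + " vs *.queue.*: " + matches("*.queue.*", key));
        }
    }
}
```

With the routing keys used later in MQSender, aaa.queue.bbb satisfies both patterns, while queue.red.message satisfies only "#.queue.#" because "*.queue.*" requires exactly one word before queue.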

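4.2.4 Testing head_Exchange

The header values in the binding's map are matched against the message headers; with whereAll(map).match(), every entry in the map must match.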
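
The all-entries-must-match rule can be sketched in plain Java as follows. This models the idea only and is not how the broker is implemented; the class and method names are assumptions, and matchesAny is included purely as a contrast to show what an "any entry" rule would look like.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of headers-exchange matching against a binding map.
final class HeadersMatchSketch {

    // "all" semantics: every binding entry must be present in the headers with an equal value.
    static boolean matchesAll(Map<String, Object> binding, Map<String, Object> headers) {
        for (Map.Entry<String, Object> e : binding.entrySet()) {
            if (!headers.containsKey(e.getKey())
                    || !Objects.equals(e.getValue(), headers.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // "any" semantics: one equal entry is enough.
    static boolean matchesAny(Map<String, Object> binding, Map<String, Object> headers) {
        for (Map.Entry<String, Object> e : binding.entrySet()) {
            if (headers.containsKey(e.getKey())
                    && Objects.equals(e.getValue(), headers.get(e.getKey()))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();   // binding of head_queue01 in this article
        binding.put("color", "red");
        binding.put("speed", "low");

        Map<String, Object> headers = new HashMap<>();   // headers of a "red fast" message
        headers.put("color", "red");
        headers.put("speed", "fast");

        System.out.println("all: " + matchesAll(binding, headers));
        System.out.println("any: " + matchesAny(binding, headers));
    }
}
```

Under the "all" rule a message with color=red and speed=fast does not satisfy the head_queue01 binding (color=red, speed=low), which is why the two header test messages in this article end up in different queues.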

4.2.5 RabbitMQConfigTest.java (package com.example.miaosha.config)

package com.example.miaosha.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfigTest {
    private static final  String QUEUE01="queue_fanout01";
    private static final  String QUEUE02="queue_fanout02";
    private static final  String EXCHANGE="fanoutExchange";

    private static final  String DIRCTQUEUE01="direct_queue01";
    private static final  String DIRCTQUEUE02="direct_queue02";
    private static final  String DIRCTEXCHANGE="direct_Exchange";
    private static final  String ROUTINGKEY01="queue.red";
    private static final  String ROUTINGKEY02="queue.green";

    private static final  String TOPICQUEUE01="topic_queue01";
    private static final  String TOPICQUEUE02="topic_queue02";
    private static final  String TOPICEXCHANGE="topic_Exchange";
    private static final  String TOPICROUTINGKEY01="#.queue.#";
    private static final  String TOPICROUTINGKEY02="*.queue.*";

    private static final  String HEADQUEUE01="head_queue01";
    private static final  String HEADQUEUE02="head_queue02";
    private static final  String HEADEXCHANGE="head_Exchange";


    @Bean
    // Queue for the simple producer/consumer test
    public Queue queue(){
        return new Queue("q1",true);
    }

    @Bean
    public Queue queue01(){
        return new Queue(QUEUE01);
    }

    @Bean
    public Queue queue02(){
        return new Queue(QUEUE02);
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(EXCHANGE);
    }

    @Bean
    public Binding Binding01(){
        return BindingBuilder.bind(queue01()).to(fanoutExchange());
    }

    @Bean
    public Binding Binding02(){
        return BindingBuilder.bind(queue02()).to(fanoutExchange());
    }


    @Bean
    public Queue directQueue01(){
        return new Queue(DIRCTQUEUE01);
    }

    @Bean
    public Queue directQueue02(){
        return new Queue(DIRCTQUEUE02);
    }

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(DIRCTEXCHANGE);
    }

    @Bean
    public Binding directBinding01(){
        return BindingBuilder.bind(directQueue01()).to(directExchange()).with(ROUTINGKEY01);
    }

    @Bean
    public Binding directBinding02(){
        return BindingBuilder.bind(directQueue02()).to(directExchange()).with(ROUTINGKEY02);
    }


    @Bean
    public Queue topicQueue01(){
        return new Queue(TOPICQUEUE01);
    }

    @Bean
    public Queue topicQueue02(){
        return new Queue(TOPICQUEUE02);
    }

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(TOPICEXCHANGE);
    }

    @Bean
    public Binding topicBinding01(){
        return BindingBuilder.bind(topicQueue01()).to(topicExchange()).with(TOPICROUTINGKEY01);
    }

    @Bean
    public Binding topicBinding02(){
        return BindingBuilder.bind(topicQueue02()).to(topicExchange()).with(TOPICROUTINGKEY02);
    }


    @Bean
    public Queue headQueue01(){
        return new Queue(HEADQUEUE01);
    }

    @Bean
    public Queue headQueue02(){
        return new Queue(HEADQUEUE02);
    }

    @Bean
    public HeadersExchange headExchange(){
        return new HeadersExchange(HEADEXCHANGE);
    }

    @Bean
    public Binding headBinding01(){
        Map<String,Object> map=new HashMap<>();
        map.put("color","red");
        map.put("speed","low");
        return BindingBuilder.bind(headQueue01()).to(headExchange()).whereAll(map).match();
    }

    @Bean
    public Binding headBinding02(){
        Map<String,Object> map=new HashMap<>();
        map.put("color","red");
        map.put("speed","fast");
        return BindingBuilder.bind(headQueue02()).to(headExchange()).whereAll(map).match();
    }


}
4.2.6 RabbitMQConfig.java (package com.example.miaosha.config)

The TopicExchange configuration actually used by the flash sale.

package com.example.miaosha.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

    private static final  String QUEUE="seckillQueue";
    private static final  String EXCHANGE="seckillExchange";
    private static final  String ROUTINGKEY="seckill.#";



    @Bean
    public Queue seckillQueue(){
        return new Queue(QUEUE);
    }


    @Bean
    public TopicExchange seckillExchange(){
        return new TopicExchange(EXCHANGE);
    }

    @Bean
    public Binding binding(){
        return BindingBuilder.bind(seckillQueue()).to(seckillExchange()).with(ROUTINGKEY);
    }


}


4.3 MQReceiver.java (package com.example.miaosha.rabbitmq)

The listener is bound to the queue with @RabbitListener(queues = "seckillQueue").
fastjson converts the String message into a SeckillMessage object.

package com.example.miaosha.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.example.miaosha.pojo.SeckillMessage;
import com.example.miaosha.pojo.SeckillOrder;
import com.example.miaosha.pojo.User;
import com.example.miaosha.service.IGoodsService;
import com.example.miaosha.service.IOrderService;
import com.example.miaosha.utils.JsonUtil;
import com.example.miaosha.vo.GoodsVo;
import com.example.miaosha.vo.RespBean;
import com.example.miaosha.vo.RespBeanEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestMapping;

@Service
@Slf4j
public class MQReceiver {

    @Autowired
    private IGoodsService goodsService;
    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private IOrderService orderService;

    @RabbitListener(queues = "seckillQueue")
    public void receive(String message){
        log.info("seckillQueue received message: "+message);
        SeckillMessage seckillMessage = JSON.parseObject(message, SeckillMessage.class);
        Long goodsId = seckillMessage.getGoodsId();
        User user = seckillMessage.getUser();
        // Look up the goods and check the remaining stock
        GoodsVo goodsVo = goodsService.findGoodsVoById(goodsId);
        if(goodsVo.getStockCount()<1){
            // Out of stock: set the flag key in Redis (its presence means "no stock") and stop
            redisTemplate.opsForValue().set("isStockEmpty:"+goodsId,"0");
            return;
        }

        // 1. Check whether this user has already bought this item
        ValueOperations valueOperations = redisTemplate.opsForValue();
        SeckillOrder seckillOrder = (SeckillOrder)valueOperations.get("order:" + user.getId() + ":" + goodsId);

        if(null!=seckillOrder){
            // Duplicate purchase: ignore the message
            return;
        }
        // 2. Place the order
        orderService.seckill(goodsVo,user);

    }


    /*
    @RabbitListener(queues = "q1")
    public void receive(Object msg){
        log.info("received message: "+msg);
    }

    @RabbitListener(queues = "queue_fanout01")
    public void receive1(Object msg){
        log.info("queue_fanout01 received message: "+msg);
    }

    @RabbitListener(queues = "queue_fanout02")
    public void receive2(Object msg){
        log.info("queue_fanout02 received message: "+msg);
    }

    @RabbitListener(queues = "direct_queue01")
    public void receive3(Object msg){
        log.info("direct_queue01 received message: "+msg);
    }

    @RabbitListener(queues = "direct_queue02")
    public void receive4(Object msg){
        log.info("direct_queue02 received message: "+msg);
    }

    @RabbitListener(queues = "topic_queue01")
    public void receive5(Message msg){
        log.info("topic_queue01 received message: "+msg);
    }

    @RabbitListener(queues = "topic_queue02")
    public void receive6(Message msg){
        log.info("topic_queue02 received message: "+msg);
    }


    @RabbitListener(queues = "head_queue01")
    public void receive7(Message msg){
        log.info("head_queue01 received message: "+new String(msg.getBody()));
    }
    @RabbitListener(queues = "head_queue02")
    public void receive8(Message msg){
        log.info("head_queue02 received message: "+new String(msg.getBody()));
    }
    */

}
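
The order of checks in receive (stock first, then duplicate purchase, then the order itself) can be tried out with a small in-memory model. This is a single-threaded sketch under stated assumptions: plain collections stand in for goodsService, Redis and orderService, and the class and method names are hypothetical, so it says nothing about concurrency or about how orderService.seckill works internally.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for the consumer logic; not the project's real services.
final class SeckillConsumerSketch {

    private final Map<Long, Integer> stock = new HashMap<>();      // goodsId -> remaining stock
    private final Set<String> orderKeys = new HashSet<>();         // mimics "order:{userId}:{goodsId}" keys
    private final Set<Long> stockEmptyFlags = new HashSet<>();     // mimics "isStockEmpty:{goodsId}" keys

    void setStock(long goodsId, int count) {
        stock.put(goodsId, count);
    }

    int remaining(long goodsId) {
        return stock.getOrDefault(goodsId, 0);
    }

    boolean isStockEmpty(long goodsId) {
        return stockEmptyFlags.contains(goodsId);
    }

    // Returns true only if an order was created for this message.
    boolean handle(long userId, long goodsId) {
        int left = remaining(goodsId);
        if (left < 1) {
            stockEmptyFlags.add(goodsId);   // remember that the item is sold out
            return false;
        }
        String key = "order:" + userId + ":" + goodsId;
        if (orderKeys.contains(key)) {
            return false;                   // same user, same item: duplicate purchase
        }
        stock.put(goodsId, left - 1);       // stands in for the real order creation
        orderKeys.add(key);
        return true;
    }

    public static void main(String[] args) {
        SeckillConsumerSketch consumer = new SeckillConsumerSketch();
        consumer.setStock(1L, 5);
        System.out.println("first attempt:  " + consumer.handle(7L, 1L));
        System.out.println("second attempt: " + consumer.handle(7L, 1L));
    }
}
```

Handling the same (user, goods) message twice creates only one order in this model, which is the property the duplicate check in receive is meant to provide.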


4.4 MQSender.java (package com.example.miaosha.rabbitmq)

Inject rabbitTemplate and send with the following values:
exchange name: seckillExchange
routing key: seckill.message
message: a String
rabbitTemplate.convertAndSend("seckillExchange", "seckill.message", message);

package com.example.miaosha.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendSeckillMessage(String message){
        log.info("sendSeckill sending message: "+message);
        rabbitTemplate.convertAndSend("seckillExchange","seckill.message",message);
    }

    /*

    public void send(Object msg){
        log.info("sending message: "+msg);
        rabbitTemplate.convertAndSend("q1",msg);
    }

    public void sendFanout(Object msg){
        log.info("fanout sending message: "+msg);
        rabbitTemplate.convertAndSend("fanoutExchange","",msg);
    }

    public void sendDirect01(Object msg){
        log.info("directExchange sending message: "+msg);
        rabbitTemplate.convertAndSend("direct_Exchange","queue.red",msg);
    }

    public void sendDirect02(Object msg){
        log.info("directExchange sending message: "+msg);
        rabbitTemplate.convertAndSend("direct_Exchange","queue.green",msg);
    }

    public void sendTopic01(Object msg){
        log.info("topic_Exchange sending message: "+msg);
        rabbitTemplate.convertAndSend("topic_Exchange","aaa.queue.bbb",msg);
    }

    public void sendTopic02(Object msg){
        log.info("topic_Exchange sending message: "+msg);
        rabbitTemplate.convertAndSend("topic_Exchange","queue.red.message",msg);
    }

    public void sendHead01(String msg){
        log.info("head_Exchange sending message: "+msg);
        MessageProperties properties=new MessageProperties();
        properties.setHeader("color","red");
        properties.setHeader("speed","low");
        Message message=new Message(msg.getBytes(),properties);
        rabbitTemplate.convertAndSend("head_Exchange","",message);
    }

    public void sendHead02(String msg){
        log.info("head_Exchange sending message: "+msg);
        MessageProperties properties=new MessageProperties();
        properties.setHeader("color","red");
        properties.setHeader("speed","fast");
        Message message=new Message(msg.getBytes(),properties);
        rabbitTemplate.convertAndSend("head_Exchange","",message);
    }
    */


}


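4.5 SeckillController sends a SeckillMessage

// Excerpt: imports, other fields and the afterPropertiesSet() method required by InitializingBean are omitted.
@Controller
@Slf4j
@RequestMapping("/seckill")
public class SeckillController implements InitializingBean {
    @Autowired
    private MQSender mqSender;

    // Static flash-sale page: the goods detail page sends the flash-sale request via ajax
    // and redirects to the static success page once it succeeds
    @RequestMapping(value = "/{path}/doSeckill",method = RequestMethod.POST)
    @ResponseBody
    public RespBean doSeckill(Model model, User user,Long goodsId,@PathVariable String path) {
        // Place the order asynchronously: publish the request to RabbitMQ
        SeckillMessage seckillMessage = new SeckillMessage(user, goodsId);

        mqSender.sendSeckillMessage(JSON.toJSONString(seckillMessage));

        // 0 = queued, the order is still being processed
        return RespBean.success(0);
    }
}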
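
The point of returning 0 right away is that the request is only queued; the order is created later by the consumer. A broker-free sketch of that hand-off is shown below, assuming a java.util.concurrent queue in place of RabbitMQ and a plain String in place of the JSON message. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of "enqueue now, process later"; a BlockingQueue stands in for RabbitMQ.
final class AsyncOrderSketch {

    static final int QUEUING = 0;   // same meaning as the 0 returned by doSeckill

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> processed = new ArrayList<>();

    // Controller side: publish the request and answer immediately.
    int doSeckill(long userId, long goodsId) {
        queue.offer(userId + ":" + goodsId);
        return QUEUING;
    }

    // Consumer side: handle everything that is waiting; returns the number of messages handled.
    int drain() {
        int handled = 0;
        String message;
        while ((message = queue.poll()) != null) {
            processed.add(message);   // stands in for the real order processing
            handled++;
        }
        return handled;
    }

    int processedCount() {
        return processed.size();
    }

    public static void main(String[] args) {
        AsyncOrderSketch sketch = new AsyncOrderSketch();
        int status = sketch.doSeckill(7L, 1L);
        System.out.println("status=" + status + ", processed so far=" + sketch.processedCount());
        sketch.drain();
        System.out.println("processed after drain=" + sketch.processedCount());
    }
}
```

The design choice this illustrates is that the web request does no order processing itself, so its response time does not depend on how long order creation takes.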

4.6 SeckillMessage.java (package com.example.miaosha.pojo)

The message object.

package com.example.miaosha.pojo;

import com.example.miaosha.vo.GoodsVo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class SeckillMessage {
    private User user;
    private Long goodsId;
}


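4.7 Message-sending controller: UserController (/user/mq/topic01)

The /mq endpoints call the test methods that are commented out in MQSender (section 4.4); uncomment them there before using this controller.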

package com.example.miaosha.controller;


import com.example.miaosha.pojo.User;
import com.example.miaosha.rabbitmq.MQSender;
import com.example.miaosha.vo.RespBean;
import com.example.miaosha.vo.RespBeanEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * <p>
 *  Front-end controller
 * </p>
 *
 * @author cch
 * @since 2021-11-14
 */
@Controller
@RequestMapping("/user")
public class UserController {
    @Autowired
    private MQSender mqSender;


    @RequestMapping("/info")
    @ResponseBody
    public RespBean info(User user){
        return RespBean.success(user);
    }
    
    @RequestMapping("/mq")
    @ResponseBody
    public void mq(){
        mqSender.send("hello");
    }

    @RequestMapping("/mq/fanout")
    @ResponseBody
    public void fanout(){
        mqSender.sendFanout("hello");
    }

    @RequestMapping("/mq/direct01")
    @ResponseBody
    public void direct01(){
        mqSender.sendDirect01("hello");
    }

    @RequestMapping("/mq/direct02")
    @ResponseBody
    public void direct02(){
        mqSender.sendDirect02("hello");
    }

    @RequestMapping("/mq/topic01")
    @ResponseBody
    public void topic01(){
        mqSender.sendTopic01("hello");
    }


    @RequestMapping("/mq/topic02")
    @ResponseBody
    public void topic02(){
        mqSender.sendTopic02("hello");
    }

    @RequestMapping("/mq/header01")
    @ResponseBody
    public void head01(){
        mqSender.sendHead01("hello red low");
    }

    @RequestMapping("/mq/header02")
    @ResponseBody
    public void head02(){
        mqSender.sendHead02("hello red fast");
    }


}
