当前位置:   article > 正文

RabbitMQ基本队列 交易性能优化_java rabbitmq cachingconnectionfactory

java rabbitmq cachingconnectionfactory

六、交易性能优化

缓存库存所引起的事务不一致的问题, 使用了异步化的事务型消息解决了最终一致性的问题。 同时引入库存售馨这样的方案解决过载击穿的问题。

交易系统性能瓶颈

  1. jemter压测
  2. 交易验证完全依赖数据库
  3. 库存行锁
  4. 后置处理器

方案一:

  1. 活动发布同步库存缓存
  2. 下单交易减缓存库存

异步同步数据库:

(1) 活动发布同步缓存库存

(2) 下单交易减缓存库存

(3) 异步消息扣减数据库存(消息队列 mq 技术 )

MQ概述:

应用场景:

  1. 削峰填谷
  2. 系统解耦
  3. 提示性能
  4. 蓄流压测

RabbitMq

简介:

消息队列现在逐渐成为互联网企业的核心技术手段, 它由低耦合, 可靠的传递。 广播。 流量控制, 最终一致性等一系列功能。

Mq 老牌 消息中间件

ActiveMQ: 优点 安装方便遵循了jms规范。 缺点 有可能会丢失数据。 5.0xxx版本不玩 AMQP

RabbitMQ: 优点 使用erlang 天生并发性。 最初是在金融行业由稳定的和安全的保证 缺点: erlang 语言难度系数比较大 。 不支持动态拓展。

RcketMQ:优点 节点简单易用,

kafka: 依赖与zookeeper .可以动态的拓展节点。 高可用,高吞吐量 , 无限扩容, 消息的可指定追溯 ,严格顺序机制,不支持标准的消息协议。 不利于平台迁移。

总结:

​ 一般业务 系统是不是要用mq, 最早的我们都知道是activemq,但是现在用的不多了。 并且社区不维护 。。没有经过大规模吞吐量的测试和验证。

后来我们大家就开始Rabbitmq, 确实erlang语言阻止了大量的java工程师去深入研究和掌握。

不过现在用的越来越多的公司。 是RocketMQ,确实不错。有可能社会会黄。。。

我们在一个中小型公司,技术实力一般,技术挑战不高, 我们就用rabitmq,

如果我们是一个大型公司有基础框架研发实力,那就用rocketmq

如果是大数据领域,实时计算 。 日志采集 系统等场景。用标准kafka

项目启动时 初始化 库存 到redis
实现接口 重写方法 implements InitializingBean

 @Override
    public void afterPropertiesSet() throws Exception {
        //查数据库
        List<MiaoshaGoods> all = miaosha.findAll();
        for (MiaoshaGoods miaoshaGoods : all) {
            //添加到redis缓存miaoshakucun:+id格式
            stringRedisTemplate.opsForValue().set(MiaoShaGoodKey.MiaoshaGoodskucun.getPrefix() + miaoshaGoods.getId(), String.valueOf(miaoshaGoods.getStockCount()));
            localmap.put(miaoshaGoods.getGoodsId(), false);
        }
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

简单队列

连接代码封装:

package com.etc.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author kalista
 * @Description
 * @Date 2020/8/6  14:10
 **/
public class MQConnectionUtils {



    public static Connection newConnection() throws IOException, TimeoutException {
        // 1. 定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置rabbitmq 服务
        factory.setHost("127.0.0.1");
        // 3. 设置协议端口号
        factory.setPort(5672);
        // 4. 设置用户名
        factory.setUsername("wcc");
        // 5. 设置密码
        factory.setPassword("wcc");
        // 6. 设置vhost
        factory.setVirtualHost("/test");

        Connection connection = factory.newConnection();

        return connection;
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

消息生产者:

package com.etc.utils.simplequeue;

import com.etc.utils.MQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author kalista
 * @Description 生产者
 * @Date 2020/8/6  14:16
 **/
public class Producer {

    private static final String QUEUE_NAME = "test_queue";


    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 获得连接
        Connection connection = MQConnectionUtils.newConnection();
        // 2. 创建通道
        Channel channel = connection.createChannel();
        // 3. 创建队列声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 4. 发送消息
        String msg = "我是生产者生产得消息";
        System.out.println(msg);
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        channel.close();
        // 5. 关闭队列
        connection.close();
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

消息消费者:

package com.etc.utils.simplequeue;

import com.etc.utils.MQConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author kalista
 * @Description 消费者
 * @Date 2020/8/6  14:25
 **/
public class Customer {

    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("我是消费者");
        // 1. 获取连接
        Connection connection = MQConnectionUtils.newConnection();
        // 2. 获取通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);


        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body,"utf-8");
                System.out.println("消费者获取到得消息:" + msg);
            }
        };

        // 3. 监听队列
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

总结:

简单队列也称为点对点。 即一个生产者对应一个消费者。生产者发送消息到队列,消费者消费消息出队列。

工作队列

用来将耗时得任务进行分发给多个消费者;

主要解决的这种问题: 处理资源密集任务。 并且还要等他完成。

工作队列也叫 公平性队列 。 怎么个说法?

循环分发,假如我们有2个消费者默认情况下。rabbitmq 将按照顺序将每条消息发送给下一个消费者。 平均而言。 每个消费者将获得相同数量得消息。 这个分发得消息我们叫 轮询。

公平分发

 final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);
  • 1
  • 2
  • 3

消息持久化

参数配置一 生产者创建队列声明时,修改第二个参数

 channel.queueDeclare(QUEUE_NAME,true,false,false,null);
  • 1

参数配置二: 生产者发送消息时。 修改第三个参数

for (int i = 0; i <= 50 ; i++) {
            String msg = "我是生产者生产得消息" + i;
            System.out.println(msg);
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
        }
  • 1
  • 2
  • 3
  • 4
  • 5

工作队列总结:

  1. 循环发送: 消费者端在信道打开一个消息应答机制。 并确保返回接受到消息得确认信息,这样可以保证消费者发生故障也不会丢失信息。
  2. 消息持久化:服务器和客户端都要指定队列持久化信息和信息持久化。 保证在重启rabbitmq 队列 消息不会丢失。
  3. 公平发放:指定得消费者接受得消息个数。 避免不限消息均匀推送资源不合理得问题。
发布订阅模式

总结:

相对于工作模式。 发布订阅引入交换机得概念,相对于类型上更加广泛一些。

  1. 生产者不是直接操作队列。 数据不是发给队列得。 有交换机将数据发送给之绑定得队列。

  2. 必须声明交换机 并且声明交换机得类型channel.exchangeDeclare(EXCHANGENAME,“fanout”);

    * type类型
    *  1. direct : 直连得方式
    *  2. fanout:  广播方式
    *  3. headers:  请求头方式
    *  4. topic:  主题方式
    
    • 1
    • 2
    • 3
    • 4
    • 5
  3. 队列必须要绑定交换机

   channel.queueBind(QUEUE_NAME,EXCHANGENAME,"");
  • 1
路由模式

路由模式和发布订阅得模式类似,在订阅得模型得基础之上添加了类型,订阅模式是分发到所有绑定得交换机得队列。

生产者

package com.etc.utils.routing;

import com.etc.utils.MQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author kalista
 * @Description
 * @Date 2020/8/8  8:54
 **/
public class ProdecerRouting {


    private static final String EXCHANGE_NAME = "my_fanout_exchange";


    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. 获得连接
        Connection connection = MQConnectionUtils.newConnection();
        // 2. 创建通道
        Channel channel = connection.createChannel();
        // 3. 创建队列声明
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        // 4. 发送消息
        String message = "",sendType = "";
        for (int i = 0; i < 10; i++) {

            if ( i%2 == 0){
                sendType = "info";
                message = "我是一个info级别的日志:" + sendType;
            }else {
                sendType = "error";
                message = "我是一个error级别的日志:" + sendType;
            }

            System.out.println("[send]:" + message + " " + sendType );

            channel.basicPublish(EXCHANGE_NAME,sendType,null,message.getBytes("utf-8"));

            Thread.sleep(5 * i);

        }


        channel.close();
        connection.close();

    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

消费者:

package com.etc.utils.routing;

import com.etc.utils.MQConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author kalista
 * @Description
 * @Date 2020/8/8  9:01
 **/
public class ConsomerInfo {


    private static final String QUEUE_NAME = "consumer_info";
    private static final String EXCHANGE_NAME = "my_fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("我是info得消费");
        // 1. 获得连接
        Connection connection = MQConnectionUtils.newConnection();
        // 2. 创建通道
        Channel channel = connection.createChannel();
        // 3. 消费关联队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");


        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               String msg = new String(body,"UTF-8");
                System.out.println("消费者获取生产数据:" + msg);
            }
        };

        //5. 消费者监听

        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

消费者:

package com.etc.utils.routing;

import com.etc.utils.MQConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author kalista
 * @Description
 * @Date 2020/8/8  9:05
 **/
public class ConsomerError {
    private static final String QUEUE_NAME = "consumer_error";
    private static final String EXCHANGE_NAME = "my_fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("我是error得消费");
        // 1. 获得连接
        Connection connection = MQConnectionUtils.newConnection();
        // 2. 创建通道
        Channel channel = connection.createChannel();
        // 3. 消费关联队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body,"UTF-8");
                System.out.println("消费者获取生产数据:" + msg);
            }
        };

        //5. 消费者监听

        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

总结:

  1. 2个队列消费者路由不一样,接收到得消息就不一样, 在路由得模式下。 决定消息得队列对推送得主要取决于路由而不是交换机。
  2. 该模式必须设置交换机
  channel.exchangeDeclare(EXCHANGE_NAME,"direct");
  • 1
主题模式

主题模式 topic 模式 和 路由模式 类似, 只不过路由模式是指定固定得路由键 routingkey, 而主题模式是可以模糊匹配路由键 routingkey, 类似于sql = 和 like 得关系。

topic必须由一个英文句号 “.” 分割字符, 我们将被句号 . 分隔开得每一段独立得字符穿称为一个单词,比如说"lazy.orange.fox"

这个topic rountingkey 规则中。 额可以存在2个特殊字符 与 # 用作于模糊匹配 匹配多个单词可以是零个。

规则:

“ * ” 表示任何一个词

“ # ” 表示0 或者1个词

生产者;

package com.etc.utils.topic;

import com.etc.utils.MQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author kalista
 * @Description
 * @Date 2020/8/8  9:23
 **/
public class ProducerTopic {

    private static final String EXCHANGE_NAME = "my_topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
/** 1.创建新的连接 */
        Connection connection = MQConnectionUtils.newConnection();
        /** 2.创建通道 */
        Channel channel = connection.createChannel();
        /** 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 */
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        /** 4.发送消息 */
        String routingKey = "log.info.error";
        String msg = "topic_exchange_msg:" + routingKey;
        System.out.println("[send] = " + msg);
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
        /** 5.关闭通道、连接 */
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

消费者:

package com.etc.utils.topic;

import com.etc.utils.MQConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @Author kalista
 * @Description * 消费
 * @Date 2020/8/8  9:25
 **/
public class ConsumerLogXTopic {
    private static final String QUEUE_NAME = "topic_consumer_info";
    private static final String EXCHANGE_NAME = "my_topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("log * 消费者启动");
        /* 1.创建新的连接 */
        Connection connection = MQConnectionUtils.newConnection();
        /* 2.创建通道 */
        Channel channel = connection.createChannel();
        /* 3.消费者关联队列 */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取生产者消息:" + msg);
            }
        };
        /* 5.消费者监听队列消息 */
        channel.basicConsume(QUEUE_NAME, true, consumer);



    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

消费者:

package com.etc.utils.topic;

import com.etc.utils.MQConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @Author kalista
 * @Description
 * @Date 2020/8/8  9:27
 **/
public class ConsumerLogJTopic {

    private static final String QUEUE_NAME = "topic_consumer_info";
    private static final String EXCHANGE_NAME = "my_topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("log # 消费者启动");
        /* 1.创建新的连接 */
        Connection connection = MQConnectionUtils.newConnection();
        /* 2.创建通道 */
        Channel channel = connection.createChannel();
        /* 3.消费者关联队列 */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#.info.#");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取生产者消息:" + msg);
            }
        };
        /* 5.消费者监听队列消息 */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

总结:

  1. 在进行绑定时后。 提供一个该队列关心得主题,比如说 #.info.# 在该队列中所有设计到 info 得我们都能拿到
  2. “#” 表示0 个 或者活干个关键字。
  3. 如果exchange 没有发现能够匹配得路由 routekey 配置得队列 者会抛弃消息。
Spring boot 整合 RabbitMQ
1.添加依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4

添加配置信息

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
#确认消息已发送到交换机(Exchange)
#spring.rabbitmq.publisher-confirm-type=correlated
#确认消息已发送到队列(Queue)
#spring.rabbitmq.publisher-returns=true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/139258?site
推荐阅读
相关标签
  

闽ICP备14008679号