赞
踩
缓存库存所引起的事务不一致的问题, 使用了异步化的事务型消息解决了最终一致性的问题。 同时引入库存售馨这样的方案解决过载击穿的问题。
交易系统性能瓶颈
方案一:
异步同步数据库:
(1) 活动发布同步缓存库存
(2) 下单交易减缓存库存
(3) 异步消息扣减数据库存(消息队列 mq 技术 )
应用场景:
简介:
消息队列现在逐渐成为互联网企业的核心技术手段, 它由低耦合, 可靠的传递。 广播。 流量控制, 最终一致性等一系列功能。
Mq 老牌 消息中间件
ActiveMQ: 优点 安装方便遵循了jms规范。 缺点 有可能会丢失数据。 5.0xxx版本不玩 AMQP
RabbitMQ: 优点 使用erlang 天生并发性。 最初是在金融行业由稳定的和安全的保证 缺点: erlang 语言难度系数比较大 。 不支持动态拓展。
RcketMQ:优点 节点简单易用,
kafka: 依赖与zookeeper .可以动态的拓展节点。 高可用,高吞吐量 , 无限扩容, 消息的可指定追溯 ,严格顺序机制,不支持标准的消息协议。 不利于平台迁移。
总结:
一般业务 系统是不是要用mq, 最早的我们都知道是activemq,但是现在用的不多了。 并且社区不维护 。。没有经过大规模吞吐量的测试和验证。
后来我们大家就开始Rabbitmq, 确实erlang语言阻止了大量的java工程师去深入研究和掌握。
不过现在用的越来越多的公司。 是RocketMQ,确实不错。有可能社会会黄。。。
我们在一个中小型公司,技术实力一般,技术挑战不高, 我们就用rabitmq,
如果我们是一个大型公司有基础框架研发实力,那就用rocketmq
如果是大数据领域,实时计算 。 日志采集 系统等场景。用标准kafka
项目启动时 初始化 库存 到redis
实现接口 重写方法 implements InitializingBean
@Override
public void afterPropertiesSet() throws Exception {
//查数据库
List<MiaoshaGoods> all = miaosha.findAll();
for (MiaoshaGoods miaoshaGoods : all) {
//添加到redis缓存miaoshakucun:+id格式
stringRedisTemplate.opsForValue().set(MiaoShaGoodKey.MiaoshaGoodskucun.getPrefix() + miaoshaGoods.getId(), String.valueOf(miaoshaGoods.getStockCount()));
localmap.put(miaoshaGoods.getGoodsId(), false);
}
}
简单队列
连接代码封装:
package com.etc.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Author kalista * @Description * @Date 2020/8/6 14:10 **/ public class MQConnectionUtils { public static Connection newConnection() throws IOException, TimeoutException { // 1. 定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2. 设置rabbitmq 服务 factory.setHost("127.0.0.1"); // 3. 设置协议端口号 factory.setPort(5672); // 4. 设置用户名 factory.setUsername("wcc"); // 5. 设置密码 factory.setPassword("wcc"); // 6. 设置vhost factory.setVirtualHost("/test"); Connection connection = factory.newConnection(); return connection; } }
消息生产者:
package com.etc.utils.simplequeue; import com.etc.utils.MQConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Author kalista * @Description 生产者 * @Date 2020/8/6 14:16 **/ public class Producer { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 1. 获得连接 Connection connection = MQConnectionUtils.newConnection(); // 2. 创建通道 Channel channel = connection.createChannel(); // 3. 创建队列声明 channel.queueDeclare(QUEUE_NAME,false,false,false,null); // 4. 发送消息 String msg = "我是生产者生产得消息"; System.out.println(msg); channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); channel.close(); // 5. 关闭队列 connection.close(); } }
消息消费者:
package com.etc.utils.simplequeue; import com.etc.utils.MQConnectionUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Author kalista * @Description 消费者 * @Date 2020/8/6 14:25 **/ public class Customer { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("我是消费者"); // 1. 获取连接 Connection connection = MQConnectionUtils.newConnection(); // 2. 获取通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("消费者获取到得消息:" + msg); } }; // 3. 监听队列 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
总结:
简单队列也称为点对点。 即一个生产者对应一个消费者。生产者发送消息到队列,消费者消费消息出队列。
用来将耗时得任务进行分发给多个消费者;
主要解决的这种问题: 处理资源密集任务。 并且还要等他完成。
工作队列也叫 公平性队列 。 怎么个说法?
循环分发,假如我们有2个消费者默认情况下。rabbitmq 将按照顺序将每条消息发送给下一个消费者。 平均而言。 每个消费者将获得相同数量得消息。 这个分发得消息我们叫 轮询。
公平分发
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
消息持久化
参数配置一 生产者创建队列声明时,修改第二个参数
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
参数配置二: 生产者发送消息时。 修改第三个参数
for (int i = 0; i <= 50 ; i++) {
String msg = "我是生产者生产得消息" + i;
System.out.println(msg);
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
}
工作队列总结:
循环发送
: 消费者端在信道打开一个消息应答机制。 并确保返回接受到消息得确认信息,这样可以保证消费者发生故障也不会丢失信息。消息持久化
:服务器和客户端都要指定队列持久化信息和信息持久化。 保证在重启rabbitmq 队列 消息不会丢失。公平发放
:指定得消费者接受得消息个数。 避免不限消息均匀推送资源不合理得问题。总结:
相对于工作模式。 发布订阅引入交换机得概念,相对于类型上更加广泛一些。
生产者不是直接操作队列。 数据不是发给队列得。 有交换机将数据发送给之绑定得队列。
必须声明交换机 并且声明交换机得类型channel.exchangeDeclare(EXCHANGENAME,“fanout”);
* type类型
* 1. direct : 直连得方式
* 2. fanout: 广播方式
* 3. headers: 请求头方式
* 4. topic: 主题方式
队列必须要绑定交换机
channel.queueBind(QUEUE_NAME,EXCHANGENAME,"");
路由模式和发布订阅得模式类似,在订阅得模型得基础之上添加了类型,订阅模式是分发到所有绑定得交换机得队列。
生产者
package com.etc.utils.routing; import com.etc.utils.MQConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Author kalista * @Description * @Date 2020/8/8 8:54 **/ public class ProdecerRouting { private static final String EXCHANGE_NAME = "my_fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // 1. 获得连接 Connection connection = MQConnectionUtils.newConnection(); // 2. 创建通道 Channel channel = connection.createChannel(); // 3. 创建队列声明 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); // 4. 发送消息 String message = "",sendType = ""; for (int i = 0; i < 10; i++) { if ( i%2 == 0){ sendType = "info"; message = "我是一个info级别的日志:" + sendType; }else { sendType = "error"; message = "我是一个error级别的日志:" + sendType; } System.out.println("[send]:" + message + " " + sendType ); channel.basicPublish(EXCHANGE_NAME,sendType,null,message.getBytes("utf-8")); Thread.sleep(5 * i); } channel.close(); connection.close(); } }
消费者:
package com.etc.utils.routing; import com.etc.utils.MQConnectionUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Author kalista * @Description * @Date 2020/8/8 9:01 **/ public class ConsomerInfo { private static final String QUEUE_NAME = "consumer_info"; private static final String EXCHANGE_NAME = "my_fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("我是info得消费"); // 1. 获得连接 Connection connection = MQConnectionUtils.newConnection(); // 2. 创建通道 Channel channel = connection.createChannel(); // 3. 消费关联队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("消费者获取生产数据:" + msg); } }; //5. 消费者监听 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
消费者:
package com.etc.utils.routing; import com.etc.utils.MQConnectionUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Author kalista * @Description * @Date 2020/8/8 9:05 **/ public class ConsomerError { private static final String QUEUE_NAME = "consumer_error"; private static final String EXCHANGE_NAME = "my_fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("我是error得消费"); // 1. 获得连接 Connection connection = MQConnectionUtils.newConnection(); // 2. 创建通道 Channel channel = connection.createChannel(); // 3. 消费关联队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("消费者获取生产数据:" + msg); } }; //5. 消费者监听 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
总结:
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
主题模式 topic 模式 和 路由模式 类似, 只不过路由模式是指定固定得路由键 routingkey, 而主题模式是可以模糊匹配路由键 routingkey, 类似于sql = 和 like 得关系。
topic必须由一个英文句号 “.” 分割字符, 我们将被句号 . 分隔开得每一段独立得字符穿称为一个单词,比如说"lazy.orange.fox"
这个topic rountingkey 规则中。 额可以存在2个特殊字符 与 # 用作于模糊匹配 匹配多个单词可以是零个。
规则:
“ * ” 表示任何一个词
“ # ” 表示0 或者1个词
生产者;
package com.etc.utils.topic; import com.etc.utils.MQConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Author kalista * @Description * @Date 2020/8/8 9:23 **/ public class ProducerTopic { private static final String EXCHANGE_NAME = "my_topic_exchange"; public static void main(String[] args) throws IOException, TimeoutException { /** 1.创建新的连接 */ Connection connection = MQConnectionUtils.newConnection(); /** 2.创建通道 */ Channel channel = connection.createChannel(); /** 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 */ channel.exchangeDeclare(EXCHANGE_NAME, "topic"); /** 4.发送消息 */ String routingKey = "log.info.error"; String msg = "topic_exchange_msg:" + routingKey; System.out.println("[send] = " + msg); channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); /** 5.关闭通道、连接 */ channel.close(); connection.close(); } }
消费者:
package com.etc.utils.topic; import com.etc.utils.MQConnectionUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Author kalista * @Description * 消费 * @Date 2020/8/8 9:25 **/ public class ConsumerLogXTopic { private static final String QUEUE_NAME = "topic_consumer_info"; private static final String EXCHANGE_NAME = "my_topic_exchange"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("log * 消费者启动"); /* 1.创建新的连接 */ Connection connection = MQConnectionUtils.newConnection(); /* 2.创建通道 */ Channel channel = connection.createChannel(); /* 3.消费者关联队列 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); /* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */ channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费者获取生产者消息:" + msg); } }; /* 5.消费者监听队列消息 */ channel.basicConsume(QUEUE_NAME, true, consumer); } }
消费者:
package com.etc.utils.topic; import com.etc.utils.MQConnectionUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @Author kalista * @Description * @Date 2020/8/8 9:27 **/ public class ConsumerLogJTopic { private static final String QUEUE_NAME = "topic_consumer_info"; private static final String EXCHANGE_NAME = "my_topic_exchange"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("log # 消费者启动"); /* 1.创建新的连接 */ Connection connection = MQConnectionUtils.newConnection(); /* 2.创建通道 */ Channel channel = connection.createChannel(); /* 3.消费者关联队列 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); /* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */ channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#.info.#"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费者获取生产者消息:" + msg); } }; /* 5.消费者监听队列消息 */ channel.basicConsume(QUEUE_NAME, true, consumer); } }
总结:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置信息
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
#确认消息已发送到交换机(Exchange)
#spring.rabbitmq.publisher-confirm-type=correlated
#确认消息已发送到队列(Queue)
#spring.rabbitmq.publisher-returns=true
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。