赞
踩
将quartz引入电商项目中
添加依赖
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>
添加配置文件applicationContext-scheduler.xml , 与spring框架整合
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd"> <!-- 定义任务bean --> <bean name="paymentOrderJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean"> <!-- 指定具体的job类 --> <property name="jobClass" value="com.jt.store.order.job.PaymentOrderJob" /> <!-- 指定job的名称 --> <property name="name" value="paymentOrder" /> <!-- 指定job的分组 --> <property name="group" value="Order" /> <!-- 必须设置为true,如果为false,当没有活动的触发器与之关联时会在调度器中删除该任务 --> <property name="durability" value="true"/> <!-- 指定spring容器的key,如果不设定在job中的jobmap中是获取不到spring容器的 --> <property name="applicationContextJobDataKey" value="applicationContext"/> </bean> <!-- 定义触发器 --> <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"> <property name="jobDetail" ref="paymentOrderJobDetail" /> <!-- 每一分钟执行一次 --> <property name="cronExpression" value="0 0/1 * * * ?" /> </bean> <!-- 定义调度器 --> <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <property name="triggers"> <list> <ref bean="cronTrigger" /> </list> </property> </bean> </beans>
自定义一个job执行类 , 需要继承石英钟的父类 , 编写完成之后需要修改配置文件中的自定义job类的全类名
public class PayMentOrderJob extends QuartzJobBean{ @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { //通过加载文件获取spring的环境信息 ApplicationContext applicationContext= (ApplicationContext) context.getJobDetail().getJobDataMap().get("applicationContext"); //获取对应mapper中的bean,来执行对应的sql语句 //传递的参数就是过期时间;需要你在代码中计算过期时间吧? //执行自己需要执行的逻辑 applicationContext.getBean(OrderMapper.class).paymentOrderScan(new DateTime().minusDays(1).toDate()); //System.out.println("hahahahahahah"); } }
rabbitmq的安装与使用
修改配置文件vim /etc/rabbitmq/rabbitmq.config P64行
%% {loopback_users, []},
修改1:去掉前面的两个%%,
修改2:去掉最后面的逗号,保存。
开启15672端口和5672端口
/sbin/iptables -I INPUT -p tcp --dport 15672 –j ACCEPT #控制台端口 浏览器访问端口
/sbin/iptables -I INPUT -p tcp --dport 5672 –j ACCEPT #程序访问端口
保存防火墙修改操作
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables status
开启rabbitmq服务
service rabbitmq-server start 开启
service rabbitmq-server stop 关闭
service rabbitmq-server restart 重启
rabbitmq示例代码
定义rabbitmq工具类 public class ConnectionUtils { public static Connection getConnettion() throws IOException{ ConnectionFactory factory= new ConnectionFactory(); //需要连接信息; factory.setHost("106.75.74.254"); factory.setPort(5672); factory.setVirtualHost("/jt"); factory.setUsername("jtadmin"); factory.setPassword("123456"); Connection cn = factory.newConnection(); return cn; } } simple模式 public class RabbitMQTest { @Test public void testSend() throws Exception{ /* * 1 创建连接工厂 * 2 从工厂穿件connection * 3 从连接获取channel * 4 从channel声明绑定queue * 5 生产者生产消息放入队列 * 6 释放资源 */ ConnectionFactory factory= new ConnectionFactory(); //需要连接信息; factory.setHost("106.75.74.254"); factory.setPort(5672); factory.setVirtualHost("/jt"); factory.setUsername("jtadmin"); factory.setPassword("123456"); Connection cn = factory.newConnection(); Channel chan = cn.createChannel(); String QUEUE_NAME="simple"; chan.queueDeclare(QUEUE_NAME, false, false, false, null); //第二个参数是否持久化,第三个参数是否独占queue,第四个参数不使用达到一定时间是否自动删除,第五个参数 //其他参数; String msg="hello simple mode11111111";//模拟传送到mq中的消息 //把消息发送到queue chan.basicPublish("", QUEUE_NAME, null, msg.getBytes()); //第一个参数与路由有关 chan.close(); cn.close(); } @Test//消息消费端 public void testRec() throws Exception{ /* * 1 创建连接工厂 * 2 从工厂穿件connection * 3 从连接获取channel * 4 从channel声明绑定queue *5 消费者消费消息 * 6 释放资源 */ ConnectionFactory factory= new ConnectionFactory(); //需要连接信息; factory.setHost("106.75.74.254"); factory.setPort(5672); factory.setVirtualHost("/jt"); factory.setUsername("jtadmin"); factory.setPassword("123456"); Connection cn = factory.newConnection(); Channel chan = cn.createChannel(); String QUEUE_NAME="simple"; chan.queueDeclare(QUEUE_NAME, false, false, false, null); //创建一个consumer QueueingConsumer consumer=new QueueingConsumer(chan); chan.basicConsume(QUEUE_NAME, true,consumer); while(true){ Delivery delivery=consumer.nextDelivery(); String msg=new String(delivery.getBody()); System.out.println(msg); } } } work模式 public class WorkTest { @Test public void testSend() throws Exception{ Connection cn=ConnectionUtils.getConnettion(); Channel chan=cn.createChannel(); chan.queueDeclare("work111", false, false, false, null); //for循环发送消息 for(int i=0;i<100;i++){ String msg="work"+i; chan.basicPublish("", "work111", null, msg.getBytes()); System.out.println("发送消息第"+i+"条"); } chan.close(); cn.close(); } @Test public void testRec1() throws Exception{ Connection cn=ConnectionUtils.getConnettion(); Channel chan=cn.createChannel(); chan.queueDeclare("work111", false, false, false, null); chan.basicQos(1); QueueingConsumer consumer=new QueueingConsumer(chan); chan.basicConsume("work111", false,consumer); //获取消息 while(true){ QueueingConsumer.Delivery delivery=consumer.nextDelivery(); String msg=new String(delivery.getBody()); System.out.println("接收到消息:"+msg); //模拟机器性能 Thread.sleep(10); chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } @Test public void testRec2() throws Exception{ Connection cn=ConnectionUtils.getConnettion(); Channel chan=cn.createChannel(); chan.queueDeclare("work111", false, false, false, null); chan.basicQos(1); QueueingConsumer consumer=new QueueingConsumer(chan); chan.basicConsume("work111", false,consumer); //获取消息 while(true){ QueueingConsumer.Delivery delivery=consumer.nextDelivery(); String msg=new String(delivery.getBody()); System.out.println("接收到消息:"+msg); //模拟机器性能 Thread.sleep(500); chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } publish模式 public class PublishTest { @Test public void testSend() throws Exception{ Connection cn=ConnectionUtils.getConnettion(); Channel chan=cn.createChannel(); //声明交换机exchange chan.exchangeDeclare("jt_publish", "fanout");//交换机的名称,和交换机的种类 for(int i=0;i<100;i++){ String msg="publish num"+i; chan.basicPublish("jt_publish", "", null, msg.getBytes()); System.out.println("发送了第"+i+"条"); } } @Test public void testRec1() throws Exception{ Connection cn= ConnectionUtils.getConnettion(); Channel chan=cn.createChannel(); //声明队列 chan.queueDeclare("publish01", false, false, false, null); //绑定交换机 chan.exchangeDeclare("jt_publish", "fanout"); //绑定队列到交换机 chan.queueBind("publish01", "jt_publish", ""); //同一时刻服务器只发送一条消息给消费者消费 chan.basicQos(1); //定义队列的消费者 QueueingConsumer cons=new QueueingConsumer(chan); chan.basicConsume("publish01", false,cons); //获取消息 while(true){ QueueingConsumer.Delivery deli=cons.nextDelivery(); String msg=new String(deli.getBody()); System.out.println("接收到消息:"+msg); chan.basicAck(deli.getEnvelope().getDeliveryTag(), false); } } @Test public void testRec2() throws Exception{ Connection cn= ConnectionUtils.getConnettion(); Channel chan=cn.createChannel(); //声明队列 chan.queueDeclare("publish02", false, false, false, null); //绑定交换机 chan.exchangeDeclare("jt_publish", "fanout"); //绑定队列到交换机 chan.queueBind("publish02", "jt_publish", ""); //同一时刻服务器只发送一条消息给消费者消费 chan.basicQos(1); //定义队列的消费者 QueueingConsumer cons=new QueueingConsumer(chan); chan.basicConsume("publish02", false,cons); //获取消息 while(true){ QueueingConsumer.Delivery deli=cons.nextDelivery(); String msg=new String(deli.getBody()); System.out.println("接收到消息:"+msg); chan.basicAck(deli.getEnvelope().getDeliveryTag(), false); } } } routing public class RoutingTest { @Test public void testSend() throws Exception{ Connection cn=ConnectionUtils.getConnettion(); Channel chan=cn.createChannel(); //声明交换机exchange chan.exchangeDeclare("jt_routing", "topic");//交换机的名称,和交换机的种类 //生成消息 String msg="哈哈哈哈"; String routingKey="item.add"; chan.basicPublish("jt_routing", routingKey, null, msg.getBytes()); System.out.println("发送完成"); chan.close(); cn.close(); } @Test public void testRec1() throws Exception{ Connection cn=ConnectionUtils.getConnettion(); Channel chan=cn.createChannel(); chan.exchangeDeclare("jt_routing", "direct"); chan.queueDeclare("routing01", false, false, false, null); chan.queueBind("routing01", "jt_routing", "item.add"); //生成consumer QueueingConsumer cons=new QueueingConsumer(chan); chan.basicConsume("routing01", false,cons); while(true){ QueueingConsumer.Delivery deli=cons.nextDelivery(); String msg=new String(deli.getBody()); System.out.println("一号接受者接收到消息:"+msg); chan.basicAck(deli.getEnvelope().getDeliveryTag(), false); } } @Test public void testRec2() throws Exception{ Connection cn=ConnectionUtils.getConnettion(); Channel chan=cn.createChannel(); chan.exchangeDeclare("jt_routing", "direct"); chan.queueDeclare("routing02", false, false, false, null); chan.queueBind("routing02", "jt_routing", "item1.update"); //生成consumer QueueingConsumer cons=new QueueingConsumer(chan); chan.basicConsume("routing02", false,cons); while(true){ QueueingConsumer.Delivery deli=cons.nextDelivery(); String msg=new String(deli.getBody()); System.out.println("二号接受者接收到消息:"+msg); chan.basicAck(deli.getEnvelope().getDeliveryTag(), false); } } } topic模式 public class RoutingTest { @Test public void testSend() throws Exception{ Connection cn=ConnectionUtils.getConnettion(); Channel chan=cn.createChannel(); //声明交换机exchange chan.exchangeDeclare("jt_routing", "topic");//交换机的名称,和交换机的种类 //生成消息 String msg="哈哈哈哈"; String routingKey="item.#"; chan.basicPublish("jt_routing", routingKey, null, msg.getBytes()); System.out.println("发送完成"); chan.close(); cn.close(); } @Test public void testRec1() throws Exception{ Connection cn=ConnectionUtils.getConnettion(); Channel chan=cn.createChannel(); chan.exchangeDeclare("jt_routing", "direct"); chan.queueDeclare("routing01", false, false, false, null); chan.queueBind("routing01", "jt_routing", "item.add"); //生成consumer QueueingConsumer cons=new QueueingConsumer(chan); chan.basicConsume("routing01", false,cons); while(true){ QueueingConsumer.Delivery deli=cons.nextDelivery(); String msg=new String(deli.getBody()); System.out.println("一号接受者接收到消息:"+msg); chan.basicAck(deli.getEnvelope().getDeliveryTag(), false); } } @Test public void testRec2() throws Exception{ Connection cn=ConnectionUtils.getConnettion(); Channel chan=cn.createChannel(); chan.exchangeDeclare("jt_routing", "direct"); chan.queueDeclare("routing02", false, false, false, null); chan.queueBind("routing02", "jt_routing", "item1.update"); //生成consumer QueueingConsumer cons=new QueueingConsumer(chan); chan.basicConsume("routing02", false,cons); while(true){ QueueingConsumer.Delivery deli=cons.nextDelivery(); String msg=new String(deli.getBody()); System.out.println("二号接受者接收到消息:"+msg); chan.basicAck(deli.getEnvelope().getDeliveryTag(), false); } } }
秒杀业务场景:
业务场景分析: 并发量很高单例rabbitmq的并发量在百万级以上 有个商品在每天的8点开抢,每天的量5万台iphone4s 可以引入rabbitmq 在抢单的请求发起后,可以将前台的抢单信息放到rabbitmq; simple 队列 100万个人抢,写入队列中的消息可以是什么? 电话号码 ticket username 生产者,前台的controller或者service调用将请求中的信息保存到队列中; 消费者,监听队列,一旦有数据,获取数据 调用SSO查询用户信息,把前5万个消息获取到,后面的放入rabbitmq的垃圾桶; 并发更高的时候需要使用rabbitmq的分布式(峰会的题目之一)
将rabbit引入电商项目中:
示例代码
生产者端
添加rabbitmq与spring整合 的配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- 定义RabbitMQ的连接工厂 --> <rabbit:connection-factory id="connectionFactory" host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" virtual-host="${rabbit.vhost}" /> <!-- 定义Rabbit模板,指定连接工厂以及定义exchange --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="itemDirectExchange" /> <!-- MQ的管理,包括队列、交换器等 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 定义交换器,自动声明,durable持久化 --> <rabbit:direct-exchange name="itemDirectExchange" auto-declare="true" durable="true"> </rabbit:direct-exchange> </beans>
添加rabbitmq的配置文件rabbitmq.properties
rabbit.ip=106.75.74.254
rabbit.port=5672
rabbit.username=jtadmin
rabbit.password=123456
rabbit.vhost=/jt
在spring核心配置文件中配置读取rabbitmq配置文件的配置
<!-- 使用spring自带的占位符替换功能,可以实现注解方式获取属性文件中的配置值 --> <bean class="com.jt.common.spring.exetend.ExtendedPropertyPlaceholderConfigurer"> <!-- 允许JVM参数覆盖 --> <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" /> <!-- 忽略没有找到的资源文件 --> <property name="ignoreResourceNotFound" value="true" /> <!-- 配置资源文件 --> <property name="locations"> <list> <value>classpath:jdbc.properties</value> <value>classpath:env.properties</value> <value>classpath:redis.properties</value> <value>classpath:rabbitmq.properties</value> </list> </property> </bean>
在service层把需要的操作放入消息队列 (需要先注入rabbitmq模板类) 把商品ID放入到指定的item.add消息队列中 , 监听item.add消息队列消费时会做出特定的动作
@Service public class ItemService extends BaseService<Item>{ @Autowired private ItemMapper itemMapper; @Autowired private ItemDescMapper itemDescMapper; @Autowired private RedisService redisService; private static final ObjectMapper MAPPER=new ObjectMapper(); @Autowired private RabbitTemplate rabbitmq; //商品列表查询 public List<Item> queryItemList(){ List<Item> itemList=itemMapper.queryItemList(); return itemList; } public void saveItem(Item item,String desc) throws Exception { //insert 和insertSelective //insert方法在插入对象时,把对象的所有字段都插入 //insertSelective插入非NULL值的所有字段 //初始化数据没有做 status created updated item.setStatus(1); item.setCreated(new Date()); item.setUpdated(item.getCreated()); itemMapper.insert(item); //准备插入表格的对象 ItemDesc itemDesc=new ItemDesc(); //当前的itemDesc的关联ID怎么获取? itemDesc.setItemId(item.getId()); itemDesc.setCreated(new Date()); itemDesc.setUpdated(itemDesc.getCreated()); itemDesc.setItemDesc(desc); //插入对象到表格 itemDescMapper.insert(itemDesc); /*//添加新增商品的缓存写入 String key="ITEM_"+item.getId(); String json=MAPPER.writeValueAsString(item); redisService.set(key, json);*/ //写消息到消息队列 rabbitmq.convertAndSend("item.add", item.getId()); }
消费者端
引入rabbitmq与spring整合的消费者配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- 定义RabbitMQ的连接工厂 --> <rabbit:connection-factory id="connectionFactory" host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" virtual-host="${rabbit.vhost}" /> <!-- MQ的管理,包括队列、交换器等 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 定义消息队列 --> <rabbit:queue name="jt-web.itemQueue" auto-declare="true"/> <!-- 定义交换机,并且完成队列和交换机的绑定 --> <rabbit:direct-exchange name="itemDirectExchange" auto-declare="true"> <rabbit:bindings> <!-- 前台系统只接收商品更新的消息,key路由key --> <rabbit:binding queue="jt-web.itemQueue" key="item.add"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 定义监听 --> <rabbit:listener-container connection-factory="connectionFactory"> <!-- 监听一个队列,当队列中有消息,就会自动触发类.方法,传递消息就作为方法的参数,根据方法声明的参数强转 --> <rabbit:listener ref="rabbitItemService" method="addItem" queue-names="jt-web.itemQueue"/> </rabbit:listener-container> </beans>
引入rabbitmq的配置文件rabbitmq.properties
rabbit.ip=106.75.74.254
rabbit.port=5672
rabbit.username=jtadmin
rabbit.password=123456
rabbit.vhost=/jt
编写消费者rabbitmq
@Service
public class RabbitItemService {
@Autowired
private HttpClientService client;
@Autowired
private RedisService redis;
public void addItem(Long itemId) throws Exception{
//把消息itemId获取过来,到后台拿数据
//写redis
String url="http://manage.jt.com/item/"+itemId;
String jsonItem = client.doGet(url,"utf-8");
redis.set("ITEM_"+itemId, jsonItem);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。