当前位置:   article > 正文

【黑马头条之redis实现延迟任务】_redis 延迟任务

redis 延迟任务

本笔记内容为黑马头条项目的延迟任务精准发布文章部分

目录

一、实现思路

二、延迟任务服务实现

1、搭建heima-leadnews-schedule模块

2、数据库准备

3、安装redis

4、项目集成redis

5、添加任务

6、取消任务

7、消费任务

8、未来数据定时刷新

1.reids key值匹配

2.reids管道

3.未来数据定时刷新-功能完成

9、分布式锁解决集群下的方法抢占执行

1.问题描述

2.分布式锁

3.redis分布式锁

4.在工具类CacheService中添加方法

10、数据库同步到redis

三、延迟队列解决精准时间发布文章

1、延迟队列服务提供对外接口

2、发布文章集成添加延迟队列接口

3、消费任务进行审核文章


一、实现思路


问题思路

1.为什么任务需要存储在数据库中?

延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。

2.为什么redis中使用两种数据类型,list和zset?

效率问题,算法的时间复杂度

3.在添加zset数据的时候,为什么不需要预加载?

任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。

二、延迟任务服务实现


1、搭建heima-leadnews-schedule模块

leadnews-schedule是一个通用的服务,单独创建模块来管理任何类型的延迟任务

①:导入资料文件夹下的heima-leadnews-schedule模块到heima-leadnews-service下。

如下图所示:

②:添加bootstrap.yml        

  1. server:
  2. port: 51701
  3. spring:
  4. application:
  5. name: leadnews-schedule
  6. cloud:
  7. nacos:
  8. discovery:
  9. server-addr: 192.168.200.130:8848
  10. config:
  11. server-addr: 192.168.200.130:8848
  12. file-extension: yml

③:在nacos中添加对应配置,并添加数据库及mybatis-plus的配置

  1. spring:
  2. datasource:
  3. driver-class-name: com.mysql.jdbc.Driver
  4. url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
  5. username: root
  6. password: root
  7. # 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
  8. mybatis-plus:
  9. mapper-locations: classpath*:mapper/*.xml
  10. # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  11. type-aliases-package: com.heima.model.schedule.pojos

2、数据库准备

导入资料中leadnews_schedule数据库

taskinfo 任务表

实体类

  1. package com.heima.model.schedule.pojos;
  2. import com.baomidou.mybatisplus.annotation.IdType;
  3. import com.baomidou.mybatisplus.annotation.TableField;
  4. import com.baomidou.mybatisplus.annotation.TableId;
  5. import com.baomidou.mybatisplus.annotation.TableName;
  6. import lombok.Data;
  7. import java.io.Serializable;
  8. import java.util.Date;
  9. /**
  10. * <p>
  11. *
  12. * </p>
  13. *
  14. * @author itheima
  15. */
  16. @Data
  17. @TableName("taskinfo")
  18. public class Taskinfo implements Serializable {
  19. private static final long serialVersionUID = 1L;
  20. /**
  21. * 任务id
  22. */
  23. @TableId(type = IdType.ID_WORKER)
  24. private Long taskId;
  25. /**
  26. * 执行时间
  27. */
  28. @TableField("execute_time")
  29. private Date executeTime;
  30. /**
  31. * 参数
  32. */
  33. @TableField("parameters")
  34. private byte[] parameters;
  35. /**
  36. * 优先级
  37. */
  38. @TableField("priority")
  39. private Integer priority;
  40. /**
  41. * 任务类型
  42. */
  43. @TableField("task_type")
  44. private Integer taskType;
  45. }

taskinfo_logs 任务日志表

实体类

  1. package com.heima.model.schedule.pojos;
  2. import com.baomidou.mybatisplus.annotation.*;
  3. import lombok.Data;
  4. import java.io.Serializable;
  5. import java.util.Date;
  6. /**
  7. * <p>
  8. *
  9. * </p>
  10. *
  11. * @author itheima
  12. */
  13. @Data
  14. @TableName("taskinfo_logs")
  15. public class TaskinfoLogs implements Serializable {
  16. private static final long serialVersionUID = 1L;
  17. /**
  18. * 任务id
  19. */
  20. @TableId(type = IdType.ID_WORKER)
  21. private Long taskId;
  22. /**
  23. * 执行时间
  24. */
  25. @TableField("execute_time")
  26. private Date executeTime;
  27. /**
  28. * 参数
  29. */
  30. @TableField("parameters")
  31. private byte[] parameters;
  32. /**
  33. * 优先级
  34. */
  35. @TableField("priority")
  36. private Integer priority;
  37. /**
  38. * 任务类型
  39. */
  40. @TableField("task_type")
  41. private Integer taskType;
  42. /**
  43. * 版本号,用乐观锁
  44. */
  45. @Version
  46. private Integer version;
  47. /**
  48. * 状态 0=int 1=EXECUTED 2=CANCELLED
  49. */
  50. @TableField("status")
  51. private Integer status;
  52. }

乐观锁支持:

  1. /**
  2. * mybatis-plus乐观锁支持
  3. * @return
  4. */
  5. @Bean
  6. public MybatisPlusInterceptor optimisticLockerInterceptor(){
  7. MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
  8. interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
  9. return interceptor;
  10. }

3、安装redis

①拉取镜像

docker pull redis

② 创建容器

docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"

③链接测试

打开资料中的Redis Desktop Manager,输入host、port、password链接测试

能链接成功,即可

4、项目集成redis

①:在项目导入redis相关依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency>
  5. <!-- redis依赖commons-pool 这个依赖一定要添加 -->
  6. <dependency>
  7. <groupId>org.apache.commons</groupId>
  8. <artifactId>commons-pool2</artifactId>
  9. </dependency>

②:在heima-leadnews-schedule中集成redis,添加以下nacos配置,链接上redis

  1. spring:
  2. redis:
  3. host: 192.168.200.130
  4. password: leadnews
  5. port: 6379

③:拷贝资料文件夹下的类:CacheService到heima-leadnews-common模块下,并添加自动配置

cacheService.java

  1. package com.heima.common.redis;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.cache.annotation.CachingConfigurerSupport;
  4. import org.springframework.dao.DataAccessException;
  5. import org.springframework.data.redis.connection.*;
  6. import org.springframework.data.redis.core.*;
  7. import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
  8. import org.springframework.data.redis.core.types.Expiration;
  9. import org.springframework.lang.Nullable;
  10. import org.springframework.stereotype.Component;
  11. import java.io.IOException;
  12. import java.util.*;
  13. import java.util.concurrent.TimeUnit;
  14. @Component
  15. public class CacheService extends CachingConfigurerSupport {
  16. @Autowired
  17. private StringRedisTemplate stringRedisTemplate;
  18. public StringRedisTemplate getstringRedisTemplate() {
  19. return this.stringRedisTemplate;
  20. }
  21. /** -------------------key相关操作--------------------- */
  22. /**
  23. * 删除key
  24. *
  25. * @param key
  26. */
  27. public void delete(String key) {
  28. stringRedisTemplate.delete(key);
  29. }
  30. /**
  31. * 批量删除key
  32. *
  33. * @param keys
  34. */
  35. public void delete(Collection<String> keys) {
  36. stringRedisTemplate.delete(keys);
  37. }
  38. /**
  39. * 序列化key
  40. *
  41. * @param key
  42. * @return
  43. */
  44. public byte[] dump(String key) {
  45. return stringRedisTemplate.dump(key);
  46. }
  47. /**
  48. * 是否存在key
  49. *
  50. * @param key
  51. * @return
  52. */
  53. public Boolean exists(String key) {
  54. return stringRedisTemplate.hasKey(key);
  55. }
  56. /**
  57. * 设置过期时间
  58. *
  59. * @param key
  60. * @param timeout
  61. * @param unit
  62. * @return
  63. */
  64. public Boolean expire(String key, long timeout, TimeUnit unit) {
  65. return stringRedisTemplate.expire(key, timeout, unit);
  66. }
  67. /**
  68. * 设置过期时间
  69. *
  70. * @param key
  71. * @param date
  72. * @return
  73. */
  74. public Boolean expireAt(String key, Date date) {
  75. return stringRedisTemplate.expireAt(key, date);
  76. }
  77. /**
  78. * 查找匹配的key
  79. *
  80. * @param pattern
  81. * @return
  82. */
  83. public Set<String> keys(String pattern) {
  84. return stringRedisTemplate.keys(pattern);
  85. }
  86. /**
  87. * 将当前数据库的 key 移动到给定的数据库 db 当中
  88. *
  89. * @param key
  90. * @param dbIndex
  91. * @return
  92. */
  93. public Boolean move(String key, int dbIndex) {
  94. return stringRedisTemplate.move(key, dbIndex);
  95. }
  96. /**
  97. * 移除 key 的过期时间,key 将持久保持
  98. *
  99. * @param key
  100. * @return
  101. */
  102. public Boolean persist(String key) {
  103. return stringRedisTemplate.persist(key);
  104. }
  105. /**
  106. * 返回 key 的剩余的过期时间
  107. *
  108. * @param key
  109. * @param unit
  110. * @return
  111. */
  112. public Long getExpire(String key, TimeUnit unit) {
  113. return stringRedisTemplate.getExpire(key, unit);
  114. }
  115. /**
  116. * 返回 key 的剩余的过期时间
  117. *
  118. * @param key
  119. * @return
  120. */
  121. public Long getExpire(String key) {
  122. return stringRedisTemplate.getExpire(key);
  123. }
  124. /**
  125. * 从当前数据库中随机返回一个 key
  126. *
  127. * @return
  128. */
  129. public String randomKey() {
  130. return stringRedisTemplate.randomKey();
  131. }
  132. /**
  133. * 修改 key 的名称
  134. *
  135. * @param oldKey
  136. * @param newKey
  137. */
  138. public void rename(String oldKey, String newKey) {
  139. stringRedisTemplate.rename(oldKey, newKey);
  140. }
  141. /**
  142. * 仅当 newkey 不存在时,将 oldKey 改名为 newkey
  143. *
  144. * @param oldKey
  145. * @param newKey
  146. * @return
  147. */
  148. public Boolean renameIfAbsent(String oldKey, String newKey) {
  149. return stringRedisTemplate.renameIfAbsent(oldKey, newKey);
  150. }
  151. /**
  152. * 返回 key 所储存的值的类型
  153. *
  154. * @param key
  155. * @return
  156. */
  157. public DataType type(String key) {
  158. return stringRedisTemplate.type(key);
  159. }
  160. /** -------------------string相关操作--------------------- */
  161. /**
  162. * 设置指定 key 的值
  163. * @param key
  164. * @param value
  165. */
  166. public void set(String key, String value) {
  167. stringRedisTemplate.opsForValue().set(key, value);
  168. }
  169. /**
  170. * 获取指定 key 的值
  171. * @param key
  172. * @return
  173. */
  174. public String get(String key) {
  175. return stringRedisTemplate.opsForValue().get(key);
  176. }
  177. /**
  178. * 返回 key 中字符串值的子字符
  179. * @param key
  180. * @param start
  181. * @param end
  182. * @return
  183. */
  184. public String getRange(String key, long start, long end) {
  185. return stringRedisTemplate.opsForValue().get(key, start, end);
  186. }
  187. /**
  188. * 将给定 key 的值设为 value ,并返回 key 的旧值(old value)
  189. *
  190. * @param key
  191. * @param value
  192. * @return
  193. */
  194. public String getAndSet(String key, String value) {
  195. return stringRedisTemplate.opsForValue().getAndSet(key, value);
  196. }
  197. /**
  198. * 对 key 所储存的字符串值,获取指定偏移量上的位(bit)
  199. *
  200. * @param key
  201. * @param offset
  202. * @return
  203. */
  204. public Boolean getBit(String key, long offset) {
  205. return stringRedisTemplate.opsForValue().getBit(key, offset);
  206. }
  207. /**
  208. * 批量获取
  209. *
  210. * @param keys
  211. * @return
  212. */
  213. public List<String> multiGet(Collection<String> keys) {
  214. return stringRedisTemplate.opsForValue().multiGet(keys);
  215. }
  216. /**
  217. * 设置ASCII码, 字符串'a'的ASCII码是97, 转为二进制是'01100001', 此方法是将二进制第offset位值变为value
  218. *
  219. * @param key
  220. * @param
  221. * @param value
  222. * 值,true为1, false为0
  223. * @return
  224. */
  225. public boolean setBit(String key, long offset, boolean value) {
  226. return stringRedisTemplate.opsForValue().setBit(key, offset, value);
  227. }
  228. /**
  229. * 将值 value 关联到 key ,并将 key 的过期时间设为 timeout
  230. *
  231. * @param key
  232. * @param value
  233. * @param timeout
  234. * 过期时间
  235. * @param unit
  236. * 时间单位, 天:TimeUnit.DAYS 小时:TimeUnit.HOURS 分钟:TimeUnit.MINUTES
  237. * 秒:TimeUnit.SECONDS 毫秒:TimeUnit.MILLISECONDS
  238. */
  239. public void setEx(String key, String value, long timeout, TimeUnit unit) {
  240. stringRedisTemplate.opsForValue().set(key, value, timeout, unit);
  241. }
  242. /**
  243. * 只有在 key 不存在时设置 key 的值
  244. *
  245. * @param key
  246. * @param value
  247. * @return 之前已经存在返回false,不存在返回true
  248. */
  249. public boolean setIfAbsent(String key, String value) {
  250. return stringRedisTemplate.opsForValue().setIfAbsent(key, value);
  251. }
  252. /**
  253. * 用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始
  254. *
  255. * @param key
  256. * @param value
  257. * @param offset
  258. * 从指定位置开始覆写
  259. */
  260. public void setRange(String key, String value, long offset) {
  261. stringRedisTemplate.opsForValue().set(key, value, offset);
  262. }
  263. /**
  264. * 获取字符串的长度
  265. *
  266. * @param key
  267. * @return
  268. */
  269. public Long size(String key) {
  270. return stringRedisTemplate.opsForValue().size(key);
  271. }
  272. /**
  273. * 批量添加
  274. *
  275. * @param maps
  276. */
  277. public void multiSet(Map<String, String> maps) {
  278. stringRedisTemplate.opsForValue().multiSet(maps);
  279. }
  280. /**
  281. * 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在
  282. *
  283. * @param maps
  284. * @return 之前已经存在返回false,不存在返回true
  285. */
  286. public boolean multiSetIfAbsent(Map<String, String> maps) {
  287. return stringRedisTemplate.opsForValue().multiSetIfAbsent(maps);
  288. }
  289. /**
  290. * 增加(自增长), 负数则为自减
  291. *
  292. * @param key
  293. * @param
  294. * @return
  295. */
  296. public Long incrBy(String key, long increment) {
  297. return stringRedisTemplate.opsForValue().increment(key, increment);
  298. }
  299. /**
  300. *
  301. * @param key
  302. * @param
  303. * @return
  304. */
  305. public Double incrByFloat(String key, double increment) {
  306. return stringRedisTemplate.opsForValue().increment(key, increment);
  307. }
  308. /**
  309. * 追加到末尾
  310. *
  311. * @param key
  312. * @param value
  313. * @return
  314. */
  315. public Integer append(String key, String value) {
  316. return stringRedisTemplate.opsForValue().append(key, value);
  317. }
  318. /** -------------------hash相关操作------------------------- */
  319. /**
  320. * 获取存储在哈希表中指定字段的值
  321. *
  322. * @param key
  323. * @param field
  324. * @return
  325. */
  326. public Object hGet(String key, String field) {
  327. return stringRedisTemplate.opsForHash().get(key, field);
  328. }
  329. /**
  330. * 获取所有给定字段的值
  331. *
  332. * @param key
  333. * @return
  334. */
  335. public Map<Object, Object> hGetAll(String key) {
  336. return stringRedisTemplate.opsForHash().entries(key);
  337. }
  338. /**
  339. * 获取所有给定字段的值
  340. *
  341. * @param key
  342. * @param fields
  343. * @return
  344. */
  345. public List<Object> hMultiGet(String key, Collection<Object> fields) {
  346. return stringRedisTemplate.opsForHash().multiGet(key, fields);
  347. }
  348. public void hPut(String key, String hashKey, String value) {
  349. stringRedisTemplate.opsForHash().put(key, hashKey, value);
  350. }
  351. public void hPutAll(String key, Map<String, String> maps) {
  352. stringRedisTemplate.opsForHash().putAll(key, maps);
  353. }
  354. /**
  355. * 仅当hashKey不存在时才设置
  356. *
  357. * @param key
  358. * @param hashKey
  359. * @param value
  360. * @return
  361. */
  362. public Boolean hPutIfAbsent(String key, String hashKey, String value) {
  363. return stringRedisTemplate.opsForHash().putIfAbsent(key, hashKey, value);
  364. }
  365. /**
  366. * 删除一个或多个哈希表字段
  367. *
  368. * @param key
  369. * @param fields
  370. * @return
  371. */
  372. public Long hDelete(String key, Object... fields) {
  373. return stringRedisTemplate.opsForHash().delete(key, fields);
  374. }
  375. /**
  376. * 查看哈希表 key 中,指定的字段是否存在
  377. *
  378. * @param key
  379. * @param field
  380. * @return
  381. */
  382. public boolean hExists(String key, String field) {
  383. return stringRedisTemplate.opsForHash().hasKey(key, field);
  384. }
  385. /**
  386. * 为哈希表 key 中的指定字段的整数值加上增量 increment
  387. *
  388. * @param key
  389. * @param field
  390. * @param increment
  391. * @return
  392. */
  393. public Long hIncrBy(String key, Object field, long increment) {
  394. return stringRedisTemplate.opsForHash().increment(key, field, increment);
  395. }
  396. /**
  397. * 为哈希表 key 中的指定字段的整数值加上增量 increment
  398. *
  399. * @param key
  400. * @param field
  401. * @param delta
  402. * @return
  403. */
  404. public Double hIncrByFloat(String key, Object field, double delta) {
  405. return stringRedisTemplate.opsForHash().increment(key, field, delta);
  406. }
  407. /**
  408. * 获取所有哈希表中的字段
  409. *
  410. * @param key
  411. * @return
  412. */
  413. public Set<Object> hKeys(String key) {
  414. return stringRedisTemplate.opsForHash().keys(key);
  415. }
  416. /**
  417. * 获取哈希表中字段的数量
  418. *
  419. * @param key
  420. * @return
  421. */
  422. public Long hSize(String key) {
  423. return stringRedisTemplate.opsForHash().size(key);
  424. }
  425. /**
  426. * 获取哈希表中所有值
  427. *
  428. * @param key
  429. * @return
  430. */
  431. public List<Object> hValues(String key) {
  432. return stringRedisTemplate.opsForHash().values(key);
  433. }
  434. /**
  435. * 迭代哈希表中的键值对
  436. *
  437. * @param key
  438. * @param options
  439. * @return
  440. */
  441. public Cursor<Map.Entry<Object, Object>> hScan(String key, ScanOptions options) {
  442. return stringRedisTemplate.opsForHash().scan(key, options);
  443. }
  444. /** ------------------------list相关操作---------------------------- */
  445. /**
  446. * 通过索引获取列表中的元素
  447. *
  448. * @param key
  449. * @param index
  450. * @return
  451. */
  452. public String lIndex(String key, long index) {
  453. return stringRedisTemplate.opsForList().index(key, index);
  454. }
  455. /**
  456. * 获取列表指定范围内的元素
  457. *
  458. * @param key
  459. * @param start
  460. * 开始位置, 0是开始位置
  461. * @param end
  462. * 结束位置, -1返回所有
  463. * @return
  464. */
  465. public List<String> lRange(String key, long start, long end) {
  466. return stringRedisTemplate.opsForList().range(key, start, end);
  467. }
  468. /**
  469. * 存储在list头部
  470. *
  471. * @param key
  472. * @param value
  473. * @return
  474. */
  475. public Long lLeftPush(String key, String value) {
  476. return stringRedisTemplate.opsForList().leftPush(key, value);
  477. }
  478. /**
  479. *
  480. * @param key
  481. * @param value
  482. * @return
  483. */
  484. public Long lLeftPushAll(String key, String... value) {
  485. return stringRedisTemplate.opsForList().leftPushAll(key, value);
  486. }
  487. /**
  488. *
  489. * @param key
  490. * @param value
  491. * @return
  492. */
  493. public Long lLeftPushAll(String key, Collection<String> value) {
  494. return stringRedisTemplate.opsForList().leftPushAll(key, value);
  495. }
  496. /**
  497. * 当list存在的时候才加入
  498. *
  499. * @param key
  500. * @param value
  501. * @return
  502. */
  503. public Long lLeftPushIfPresent(String key, String value) {
  504. return stringRedisTemplate.opsForList().leftPushIfPresent(key, value);
  505. }
  506. /**
  507. * 如果pivot存在,再pivot前面添加
  508. *
  509. * @param key
  510. * @param pivot
  511. * @param value
  512. * @return
  513. */
  514. public Long lLeftPush(String key, String pivot, String value) {
  515. return stringRedisTemplate.opsForList().leftPush(key, pivot, value);
  516. }
  517. /**
  518. *
  519. * @param key
  520. * @param value
  521. * @return
  522. */
  523. public Long lRightPush(String key, String value) {
  524. return stringRedisTemplate.opsForList().rightPush(key, value);
  525. }
  526. /**
  527. *
  528. * @param key
  529. * @param value
  530. * @return
  531. */
  532. public Long lRightPushAll(String key, String... value) {
  533. return stringRedisTemplate.opsForList().rightPushAll(key, value);
  534. }
  535. /**
  536. *
  537. * @param key
  538. * @param value
  539. * @return
  540. */
  541. public Long lRightPushAll(String key, Collection<String> value) {
  542. return stringRedisTemplate.opsForList().rightPushAll(key, value);
  543. }
  544. /**
  545. * 为已存在的列表添加值
  546. *
  547. * @param key
  548. * @param value
  549. * @return
  550. */
  551. public Long lRightPushIfPresent(String key, String value) {
  552. return stringRedisTemplate.opsForList().rightPushIfPresent(key, value);
  553. }
  554. /**
  555. * 在pivot元素的右边添加值
  556. *
  557. * @param key
  558. * @param pivot
  559. * @param value
  560. * @return
  561. */
  562. public Long lRightPush(String key, String pivot, String value) {
  563. return stringRedisTemplate.opsForList().rightPush(key, pivot, value);
  564. }
  565. /**
  566. * 通过索引设置列表元素的值
  567. *
  568. * @param key
  569. * @param index
  570. * 位置
  571. * @param value
  572. */
  573. public void lSet(String key, long index, String value) {
  574. stringRedisTemplate.opsForList().set(key, index, value);
  575. }
  576. /**
  577. * 移出并获取列表的第一个元素
  578. *
  579. * @param key
  580. * @return 删除的元素
  581. */
  582. public String lLeftPop(String key) {
  583. return stringRedisTemplate.opsForList().leftPop(key);
  584. }
  585. /**
  586. * 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
  587. *
  588. * @param key
  589. * @param timeout
  590. * 等待时间
  591. * @param unit
  592. * 时间单位
  593. * @return
  594. */
  595. public String lBLeftPop(String key, long timeout, TimeUnit unit) {
  596. return stringRedisTemplate.opsForList().leftPop(key, timeout, unit);
  597. }
  598. /**
  599. * 移除并获取列表最后一个元素
  600. *
  601. * @param key
  602. * @return 删除的元素
  603. */
  604. public String lRightPop(String key) {
  605. return stringRedisTemplate.opsForList().rightPop(key);
  606. }
  607. /**
  608. * 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
  609. *
  610. * @param key
  611. * @param timeout
  612. * 等待时间
  613. * @param unit
  614. * 时间单位
  615. * @return
  616. */
  617. public String lBRightPop(String key, long timeout, TimeUnit unit) {
  618. return stringRedisTemplate.opsForList().rightPop(key, timeout, unit);
  619. }
  620. /**
  621. * 移除列表的最后一个元素,并将该元素添加到另一个列表并返回
  622. *
  623. * @param sourceKey
  624. * @param destinationKey
  625. * @return
  626. */
  627. public String lRightPopAndLeftPush(String sourceKey, String destinationKey) {
  628. return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
  629. destinationKey);
  630. }
  631. /**
  632. * 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
  633. *
  634. * @param sourceKey
  635. * @param destinationKey
  636. * @param timeout
  637. * @param unit
  638. * @return
  639. */
  640. public String lBRightPopAndLeftPush(String sourceKey, String destinationKey,
  641. long timeout, TimeUnit unit) {
  642. return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
  643. destinationKey, timeout, unit);
  644. }
  645. /**
  646. * 删除集合中值等于value得元素
  647. *
  648. * @param key
  649. * @param index
  650. * index=0, 删除所有值等于value的元素; index>0, 从头部开始删除第一个值等于value的元素;
  651. * index<0, 从尾部开始删除第一个值等于value的元素;
  652. * @param value
  653. * @return
  654. */
  655. public Long lRemove(String key, long index, String value) {
  656. return stringRedisTemplate.opsForList().remove(key, index, value);
  657. }
  658. /**
  659. * 裁剪list
  660. *
  661. * @param key
  662. * @param start
  663. * @param end
  664. */
  665. public void lTrim(String key, long start, long end) {
  666. stringRedisTemplate.opsForList().trim(key, start, end);
  667. }
  668. /**
  669. * 获取列表长度
  670. *
  671. * @param key
  672. * @return
  673. */
  674. public Long lLen(String key) {
  675. return stringRedisTemplate.opsForList().size(key);
  676. }
  677. /** --------------------set相关操作-------------------------- */
  678. /**
  679. * set添加元素
  680. *
  681. * @param key
  682. * @param values
  683. * @return
  684. */
  685. public Long sAdd(String key, String... values) {
  686. return stringRedisTemplate.opsForSet().add(key, values);
  687. }
  688. /**
  689. * set移除元素
  690. *
  691. * @param key
  692. * @param values
  693. * @return
  694. */
  695. public Long sRemove(String key, Object... values) {
  696. return stringRedisTemplate.opsForSet().remove(key, values);
  697. }
  698. /**
  699. * 移除并返回集合的一个随机元素
  700. *
  701. * @param key
  702. * @return
  703. */
  704. public String sPop(String key) {
  705. return stringRedisTemplate.opsForSet().pop(key);
  706. }
  707. /**
  708. * 将元素value从一个集合移到另一个集合
  709. *
  710. * @param key
  711. * @param value
  712. * @param destKey
  713. * @return
  714. */
  715. public Boolean sMove(String key, String value, String destKey) {
  716. return stringRedisTemplate.opsForSet().move(key, value, destKey);
  717. }
  718. /**
  719. * 获取集合的大小
  720. *
  721. * @param key
  722. * @return
  723. */
  724. public Long sSize(String key) {
  725. return stringRedisTemplate.opsForSet().size(key);
  726. }
  727. /**
  728. * 判断集合是否包含value
  729. *
  730. * @param key
  731. * @param value
  732. * @return
  733. */
  734. public Boolean sIsMember(String key, Object value) {
  735. return stringRedisTemplate.opsForSet().isMember(key, value);
  736. }
  737. /**
  738. * 获取两个集合的交集
  739. *
  740. * @param key
  741. * @param otherKey
  742. * @return
  743. */
  744. public Set<String> sIntersect(String key, String otherKey) {
  745. return stringRedisTemplate.opsForSet().intersect(key, otherKey);
  746. }
  747. /**
  748. * 获取key集合与多个集合的交集
  749. *
  750. * @param key
  751. * @param otherKeys
  752. * @return
  753. */
  754. public Set<String> sIntersect(String key, Collection<String> otherKeys) {
  755. return stringRedisTemplate.opsForSet().intersect(key, otherKeys);
  756. }
  757. /**
  758. * key集合与otherKey集合的交集存储到destKey集合中
  759. *
  760. * @param key
  761. * @param otherKey
  762. * @param destKey
  763. * @return
  764. */
  765. public Long sIntersectAndStore(String key, String otherKey, String destKey) {
  766. return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKey,
  767. destKey);
  768. }
  769. /**
  770. * key集合与多个集合的交集存储到destKey集合中
  771. *
  772. * @param key
  773. * @param otherKeys
  774. * @param destKey
  775. * @return
  776. */
  777. public Long sIntersectAndStore(String key, Collection<String> otherKeys,
  778. String destKey) {
  779. return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKeys,
  780. destKey);
  781. }
  782. /**
  783. * 获取两个集合的并集
  784. *
  785. * @param key
  786. * @param otherKeys
  787. * @return
  788. */
  789. public Set<String> sUnion(String key, String otherKeys) {
  790. return stringRedisTemplate.opsForSet().union(key, otherKeys);
  791. }
  792. /**
  793. * 获取key集合与多个集合的并集
  794. *
  795. * @param key
  796. * @param otherKeys
  797. * @return
  798. */
  799. public Set<String> sUnion(String key, Collection<String> otherKeys) {
  800. return stringRedisTemplate.opsForSet().union(key, otherKeys);
  801. }
  802. /**
  803. * key集合与otherKey集合的并集存储到destKey中
  804. *
  805. * @param key
  806. * @param otherKey
  807. * @param destKey
  808. * @return
  809. */
  810. public Long sUnionAndStore(String key, String otherKey, String destKey) {
  811. return stringRedisTemplate.opsForSet().unionAndStore(key, otherKey, destKey);
  812. }
  813. /**
  814. * key集合与多个集合的并集存储到destKey中
  815. *
  816. * @param key
  817. * @param otherKeys
  818. * @param destKey
  819. * @return
  820. */
  821. public Long sUnionAndStore(String key, Collection<String> otherKeys,
  822. String destKey) {
  823. return stringRedisTemplate.opsForSet().unionAndStore(key, otherKeys, destKey);
  824. }
  825. /**
  826. * 获取两个集合的差集
  827. *
  828. * @param key
  829. * @param otherKey
  830. * @return
  831. */
  832. public Set<String> sDifference(String key, String otherKey) {
  833. return stringRedisTemplate.opsForSet().difference(key, otherKey);
  834. }
  835. /**
  836. * 获取key集合与多个集合的差集
  837. *
  838. * @param key
  839. * @param otherKeys
  840. * @return
  841. */
  842. public Set<String> sDifference(String key, Collection<String> otherKeys) {
  843. return stringRedisTemplate.opsForSet().difference(key, otherKeys);
  844. }
  845. /**
  846. * key集合与otherKey集合的差集存储到destKey中
  847. *
  848. * @param key
  849. * @param otherKey
  850. * @param destKey
  851. * @return
  852. */
  853. public Long sDifference(String key, String otherKey, String destKey) {
  854. return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKey,
  855. destKey);
  856. }
  857. /**
  858. * key集合与多个集合的差集存储到destKey中
  859. *
  860. * @param key
  861. * @param otherKeys
  862. * @param destKey
  863. * @return
  864. */
  865. public Long sDifference(String key, Collection<String> otherKeys,
  866. String destKey) {
  867. return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKeys,
  868. destKey);
  869. }
  870. /**
  871. * 获取集合所有元素
  872. *
  873. * @param key
  874. * @param
  875. * @param
  876. * @return
  877. */
  878. public Set<String> setMembers(String key) {
  879. return stringRedisTemplate.opsForSet().members(key);
  880. }
  881. /**
  882. * 随机获取集合中的一个元素
  883. *
  884. * @param key
  885. * @return
  886. */
  887. public String sRandomMember(String key) {
  888. return stringRedisTemplate.opsForSet().randomMember(key);
  889. }
  890. /**
  891. * 随机获取集合中count个元素
  892. *
  893. * @param key
  894. * @param count
  895. * @return
  896. */
  897. public List<String> sRandomMembers(String key, long count) {
  898. return stringRedisTemplate.opsForSet().randomMembers(key, count);
  899. }
  900. /**
  901. * 随机获取集合中count个元素并且去除重复的
  902. *
  903. * @param key
  904. * @param count
  905. * @return
  906. */
  907. public Set<String> sDistinctRandomMembers(String key, long count) {
  908. return stringRedisTemplate.opsForSet().distinctRandomMembers(key, count);
  909. }
  910. /**
  911. *
  912. * @param key
  913. * @param options
  914. * @return
  915. */
  916. public Cursor<String> sScan(String key, ScanOptions options) {
  917. return stringRedisTemplate.opsForSet().scan(key, options);
  918. }
  919. /**------------------zSet相关操作--------------------------------*/
  920. /**
  921. * 添加元素,有序集合是按照元素的score值由小到大排列
  922. *
  923. * @param key
  924. * @param value
  925. * @param score
  926. * @return
  927. */
  928. public Boolean zAdd(String key, String value, double score) {
  929. return stringRedisTemplate.opsForZSet().add(key, value, score);
  930. }
  931. /**
  932. *
  933. * @param key
  934. * @param values
  935. * @return
  936. */
  937. public Long zAdd(String key, Set<TypedTuple<String>> values) {
  938. return stringRedisTemplate.opsForZSet().add(key, values);
  939. }
  940. /**
  941. *
  942. * @param key
  943. * @param values
  944. * @return
  945. */
  946. public Long zRemove(String key, Object... values) {
  947. return stringRedisTemplate.opsForZSet().remove(key, values);
  948. }
  949. public Long zRemove(String key, Collection<String> values) {
  950. if(values!=null&&!values.isEmpty()){
  951. Object[] objs = values.toArray(new Object[values.size()]);
  952. return stringRedisTemplate.opsForZSet().remove(key, objs);
  953. }
  954. return 0L;
  955. }
  956. /**
  957. * 增加元素的score值,并返回增加后的值
  958. *
  959. * @param key
  960. * @param value
  961. * @param delta
  962. * @return
  963. */
  964. public Double zIncrementScore(String key, String value, double delta) {
  965. return stringRedisTemplate.opsForZSet().incrementScore(key, value, delta);
  966. }
  967. /**
  968. * 返回元素在集合的排名,有序集合是按照元素的score值由小到大排列
  969. *
  970. * @param key
  971. * @param value
  972. * @return 0表示第一位
  973. */
  974. public Long zRank(String key, Object value) {
  975. return stringRedisTemplate.opsForZSet().rank(key, value);
  976. }
  977. /**
  978. * 返回元素在集合的排名,按元素的score值由大到小排列
  979. *
  980. * @param key
  981. * @param value
  982. * @return
  983. */
  984. public Long zReverseRank(String key, Object value) {
  985. return stringRedisTemplate.opsForZSet().reverseRank(key, value);
  986. }
  987. /**
  988. * 获取集合的元素, 从小到大排序
  989. *
  990. * @param key
  991. * @param start
  992. * 开始位置
  993. * @param end
  994. * 结束位置, -1查询所有
  995. * @return
  996. */
  997. public Set<String> zRange(String key, long start, long end) {
  998. return stringRedisTemplate.opsForZSet().range(key, start, end);
  999. }
  1000. /**
  1001. * 获取zset集合的所有元素, 从小到大排序
  1002. *
  1003. */
  1004. public Set<String> zRangeAll(String key) {
  1005. return zRange(key,0,-1);
  1006. }
  1007. /**
  1008. * 获取集合元素, 并且把score值也获取
  1009. *
  1010. * @param key
  1011. * @param start
  1012. * @param end
  1013. * @return
  1014. */
  1015. public Set<TypedTuple<String>> zRangeWithScores(String key, long start,
  1016. long end) {
  1017. return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end);
  1018. }
  1019. /**
  1020. * 根据Score值查询集合元素
  1021. *
  1022. * @param key
  1023. * @param min
  1024. * 最小值
  1025. * @param max
  1026. * 最大值
  1027. * @return
  1028. */
  1029. public Set<String> zRangeByScore(String key, double min, double max) {
  1030. return stringRedisTemplate.opsForZSet().rangeByScore(key, min, max);
  1031. }
  1032. /**
  1033. * 根据Score值查询集合元素, 从小到大排序
  1034. *
  1035. * @param key
  1036. * @param min
  1037. * 最小值
  1038. * @param max
  1039. * 最大值
  1040. * @return
  1041. */
  1042. public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,
  1043. double min, double max) {
  1044. return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max);
  1045. }
  1046. /**
  1047. *
  1048. * @param key
  1049. * @param min
  1050. * @param max
  1051. * @param start
  1052. * @param end
  1053. * @return
  1054. */
  1055. public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,
  1056. double min, double max, long start, long end) {
  1057. return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max,
  1058. start, end);
  1059. }
  1060. /**
  1061. * 获取集合的元素, 从大到小排序
  1062. *
  1063. * @param key
  1064. * @param start
  1065. * @param end
  1066. * @return
  1067. */
  1068. public Set<String> zReverseRange(String key, long start, long end) {
  1069. return stringRedisTemplate.opsForZSet().reverseRange(key, start, end);
  1070. }
  1071. public Set<String> zReverseRangeByScore(String key, long min, long max) {
  1072. return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
  1073. }
  1074. /**
  1075. * 获取集合的元素, 从大到小排序, 并返回score值
  1076. *
  1077. * @param key
  1078. * @param start
  1079. * @param end
  1080. * @return
  1081. */
  1082. public Set<TypedTuple<String>> zReverseRangeWithScores(String key,
  1083. long start, long end) {
  1084. return stringRedisTemplate.opsForZSet().reverseRangeWithScores(key, start,
  1085. end);
  1086. }
  1087. /**
  1088. * 根据Score值查询集合元素, 从大到小排序
  1089. *
  1090. * @param key
  1091. * @param min
  1092. * @param max
  1093. * @return
  1094. */
  1095. public Set<String> zReverseRangeByScore(String key, double min,
  1096. double max) {
  1097. return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
  1098. }
  1099. /**
  1100. * 根据Score值查询集合元素, 从大到小排序
  1101. *
  1102. * @param key
  1103. * @param min
  1104. * @param max
  1105. * @return
  1106. */
  1107. public Set<TypedTuple<String>> zReverseRangeByScoreWithScores(
  1108. String key, double min, double max) {
  1109. return stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key,
  1110. min, max);
  1111. }
  1112. /**
  1113. *
  1114. * @param key
  1115. * @param min
  1116. * @param max
  1117. * @param start
  1118. * @param end
  1119. * @return
  1120. */
  1121. public Set<String> zReverseRangeByScore(String key, double min,
  1122. double max, long start, long end) {
  1123. return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max,
  1124. start, end);
  1125. }
  1126. /**
  1127. * 根据score值获取集合元素数量
  1128. *
  1129. * @param key
  1130. * @param min
  1131. * @param max
  1132. * @return
  1133. */
  1134. public Long zCount(String key, double min, double max) {
  1135. return stringRedisTemplate.opsForZSet().count(key, min, max);
  1136. }
  1137. /**
  1138. * 获取集合大小
  1139. *
  1140. * @param key
  1141. * @return
  1142. */
  1143. public Long zSize(String key) {
  1144. return stringRedisTemplate.opsForZSet().size(key);
  1145. }
  1146. /**
  1147. * 获取集合大小
  1148. *
  1149. * @param key
  1150. * @return
  1151. */
  1152. public Long zZCard(String key) {
  1153. return stringRedisTemplate.opsForZSet().zCard(key);
  1154. }
  1155. /**
  1156. * 获取集合中value元素的score值
  1157. *
  1158. * @param key
  1159. * @param value
  1160. * @return
  1161. */
  1162. public Double zScore(String key, Object value) {
  1163. return stringRedisTemplate.opsForZSet().score(key, value);
  1164. }
  1165. /**
  1166. * 移除指定索引位置的成员
  1167. *
  1168. * @param key
  1169. * @param start
  1170. * @param end
  1171. * @return
  1172. */
  1173. public Long zRemoveRange(String key, long start, long end) {
  1174. return stringRedisTemplate.opsForZSet().removeRange(key, start, end);
  1175. }
  1176. /**
  1177. * 根据指定的score值的范围来移除成员
  1178. *
  1179. * @param key
  1180. * @param min
  1181. * @param max
  1182. * @return
  1183. */
  1184. public Long zRemoveRangeByScore(String key, double min, double max) {
  1185. return stringRedisTemplate.opsForZSet().removeRangeByScore(key, min, max);
  1186. }
  1187. /**
  1188. * 获取key和otherKey的并集并存储在destKey中
  1189. *
  1190. * @param key
  1191. * @param otherKey
  1192. * @param destKey
  1193. * @return
  1194. */
  1195. public Long zUnionAndStore(String key, String otherKey, String destKey) {
  1196. return stringRedisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey);
  1197. }
  1198. /**
  1199. *
  1200. * @param key
  1201. * @param otherKeys
  1202. * @param destKey
  1203. * @return
  1204. */
  1205. public Long zUnionAndStore(String key, Collection<String> otherKeys,
  1206. String destKey) {
  1207. return stringRedisTemplate.opsForZSet()
  1208. .unionAndStore(key, otherKeys, destKey);
  1209. }
  1210. /**
  1211. * 交集
  1212. *
  1213. * @param key
  1214. * @param otherKey
  1215. * @param destKey
  1216. * @return
  1217. */
  1218. public Long zIntersectAndStore(String key, String otherKey,
  1219. String destKey) {
  1220. return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKey,
  1221. destKey);
  1222. }
  1223. /**
  1224. * 交集
  1225. *
  1226. * @param key
  1227. * @param otherKeys
  1228. * @param destKey
  1229. * @return
  1230. */
  1231. public Long zIntersectAndStore(String key, Collection<String> otherKeys,
  1232. String destKey) {
  1233. return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKeys,
  1234. destKey);
  1235. }
  1236. /**
  1237. *
  1238. * @param key
  1239. * @param options
  1240. * @return
  1241. */
  1242. public Cursor<TypedTuple<String>> zScan(String key, ScanOptions options) {
  1243. return stringRedisTemplate.opsForZSet().scan(key, options);
  1244. }
  1245. /**
  1246. * 扫描主键,建议使用
  1247. * @param patten
  1248. * @return
  1249. */
  1250. public Set<String> scan(String patten){
  1251. Set<String> keys = stringRedisTemplate.execute((RedisCallback<Set<String>>) connection -> {
  1252. Set<String> result = new HashSet<>();
  1253. try (Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder()
  1254. .match(patten).count(10000).build())) {
  1255. while (cursor.hasNext()) {
  1256. result.add(new String(cursor.next()));
  1257. }
  1258. } catch (IOException e) {
  1259. e.printStackTrace();
  1260. }
  1261. return result;
  1262. });
  1263. return keys;
  1264. }
  1265. /**
  1266. * 管道技术,提高性能
  1267. * @param type
  1268. * @param values
  1269. * @return
  1270. */
  1271. public List<Object> lRightPushPipeline(String type,Collection<String> values){
  1272. List<Object> results = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
  1273. public Object doInRedis(RedisConnection connection) throws DataAccessException {
  1274. StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
  1275. //集合转换数组
  1276. String[] strings = values.toArray(new String[values.size()]);
  1277. //直接批量发送
  1278. stringRedisConn.rPush(type, strings);
  1279. return null;
  1280. }
  1281. });
  1282. return results;
  1283. }
  1284. public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){
  1285. List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
  1286. @Nullable
  1287. @Override
  1288. public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
  1289. StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;
  1290. String[] strings = values.toArray(new String[values.size()]);
  1291. stringRedisConnection.rPush(topic_key,strings);
  1292. stringRedisConnection.zRem(future_key,strings);
  1293. return null;
  1294. }
  1295. });
  1296. return objects;
  1297. }
  1298. /**
  1299. * 加锁
  1300. *
  1301. * @param name
  1302. * @param expire
  1303. * @return
  1304. */
  1305. public String tryLock(String name, long expire) {
  1306. name = name + "_lock";
  1307. String token = UUID.randomUUID().toString();
  1308. RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
  1309. RedisConnection conn = factory.getConnection();
  1310. try {
  1311. //参考redis命令:
  1312. //set key value [EX seconds] [PX milliseconds] [NX|XX]
  1313. Boolean result = conn.set(
  1314. name.getBytes(),
  1315. token.getBytes(),
  1316. Expiration.from(expire, TimeUnit.MILLISECONDS),
  1317. RedisStringCommands.SetOption.SET_IF_ABSENT //NX
  1318. );
  1319. if (result != null && result)
  1320. return token;
  1321. } finally {
  1322. RedisConnectionUtils.releaseConnection(conn, factory,false);
  1323. }
  1324. return null;
  1325. }
  1326. }

④:测试

  1. package com.heima.schedule.test;
  2. import com.heima.common.redis.CacheService;
  3. import com.heima.schedule.ScheduleApplication;
  4. import org.junit.Test;
  5. import org.junit.runner.RunWith;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. import org.springframework.test.context.junit4.SpringRunner;
  9. import java.util.Set;
  10. @SpringBootTest(classes = ScheduleApplication.class)
  11. @RunWith(SpringRunner.class)
  12. public class RedisTest {
  13. @Autowired
  14. private CacheService cacheService;
  15. @Test
  16. public void testList(){
  17. //在list的左边添加元素
  18. // cacheService.lLeftPush("list_001","hello,redis");
  19. //在list的右边获取元素,并删除
  20. String list_001 = cacheService.lRightPop("list_001");
  21. System.out.println(list_001);
  22. }
  23. @Test
  24. public void testZset(){
  25. //添加数据到zset中 分值
  26. /*cacheService.zAdd("zset_key_001","hello zset 001",1000);
  27. cacheService.zAdd("zset_key_001","hello zset 002",8888);
  28. cacheService.zAdd("zset_key_001","hello zset 003",7777);
  29. cacheService.zAdd("zset_key_001","hello zset 004",999999);*/
  30. //按照分值获取数据
  31. Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
  32. System.out.println(zset_key_001);
  33. }
  34. }

5、添加任务

①:拷贝mybatis-plus生成的文件,mapper

②:创建task类,用于接收添加任务的参数

  1. package com.heima.model.schedule.dtos;
  2. import lombok.Data;
  3. import java.io.Serializable;
  4. @Data
  5. public class Task implements Serializable {
  6. /**
  7. * 任务id
  8. */
  9. private Long taskId;
  10. /**
  11. * 类型
  12. */
  13. private Integer taskType;
  14. /**
  15. * 优先级
  16. */
  17. private Integer priority;
  18. /**
  19. * 执行id
  20. */
  21. private long executeTime;
  22. /**
  23. * task参数
  24. */
  25. private byte[] parameters;
  26. }

③:创建TaskService

  1. package com.heima.schedule.service;
  2. import com.heima.model.schedule.dtos.Task;
  3. /**
  4. * 对外访问接口
  5. */
  6. public interface TaskService {
  7. /**
  8. * 添加任务
  9. * @param task 任务对象
  10. * @return 任务id
  11. */
  12. public long addTask(Task task) ;
  13. }

实现:

  1. package com.heima.schedule.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.heima.common.constants.ScheduleConstants;
  4. import com.heima.common.redis.CacheService;
  5. import com.heima.model.schedule.dtos.Task;
  6. import com.heima.model.schedule.pojos.Taskinfo;
  7. import com.heima.model.schedule.pojos.TaskinfoLogs;
  8. import com.heima.schedule.mapper.TaskinfoLogsMapper;
  9. import com.heima.schedule.mapper.TaskinfoMapper;
  10. import com.heima.schedule.service.TaskService;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.springframework.beans.BeanUtils;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.stereotype.Service;
  15. import org.springframework.transaction.annotation.Transactional;
  16. import java.util.Calendar;
  17. import java.util.Date;
  18. @Service
  19. @Transactional
  20. @Slf4j
  21. public class TaskServiceImpl implements TaskService {
  22. /**
  23. * 添加延迟任务
  24. *
  25. * @param task
  26. * @return
  27. */
  28. @Override
  29. public long addTask(Task task) {
  30. //1.添加任务到数据库中
  31. boolean success = addTaskToDb(task);
  32. if (success) {
  33. //2.添加任务到redis
  34. addTaskToCache(task);
  35. }
  36. return task.getTaskId();
  37. }
  38. @Autowired
  39. private CacheService cacheService;
  40. /**
  41. * 把任务添加到redis中
  42. *
  43. * @param task
  44. */
  45. private void addTaskToCache(Task task) {
  46. String key = task.getTaskType() + "_" + task.getPriority();
  47. //获取5分钟之后的时间 毫秒值
  48. Calendar calendar = Calendar.getInstance();
  49. calendar.add(Calendar.MINUTE, 5);
  50. long nextScheduleTime = calendar.getTimeInMillis();
  51. //2.1 如果任务的执行时间小于等于当前时间,存入list
  52. if (task.getExecuteTime() <= System.currentTimeMillis()) {
  53. cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
  54. } else if (task.getExecuteTime() <= nextScheduleTime) {
  55. //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
  56. cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
  57. }
  58. }
  59. @Autowired
  60. private TaskinfoMapper taskinfoMapper;
  61. @Autowired
  62. private TaskinfoLogsMapper taskinfoLogsMapper;
  63. /**
  64. * 添加任务到数据库中
  65. *
  66. * @param task
  67. * @return
  68. */
  69. private boolean addTaskToDb(Task task) {
  70. boolean flag = false;
  71. try {
  72. //保存任务表
  73. Taskinfo taskinfo = new Taskinfo();
  74. BeanUtils.copyProperties(task, taskinfo);
  75. taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
  76. taskinfoMapper.insert(taskinfo);
  77. //设置taskID
  78. task.setTaskId(taskinfo.getTaskId());
  79. //保存任务日志数据
  80. TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
  81. BeanUtils.copyProperties(taskinfo, taskinfoLogs);
  82. taskinfoLogs.setVersion(1);
  83. taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
  84. taskinfoLogsMapper.insert(taskinfoLogs);
  85. flag = true;
  86. } catch (Exception e) {
  87. e.printStackTrace();
  88. }
  89. return flag;
  90. }
  91. }

ScheduleConstants常量类

  1. package com.heima.common.constants;
  2. public class ScheduleConstants {
  3. //task状态
  4. public static final int SCHEDULED=0; //初始化状态
  5. public static final int EXECUTED=1; //已执行状态
  6. public static final int CANCELLED=2; //已取消状态
  7. public static String FUTURE="future_"; //未来数据key前缀
  8. public static String TOPIC="topic_"; //当前数据key前缀
  9. }

④:测试

6、取消任务

在TaskService中添加方法

  1. /**
  2. * 取消任务
  3. * @param taskId 任务id
  4. * @return 取消结果
  5. */
  6. public boolean cancelTask(long taskId);

实现

  1. /**
  2. * 取消任务
  3. * @param taskId
  4. * @return
  5. */
  6. @Override
  7. public boolean cancelTask(long taskId) {
  8. boolean flag = false;
  9. //删除任务,更新日志
  10. Task task = updateDb(taskId,ScheduleConstants.EXECUTED);
  11. //删除redis的数据
  12. if(task != null){
  13. removeTaskFromCache(task);
  14. flag = true;
  15. }
  16. return false;
  17. }
  18. /**
  19. * 删除redis中的任务数据
  20. * @param task
  21. */
  22. private void removeTaskFromCache(Task task) {
  23. String key = task.getTaskType()+"_"+task.getPriority();
  24. if(task.getExecuteTime()<=System.currentTimeMillis()){
  25. cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
  26. }else {
  27. cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
  28. }
  29. }
  30. /**
  31. * 删除任务,更新任务日志状态
  32. * @param taskId
  33. * @param status
  34. * @return
  35. */
  36. private Task updateDb(long taskId, int status) {
  37. Task task = null;
  38. try {
  39. //删除任务
  40. taskinfoMapper.deleteById(taskId);
  41. TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
  42. taskinfoLogs.setStatus(status);
  43. taskinfoLogsMapper.updateById(taskinfoLogs);
  44. task = new Task();
  45. BeanUtils.copyProperties(taskinfoLogs,task);
  46. task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
  47. }catch (Exception e){
  48. log.error("task cancel exception taskid={}",taskId);
  49. }
  50. return task;
  51. }

测试

7、消费任务

在TaskService中添加方法

  1. /**
  2. * 按照类型和优先级来拉取任务
  3. * @param type
  4. * @param priority
  5. * @return
  6. */
  7. public Task poll(int type,int priority);

实现

  1. /**
  2. * 按照类型和优先级拉取任务
  3. * @return
  4. */
  5. @Override
  6. public Task poll(int type,int priority) {
  7. Task task = null;
  8. try {
  9. String key = type+"_"+priority;
  10. String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
  11. if(StringUtils.isNotBlank(task_json)){
  12. task = JSON.parseObject(task_json, Task.class);
  13. //更新数据库信息
  14. updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
  15. }
  16. }catch (Exception e){
  17. e.printStackTrace();
  18. log.error("poll task exception");
  19. }
  20. return task;
  21. }

8、未来数据定时刷新

1.reids key值匹配

方案1:keys 模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞。

方案2:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。

代码案例:

  1. @Test
  2. public void testKeys(){
  3. Set<String> keys = cacheService.keys("future_*");
  4. System.out.println(keys);
  5. Set<String> scan = cacheService.scan("future_*");
  6. System.out.println(scan);
  7. }

2.reids管道

普通redis客户端和服务器交互模式

Pipeline请求模型

官方测试结果数据对比

 测试案例对比:

  1. //耗时6151
  2. @Test
  3. public void testPiple1(){
  4. long start =System.currentTimeMillis();
  5. for (int i = 0; i <10000 ; i++) {
  6. Task task = new Task();
  7. task.setTaskType(1001);
  8. task.setPriority(1);
  9. task.setExecuteTime(new Date().getTime());
  10. cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
  11. }
  12. System.out.println("耗时"+(System.currentTimeMillis()- start));
  13. }
  14. @Test
  15. public void testPiple2(){
  16. long start = System.currentTimeMillis();
  17. //使用管道技术
  18. List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
  19. @Nullable
  20. @Override
  21. public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
  22. for (int i = 0; i <10000 ; i++) {
  23. Task task = new Task();
  24. task.setTaskType(1001);
  25. task.setPriority(1);
  26. task.setExecuteTime(new Date().getTime());
  27. redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
  28. }
  29. return null;
  30. }
  31. });
  32. System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
  33. }

3.未来数据定时刷新-功能完成

在TaskService中添加方法         

  1. @Scheduled(cron = "0 */1 * * * ?")
  2. public void refresh() {
  3. System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
  4. // 获取所有未来数据集合的key值
  5. Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
  6. for (String futureKey : futureKeys) { // future_250_250
  7. String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
  8. //获取该组key下当前需要消费的任务数据
  9. Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
  10. if (!tasks.isEmpty()) {
  11. //将这些任务数据添加到消费者队列中
  12. cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
  13. System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
  14. }
  15. }
  16. }

在引导类中添加开启任务调度注解:@EnableScheduling

9、分布式锁解决集群下的方法抢占执行

1.问题描述

启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法

2.分布式锁

分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。

解决方案:

3.redis分布式锁

sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功

  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败

  • 客户端A执行代码完成,删除锁

  • 客户端B在等待一段时间后再去请求设置key的值,设置成功

  • 客户端B执行代码完成,删除锁

4.在工具类CacheService中添加方法

  1. /**
  2. * 加锁
  3. *
  4. * @param name
  5. * @param expire
  6. * @return
  7. */
  8. public String tryLock(String name, long expire) {
  9. name = name + "_lock";
  10. String token = UUID.randomUUID().toString();
  11. RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
  12. RedisConnection conn = factory.getConnection();
  13. try {
  14. //参考redis命令:
  15. //set key value [EX seconds] [PX milliseconds] [NX|XX]
  16. Boolean result = conn.set(
  17. name.getBytes(),
  18. token.getBytes(),
  19. Expiration.from(expire, TimeUnit.MILLISECONDS),
  20. RedisStringCommands.SetOption.SET_IF_ABSENT //NX
  21. );
  22. if (result != null && result)
  23. return token;
  24. } finally {
  25. RedisConnectionUtils.releaseConnection(conn, factory,false);
  26. }
  27. return null;
  28. }

修改未来数据定时刷新的方法,如下:

  1. /**
  2. * 未来数据定时刷新
  3. */
  4. @Scheduled(cron = "0 */1 * * * ?")
  5. public void refresh(){
  6. String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
  7. if(StringUtils.isNotBlank(token)){
  8. log.info("未来数据定时刷新---定时任务");
  9. //获取所有未来数据的集合key
  10. Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
  11. for (String futureKey : futureKeys) {//future_100_50
  12. //获取当前数据的key topic
  13. String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
  14. //按照key和分值查询符合条件的数据
  15. Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
  16. //同步数据
  17. if(!tasks.isEmpty()){
  18. cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
  19. log.info("成功的将"+futureKey+"刷新到了"+topicKey);
  20. }
  21. }
  22. }
  23. }

10、数据库同步到redis

  1. @Scheduled(cron = "0 */5 * * * ?")
  2. @PostConstruct
  3. public void reloadData() {
  4. clearCache();
  5. log.info("数据库数据同步到缓存");
  6. Calendar calendar = Calendar.getInstance();
  7. calendar.add(Calendar.MINUTE, 5);
  8. //查看小于未来5分钟的所有任务
  9. List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
  10. if(allTasks != null && allTasks.size() > 0){
  11. for (Taskinfo taskinfo : allTasks) {
  12. Task task = new Task();
  13. BeanUtils.copyProperties(taskinfo,task);
  14. task.setExecuteTime(taskinfo.getExecuteTime().getTime());
  15. addTaskToCache(task);
  16. }
  17. }
  18. }
  19. private void clearCache(){
  20. // 删除缓存中未来数据集合和当前消费者队列的所有key
  21. Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
  22. Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
  23. cacheService.delete(futurekeys);
  24. cacheService.delete(topickeys);
  25. }

三、延迟队列解决精准时间发布文章


1、延迟队列服务提供对外接口

提供远程的feign接口,在heima-leadnews-feign-api编写类如下:

  1. package com.heima.apis.schedule;
  2. import com.heima.model.common.dtos.ResponseResult;
  3. import com.heima.model.schedule.dtos.Task;
  4. import org.springframework.cloud.openfeign.FeignClient;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.PathVariable;
  7. import org.springframework.web.bind.annotation.PostMapping;
  8. import org.springframework.web.bind.annotation.RequestBody;
  9. @FeignClient("leadnews-schedule")
  10. public interface IScheduleClient {
  11. /**
  12. * 添加任务
  13. * @param task 任务对象
  14. * @return 任务id
  15. */
  16. @PostMapping("/api/v1/task/add")
  17. public ResponseResult addTask(@RequestBody Task task);
  18. /**
  19. * 取消任务
  20. * @param taskId 任务id
  21. * @return 取消结果
  22. */
  23. @GetMapping("/api/v1/task/cancel/{taskId}")
  24. public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
  25. /**
  26. * 按照类型和优先级来拉取任务
  27. * @param type
  28. * @param priority
  29. * @return
  30. */
  31. @GetMapping("/api/v1/task/poll/{type}/{priority}")
  32. public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority);
  33. }

在heima-leadnews-schedule微服务下提供对应的实现

  1. package com.heima.schedule.feign;
  2. import com.heima.apis.schedule.IScheduleClient;
  3. import com.heima.model.common.dtos.ResponseResult;
  4. import com.heima.model.schedule.dtos.Task;
  5. import com.heima.schedule.service.TaskService;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.*;
  8. @RestController
  9. public class ScheduleClient implements IScheduleClient {
  10. @Autowired
  11. private TaskService taskService;
  12. /**
  13. * 添加任务
  14. * @param task 任务对象
  15. * @return 任务id
  16. */
  17. @PostMapping("/api/v1/task/add")
  18. @Override
  19. public ResponseResult addTask(@RequestBody Task task) {
  20. return ResponseResult.okResult(taskService.addTask(task));
  21. }
  22. /**
  23. * 取消任务
  24. * @param taskId 任务id
  25. * @return 取消结果
  26. */
  27. @GetMapping("/api/v1/task/cancel/{taskId}")
  28. @Override
  29. public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
  30. return ResponseResult.okResult(taskService.cancelTask(taskId));
  31. }
  32. /**
  33. * 按照类型和优先级来拉取任务
  34. * @param type
  35. * @param priority
  36. * @return
  37. */
  38. @GetMapping("/api/v1/task/poll/{type}/{priority}")
  39. @Override
  40. public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
  41. return ResponseResult.okResult(taskService.poll(type,priority));
  42. }
  43. }

2、发布文章集成添加延迟队列接口

在创建WmNewsTaskService

  1. package com.heima.wemedia.service;
  2. import com.heima.model.wemedia.pojos.WmNews;
  3. public interface WmNewsTaskService {
  4. /**
  5. * 添加任务到延迟队列中
  6. * @param id 文章的id
  7. * @param publishTime 发布的时间 可以做为任务的执行时间
  8. */
  9. public void addNewsToTask(Integer id, Date publishTime);
  10. }

实现:

  1. package com.heima.wemedia.service.impl;
  2. import com.heima.apis.schedule.IScheduleClient;
  3. import com.heima.model.common.enums.TaskTypeEnum;
  4. import com.heima.model.schedule.dtos.Task;
  5. import com.heima.model.wemedia.pojos.WmNews;
  6. import com.heima.utils.common.ProtostuffUtil;
  7. import com.heima.wemedia.service.WmNewsTaskService;
  8. import lombok.SneakyThrows;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.scheduling.annotation.Async;
  12. import org.springframework.stereotype.Service;
  13. @Service
  14. @Slf4j
  15. public class WmNewsTaskServiceImpl implements WmNewsTaskService {
  16. @Autowired
  17. private IScheduleClient scheduleClient;
  18. /**
  19. * 添加任务到延迟队列中
  20. * @param id 文章的id
  21. * @param publishTime 发布的时间 可以做为任务的执行时间
  22. */
  23. @Override
  24. @Async
  25. public void addNewsToTask(Integer id, Date publishTime) {
  26. log.info("添加任务到延迟服务中----begin");
  27. Task task = new Task();
  28. task.setExecuteTime(publishTime.getTime());
  29. task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
  30. task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
  31. WmNews wmNews = new WmNews();
  32. wmNews.setId(id);
  33. task.setParameters(ProtostuffUtil.serialize(wmNews));
  34. scheduleClient.addTask(task);
  35. log.info("添加任务到延迟服务中----end");
  36. }
  37. }

枚举类:

  1. package com.heima.model.common.enums;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Getter;
  4. @Getter
  5. @AllArgsConstructor
  6. public enum TaskTypeEnum {
  7. NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
  8. REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
  9. private final int taskType; //对应具体业务
  10. private final int priority; //业务不同级别
  11. private final String desc; //描述信息
  12. }

序列化工具对比

  • JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组

  • Protostuff:google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo类

拷贝资料中的两个类到heima-leadnews-utils下

JdkSerializeUtil.java

  1. package com.heima.utils.common;
  2. import java.io.ByteArrayInputStream;
  3. import java.io.ByteArrayOutputStream;
  4. import java.io.ObjectInputStream;
  5. import java.io.ObjectOutputStream;
  6. /**
  7. * jdk序列化
  8. */
  9. public class JdkSerializeUtil {
  10. /**
  11. * 序列化
  12. * @param obj
  13. * @param <T>
  14. * @return
  15. */
  16. public static <T> byte[] serialize(T obj) {
  17. if (obj == null){
  18. throw new NullPointerException();
  19. }
  20. ByteArrayOutputStream bos = new ByteArrayOutputStream();
  21. try {
  22. ObjectOutputStream oos = new ObjectOutputStream(bos);
  23. oos.writeObject(obj);
  24. return bos.toByteArray();
  25. } catch (Exception ex) {
  26. ex.printStackTrace();
  27. }
  28. return new byte[0];
  29. }
  30. /**
  31. * 反序列化
  32. * @param data
  33. * @param clazz
  34. * @param <T>
  35. * @return
  36. */
  37. public static <T> T deserialize(byte[] data, Class<T> clazz) {
  38. ByteArrayInputStream bis = new ByteArrayInputStream(data);
  39. try {
  40. ObjectInputStream ois = new ObjectInputStream(bis);
  41. T obj = (T)ois.readObject();
  42. return obj;
  43. } catch (Exception ex) {
  44. ex.printStackTrace();
  45. }
  46. return null;
  47. }
  48. }

 ProtostuffUtil.java

  1. package com.heima.utils.common;
  2. import com.heima.model.wemedia.pojos.WmNews;
  3. import io.protostuff.LinkedBuffer;
  4. import io.protostuff.ProtostuffIOUtil;
  5. import io.protostuff.Schema;
  6. import io.protostuff.runtime.RuntimeSchema;
  7. public class ProtostuffUtil {
  8. /**
  9. * 序列化
  10. * @param t
  11. * @param <T>
  12. * @return
  13. */
  14. public static <T> byte[] serialize(T t){
  15. Schema schema = RuntimeSchema.getSchema(t.getClass());
  16. return ProtostuffIOUtil.toByteArray(t,schema,
  17. LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
  18. }
  19. /**
  20. * 反序列化
  21. * @param bytes
  22. * @param c
  23. * @param <T>
  24. * @return
  25. */
  26. public static <T> T deserialize(byte []bytes,Class<T> c) {
  27. T t = null;
  28. try {
  29. t = c.newInstance();
  30. Schema schema = RuntimeSchema.getSchema(t.getClass());
  31. ProtostuffIOUtil.mergeFrom(bytes,t,schema);
  32. } catch (InstantiationException e) {
  33. e.printStackTrace();
  34. } catch (IllegalAccessException e) {
  35. e.printStackTrace();
  36. }
  37. return t;
  38. }
  39. /**
  40. * jdk序列化与protostuff序列化对比
  41. * @param args
  42. */
  43. public static void main(String[] args) {
  44. long start =System.currentTimeMillis();
  45. for (int i = 0; i <1000000 ; i++) {
  46. WmNews wmNews =new WmNews();
  47. JdkSerializeUtil.serialize(wmNews);
  48. }
  49. System.out.println(" jdk 花费 "+(System.currentTimeMillis()-start));
  50. start =System.currentTimeMillis();
  51. for (int i = 0; i <1000000 ; i++) {
  52. WmNews wmNews =new WmNews();
  53. ProtostuffUtil.serialize(wmNews);
  54. }
  55. System.out.println(" protostuff 花费 "+(System.currentTimeMillis()-start));
  56. }
  57. }

Protostuff需要引导依赖:

  1. <dependency>
  2.     <groupId>io.protostuff</groupId>
  3.     <artifactId>protostuff-core</artifactId>
  4.     <version>1.6.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>io.protostuff</groupId>
  8.     <artifactId>protostuff-runtime</artifactId>
  9.     <version>1.6.0</version>
  10. </dependency>

修改发布文章代码:

把之前的异步调用修改为调用延迟任务

  1. @Autowired
  2. private WmNewsTaskService wmNewsTaskService;
  3. /**
  4. * 发布修改文章或保存为草稿
  5. * @param dto
  6. * @return
  7. */
  8. @Override
  9. public ResponseResult submitNews(WmNewsDto dto) {
  10. //0.条件判断
  11. if(dto == null || dto.getContent() == null){
  12. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  13. }
  14. //1.保存或修改文章
  15. WmNews wmNews = new WmNews();
  16. //属性拷贝 属性名词和类型相同才能拷贝
  17. BeanUtils.copyProperties(dto,wmNews);
  18. //封面图片 list---> string
  19. if(dto.getImages() != null && dto.getImages().size() > 0){
  20. //[1dddfsd.jpg,sdlfjldk.jpg]--> 1dddfsd.jpg,sdlfjldk.jpg
  21. String imageStr = StringUtils.join(dto.getImages(), ",");
  22. wmNews.setImages(imageStr);
  23. }
  24. //如果当前封面类型为自动 -1
  25. if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
  26. wmNews.setType(null);
  27. }
  28. saveOrUpdateWmNews(wmNews);
  29. //2.判断是否为草稿 如果为草稿结束当前方法
  30. if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){
  31. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  32. }
  33. //3.不是草稿,保存文章内容图片与素材的关系
  34. //获取到文章内容中的图片信息
  35. List<String> materials = ectractUrlInfo(dto.getContent());
  36. saveRelativeInfoForContent(materials,wmNews.getId());
  37. //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片
  38. saveRelativeInfoForCover(dto,wmNews,materials);
  39. //审核文章
  40. // wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
  41. wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
  42. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  43. }

3、消费任务进行审核文章

WmNewsTaskService中添加方法

  1. /**
  2. * 消费延迟队列数据
  3. */
  4. public void scanNewsByTask();

实现

  1. @Autowired
  2. private WmNewsAutoScanServiceImpl wmNewsAutoScanService;
  3. /**
  4. * 消费延迟队列数据
  5. */
  6. @Scheduled(fixedRate = 1000)
  7. @Override
  8. @SneakyThrows
  9. public void scanNewsByTask() {
  10. log.info("文章审核---消费任务执行---begin---");
  11. ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
  12. if(responseResult.getCode().equals(200) && responseResult.getData() != null){
  13. String json_str = JSON.toJSONString(responseResult.getData());
  14. Task task = JSON.parseObject(json_str, Task.class);
  15. byte[] parameters = task.getParameters();
  16. WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
  17. System.out.println(wmNews.getId()+"-----------");
  18. wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
  19. }
  20. log.info("文章审核---消费任务执行---end---");
  21. }

结束!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/125700
推荐阅读
相关标签
  

闽ICP备14008679号