当前位置:   article > 正文

redis实现延迟任务_redis实现延时任务

redis实现延时任务

一.实现思路

1.为什么redis中使用两种数据类型,list和zset

2.在添加zset数据的时候,为什么需要预加载?

3.延迟任务服务实现

3.1 搭建heima-leadnews-schedule模块

leadnews-schedule是一个通用的服务,单独创建模块来管理任何类型的延迟任务

①:导入资料文件夹下的heima-leadnews-schedule模块到heima-leadnews-service下,如下图所示:

 ②:添加bootstrap.yml

  1. server:
  2. port: 51701
  3. spring:
  4. application:
  5. name: leadnews-schedule
  6. cloud:
  7. nacos:
  8. discovery:
  9. server-addr: 192.168.200.130:8848
  10. config:
  11. server-addr: 192.168.200.130:8848
  12. file-extension: yml

③:在nacos中添加对应配置,并添加数据库及mybatis-plus的配置

  1. spring:
  2. datasource:
  3. driver-class-name: com.mysql.jdbc.Driver
  4. url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
  5. username: root
  6. password: root
  7. # 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
  8. mybatis-plus:
  9. mapper-locations: classpath*:mapper/*.xml
  10. # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  11. type-aliases-package: com.heima.model.schedule.pojos

4.数据库准备

taskinfo 任务表

 实体类

  1. package com.heima.model.schedule.pojos;
  2. import com.baomidou.mybatisplus.annotation.IdType;
  3. import com.baomidou.mybatisplus.annotation.TableField;
  4. import com.baomidou.mybatisplus.annotation.TableId;
  5. import com.baomidou.mybatisplus.annotation.TableName;
  6. import lombok.Data;
  7. import java.io.Serializable;
  8. import java.util.Date;
  9. /**
  10. * <p>
  11. *
  12. * </p>
  13. *
  14. * @author itheima
  15. */
  16. @Data
  17. @TableName("taskinfo")
  18. public class Taskinfo implements Serializable {
  19. private static final long serialVersionUID = 1L;
  20. /**
  21. * 任务id
  22. */
  23. @TableId(type = IdType.ID_WORKER)
  24. private Long taskId;
  25. /**
  26. * 执行时间
  27. */
  28. @TableField("execute_time")
  29. private Date executeTime;
  30. /**
  31. * 参数
  32. */
  33. @TableField("parameters")
  34. private byte[] parameters;
  35. /**
  36. * 优先级
  37. */
  38. @TableField("priority")
  39. private Integer priority;
  40. /**
  41. * 任务类型
  42. */
  43. @TableField("task_type")
  44. private Integer taskType;
  45. }

taskinfo_logs 任务日志表

实体类

  1. package com.heima.model.schedule.pojos;
  2. import com.baomidou.mybatisplus.annotation.*;
  3. import lombok.Data;
  4. import java.io.Serializable;
  5. import java.util.Date;
  6. /**
  7. * <p>
  8. *
  9. * </p>
  10. *
  11. * @author itheima
  12. */
  13. @Data
  14. @TableName("taskinfo_logs")
  15. public class TaskinfoLogs implements Serializable {
  16. private static final long serialVersionUID = 1L;
  17. /**
  18. * 任务id
  19. */
  20. @TableId(type = IdType.ID_WORKER)
  21. private Long taskId;
  22. /**
  23. * 执行时间
  24. */
  25. @TableField("execute_time")
  26. private Date executeTime;
  27. /**
  28. * 参数
  29. */
  30. @TableField("parameters")
  31. private byte[] parameters;
  32. /**
  33. * 优先级
  34. */
  35. @TableField("priority")
  36. private Integer priority;
  37. /**
  38. * 任务类型
  39. */
  40. @TableField("task_type")
  41. private Integer taskType;
  42. /**
  43. * 版本号,用乐观锁
  44. */
  45. @Version
  46. private Integer version;
  47. /**
  48. * 状态 0=int(初始化) 1=EXECUTED(执行成功) 2=CANCELLED(撤销)
  49. */
  50. @TableField("status")
  51. private Integer status;
  52. }

乐观锁支持

  1. /**
  2. * mybatis-plus乐观锁支持
  3. * @return
  4. */
  5. @Bean
  6. public MybatisPlusInterceptor optimisticLockerInterceptor(){
  7. MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
  8. interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
  9. return interceptor;
  10. }

5.安装redis

①拉取镜像

docker pull redis

② 创建容器

docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"

6.项目集成redis

① 在项目导入redis相关依赖,已经完成

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency>
  5. <!-- redis依赖commons-pool 这个依赖一定要添加 -->
  6. <dependency>
  7. <groupId>org.apache.commons</groupId>
  8. <artifactId>commons-pool2</artifactId>
  9. </dependency>

② 在heima-leadnews-schedule中集成redis,添加以下nacos配置,链接上redis

  1. spring:
  2. redis:
  3. host: 192.168.200.130
  4. password: leadnews
  5. port: 6379

③ 拷贝资料文件夹下的类:CacheService到heima-leadnews-common模块下,并添加自动配置

  1. package com.heima.schedule.config;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.cache.annotation.CachingConfigurerSupport;
  4. import org.springframework.dao.DataAccessException;
  5. import org.springframework.data.redis.connection.DataType;
  6. import org.springframework.data.redis.connection.RedisConnection;
  7. import org.springframework.data.redis.connection.StringRedisConnection;
  8. import org.springframework.data.redis.core.Cursor;
  9. import org.springframework.data.redis.core.RedisCallback;
  10. import org.springframework.data.redis.core.ScanOptions;
  11. import org.springframework.data.redis.core.StringRedisTemplate;
  12. import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
  13. import org.springframework.lang.Nullable;
  14. import org.springframework.stereotype.Component;
  15. import java.io.IOException;
  16. import java.util.*;
  17. import java.util.concurrent.TimeUnit;
  18. @Component
  19. public class CacheService extends CachingConfigurerSupport {
  20. @Autowired
  21. private StringRedisTemplate stringRedisTemplate;
  22. public StringRedisTemplate getstringRedisTemplate() {
  23. return this.stringRedisTemplate;
  24. }
  25. /** -------------------key相关操作--------------------- */
  26. /**
  27. * 删除key
  28. *
  29. * @param key
  30. */
  31. public void delete(String key) {
  32. stringRedisTemplate.delete(key);
  33. }
  34. /**
  35. * 批量删除key
  36. *
  37. * @param keys
  38. */
  39. public void delete(Collection<String> keys) {
  40. stringRedisTemplate.delete(keys);
  41. }
  42. /**
  43. * 序列化key
  44. *
  45. * @param key
  46. * @return
  47. */
  48. public byte[] dump(String key) {
  49. return stringRedisTemplate.dump(key);
  50. }
  51. /**
  52. * 是否存在key
  53. *
  54. * @param key
  55. * @return
  56. */
  57. public Boolean exists(String key) {
  58. return stringRedisTemplate.hasKey(key);
  59. }
  60. /**
  61. * 设置过期时间
  62. *
  63. * @param key
  64. * @param timeout
  65. * @param unit
  66. * @return
  67. */
  68. public Boolean expire(String key, long timeout, TimeUnit unit) {
  69. return stringRedisTemplate.expire(key, timeout, unit);
  70. }
  71. /**
  72. * 设置过期时间
  73. *
  74. * @param key
  75. * @param date
  76. * @return
  77. */
  78. public Boolean expireAt(String key, Date date) {
  79. return stringRedisTemplate.expireAt(key, date);
  80. }
  81. /**
  82. * 查找匹配的key
  83. *
  84. * @param pattern
  85. * @return
  86. */
  87. public Set<String> keys(String pattern) {
  88. return stringRedisTemplate.keys(pattern);
  89. }
  90. /**
  91. * 将当前数据库的 key 移动到给定的数据库 db 当中
  92. *
  93. * @param key
  94. * @param dbIndex
  95. * @return
  96. */
  97. public Boolean move(String key, int dbIndex) {
  98. return stringRedisTemplate.move(key, dbIndex);
  99. }
  100. /**
  101. * 移除 key 的过期时间,key 将持久保持
  102. *
  103. * @param key
  104. * @return
  105. */
  106. public Boolean persist(String key) {
  107. return stringRedisTemplate.persist(key);
  108. }
  109. /**
  110. * 返回 key 的剩余的过期时间
  111. *
  112. * @param key
  113. * @param unit
  114. * @return
  115. */
  116. public Long getExpire(String key, TimeUnit unit) {
  117. return stringRedisTemplate.getExpire(key, unit);
  118. }
  119. /**
  120. * 返回 key 的剩余的过期时间
  121. *
  122. * @param key
  123. * @return
  124. */
  125. public Long getExpire(String key) {
  126. return stringRedisTemplate.getExpire(key);
  127. }
  128. /**
  129. * 从当前数据库中随机返回一个 key
  130. *
  131. * @return
  132. */
  133. public String randomKey() {
  134. return stringRedisTemplate.randomKey();
  135. }
  136. /**
  137. * 修改 key 的名称
  138. *
  139. * @param oldKey
  140. * @param newKey
  141. */
  142. public void rename(String oldKey, String newKey) {
  143. stringRedisTemplate.rename(oldKey, newKey);
  144. }
  145. /**
  146. * 仅当 newkey 不存在时,将 oldKey 改名为 newkey
  147. *
  148. * @param oldKey
  149. * @param newKey
  150. * @return
  151. */
  152. public Boolean renameIfAbsent(String oldKey, String newKey) {
  153. return stringRedisTemplate.renameIfAbsent(oldKey, newKey);
  154. }
  155. /**
  156. * 返回 key 所储存的值的类型
  157. *
  158. * @param key
  159. * @return
  160. */
  161. public DataType type(String key) {
  162. return stringRedisTemplate.type(key);
  163. }
  164. /** -------------------string相关操作--------------------- */
  165. /**
  166. * 设置指定 key 的值
  167. * @param key
  168. * @param value
  169. */
  170. public void set(String key, String value) {
  171. stringRedisTemplate.opsForValue().set(key, value);
  172. }
  173. /**
  174. * 获取指定 key 的值
  175. * @param key
  176. * @return
  177. */
  178. public String get(String key) {
  179. return stringRedisTemplate.opsForValue().get(key);
  180. }
  181. /**
  182. * 返回 key 中字符串值的子字符
  183. * @param key
  184. * @param start
  185. * @param end
  186. * @return
  187. */
  188. public String getRange(String key, long start, long end) {
  189. return stringRedisTemplate.opsForValue().get(key, start, end);
  190. }
  191. /**
  192. * 将给定 key 的值设为 value ,并返回 key 的旧值(old value)
  193. *
  194. * @param key
  195. * @param value
  196. * @return
  197. */
  198. public String getAndSet(String key, String value) {
  199. return stringRedisTemplate.opsForValue().getAndSet(key, value);
  200. }
  201. /**
  202. * 对 key 所储存的字符串值,获取指定偏移量上的位(bit)
  203. *
  204. * @param key
  205. * @param offset
  206. * @return
  207. */
  208. public Boolean getBit(String key, long offset) {
  209. return stringRedisTemplate.opsForValue().getBit(key, offset);
  210. }
  211. /**
  212. * 批量获取
  213. *
  214. * @param keys
  215. * @return
  216. */
  217. public List<String> multiGet(Collection<String> keys) {
  218. return stringRedisTemplate.opsForValue().multiGet(keys);
  219. }
  220. /**
  221. * 设置ASCII码, 字符串'a'的ASCII码是97, 转为二进制是'01100001', 此方法是将二进制第offset位值变为value
  222. *
  223. * @param key
  224. * @param
  225. * @param value
  226. * 值,true为1, false为0
  227. * @return
  228. */
  229. public boolean setBit(String key, long offset, boolean value) {
  230. return stringRedisTemplate.opsForValue().setBit(key, offset, value);
  231. }
  232. /**
  233. * 将值 value 关联到 key ,并将 key 的过期时间设为 timeout
  234. *
  235. * @param key
  236. * @param value
  237. * @param timeout
  238. * 过期时间
  239. * @param unit
  240. * 时间单位, 天:TimeUnit.DAYS 小时:TimeUnit.HOURS 分钟:TimeUnit.MINUTES
  241. * 秒:TimeUnit.SECONDS 毫秒:TimeUnit.MILLISECONDS
  242. */
  243. public void setEx(String key, String value, long timeout, TimeUnit unit) {
  244. stringRedisTemplate.opsForValue().set(key, value, timeout, unit);
  245. }
  246. /**
  247. * 只有在 key 不存在时设置 key 的值
  248. *
  249. * @param key
  250. * @param value
  251. * @return 之前已经存在返回false,不存在返回true
  252. */
  253. public boolean setIfAbsent(String key, String value) {
  254. return stringRedisTemplate.opsForValue().setIfAbsent(key, value);
  255. }
  256. /**
  257. * 用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始
  258. *
  259. * @param key
  260. * @param value
  261. * @param offset
  262. * 从指定位置开始覆写
  263. */
  264. public void setRange(String key, String value, long offset) {
  265. stringRedisTemplate.opsForValue().set(key, value, offset);
  266. }
  267. /**
  268. * 获取字符串的长度
  269. *
  270. * @param key
  271. * @return
  272. */
  273. public Long size(String key) {
  274. return stringRedisTemplate.opsForValue().size(key);
  275. }
  276. /**
  277. * 批量添加
  278. *
  279. * @param maps
  280. */
  281. public void multiSet(Map<String, String> maps) {
  282. stringRedisTemplate.opsForValue().multiSet(maps);
  283. }
  284. /**
  285. * 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在
  286. *
  287. * @param maps
  288. * @return 之前已经存在返回false,不存在返回true
  289. */
  290. public boolean multiSetIfAbsent(Map<String, String> maps) {
  291. return stringRedisTemplate.opsForValue().multiSetIfAbsent(maps);
  292. }
  293. /**
  294. * 增加(自增长), 负数则为自减
  295. *
  296. * @param key
  297. * @param
  298. * @return
  299. */
  300. public Long incrBy(String key, long increment) {
  301. return stringRedisTemplate.opsForValue().increment(key, increment);
  302. }
  303. /**
  304. *
  305. * @param key
  306. * @param
  307. * @return
  308. */
  309. public Double incrByFloat(String key, double increment) {
  310. return stringRedisTemplate.opsForValue().increment(key, increment);
  311. }
  312. /**
  313. * 追加到末尾
  314. *
  315. * @param key
  316. * @param value
  317. * @return
  318. */
  319. public Integer append(String key, String value) {
  320. return stringRedisTemplate.opsForValue().append(key, value);
  321. }
  322. /** -------------------hash相关操作------------------------- */
  323. /**
  324. * 获取存储在哈希表中指定字段的值
  325. *
  326. * @param key
  327. * @param field
  328. * @return
  329. */
  330. public Object hGet(String key, String field) {
  331. return stringRedisTemplate.opsForHash().get(key, field);
  332. }
  333. /**
  334. * 获取所有给定字段的值
  335. *
  336. * @param key
  337. * @return
  338. */
  339. public Map<Object, Object> hGetAll(String key) {
  340. return stringRedisTemplate.opsForHash().entries(key);
  341. }
  342. /**
  343. * 获取所有给定字段的值
  344. *
  345. * @param key
  346. * @param fields
  347. * @return
  348. */
  349. public List<Object> hMultiGet(String key, Collection<Object> fields) {
  350. return stringRedisTemplate.opsForHash().multiGet(key, fields);
  351. }
  352. public void hPut(String key, String hashKey, String value) {
  353. stringRedisTemplate.opsForHash().put(key, hashKey, value);
  354. }
  355. public void hPutAll(String key, Map<String, String> maps) {
  356. stringRedisTemplate.opsForHash().putAll(key, maps);
  357. }
  358. /**
  359. * 仅当hashKey不存在时才设置
  360. *
  361. * @param key
  362. * @param hashKey
  363. * @param value
  364. * @return
  365. */
  366. public Boolean hPutIfAbsent(String key, String hashKey, String value) {
  367. return stringRedisTemplate.opsForHash().putIfAbsent(key, hashKey, value);
  368. }
  369. /**
  370. * 删除一个或多个哈希表字段
  371. *
  372. * @param key
  373. * @param fields
  374. * @return
  375. */
  376. public Long hDelete(String key, Object... fields) {
  377. return stringRedisTemplate.opsForHash().delete(key, fields);
  378. }
  379. /**
  380. * 查看哈希表 key 中,指定的字段是否存在
  381. *
  382. * @param key
  383. * @param field
  384. * @return
  385. */
  386. public boolean hExists(String key, String field) {
  387. return stringRedisTemplate.opsForHash().hasKey(key, field);
  388. }
  389. /**
  390. * 为哈希表 key 中的指定字段的整数值加上增量 increment
  391. *
  392. * @param key
  393. * @param field
  394. * @param increment
  395. * @return
  396. */
  397. public Long hIncrBy(String key, Object field, long increment) {
  398. return stringRedisTemplate.opsForHash().increment(key, field, increment);
  399. }
  400. /**
  401. * 为哈希表 key 中的指定字段的整数值加上增量 increment
  402. *
  403. * @param key
  404. * @param field
  405. * @param delta
  406. * @return
  407. */
  408. public Double hIncrByFloat(String key, Object field, double delta) {
  409. return stringRedisTemplate.opsForHash().increment(key, field, delta);
  410. }
  411. /**
  412. * 获取所有哈希表中的字段
  413. *
  414. * @param key
  415. * @return
  416. */
  417. public Set<Object> hKeys(String key) {
  418. return stringRedisTemplate.opsForHash().keys(key);
  419. }
  420. /**
  421. * 获取哈希表中字段的数量
  422. *
  423. * @param key
  424. * @return
  425. */
  426. public Long hSize(String key) {
  427. return stringRedisTemplate.opsForHash().size(key);
  428. }
  429. /**
  430. * 获取哈希表中所有值
  431. *
  432. * @param key
  433. * @return
  434. */
  435. public List<Object> hValues(String key) {
  436. return stringRedisTemplate.opsForHash().values(key);
  437. }
  438. /**
  439. * 迭代哈希表中的键值对
  440. *
  441. * @param key
  442. * @param options
  443. * @return
  444. */
  445. public Cursor<Map.Entry<Object, Object>> hScan(String key, ScanOptions options) {
  446. return stringRedisTemplate.opsForHash().scan(key, options);
  447. }
  448. /** ------------------------list相关操作---------------------------- */
  449. /**
  450. * 通过索引获取列表中的元素
  451. *
  452. * @param key
  453. * @param index
  454. * @return
  455. */
  456. public String lIndex(String key, long index) {
  457. return stringRedisTemplate.opsForList().index(key, index);
  458. }
  459. /**
  460. * 获取列表指定范围内的元素
  461. *
  462. * @param key
  463. * @param start
  464. * 开始位置, 0是开始位置
  465. * @param end
  466. * 结束位置, -1返回所有
  467. * @return
  468. */
  469. public List<String> lRange(String key, long start, long end) {
  470. return stringRedisTemplate.opsForList().range(key, start, end);
  471. }
  472. /**
  473. * 存储在list头部
  474. *
  475. * @param key
  476. * @param value
  477. * @return
  478. */
  479. public Long lLeftPush(String key, String value) {
  480. return stringRedisTemplate.opsForList().leftPush(key, value);
  481. }
  482. /**
  483. *
  484. * @param key
  485. * @param value
  486. * @return
  487. */
  488. public Long lLeftPushAll(String key, String... value) {
  489. return stringRedisTemplate.opsForList().leftPushAll(key, value);
  490. }
  491. /**
  492. *
  493. * @param key
  494. * @param value
  495. * @return
  496. */
  497. public Long lLeftPushAll(String key, Collection<String> value) {
  498. return stringRedisTemplate.opsForList().leftPushAll(key, value);
  499. }
  500. /**
  501. * 当list存在的时候才加入
  502. *
  503. * @param key
  504. * @param value
  505. * @return
  506. */
  507. public Long lLeftPushIfPresent(String key, String value) {
  508. return stringRedisTemplate.opsForList().leftPushIfPresent(key, value);
  509. }
  510. /**
  511. * 如果pivot存在,再pivot前面添加
  512. *
  513. * @param key
  514. * @param pivot
  515. * @param value
  516. * @return
  517. */
  518. public Long lLeftPush(String key, String pivot, String value) {
  519. return stringRedisTemplate.opsForList().leftPush(key, pivot, value);
  520. }
  521. /**
  522. *
  523. * @param key
  524. * @param value
  525. * @return
  526. */
  527. public Long lRightPush(String key, String value) {
  528. return stringRedisTemplate.opsForList().rightPush(key, value);
  529. }
  530. /**
  531. *
  532. * @param key
  533. * @param value
  534. * @return
  535. */
  536. public Long lRightPushAll(String key, String... value) {
  537. return stringRedisTemplate.opsForList().rightPushAll(key, value);
  538. }
  539. /**
  540. *
  541. * @param key
  542. * @param value
  543. * @return
  544. */
  545. public Long lRightPushAll(String key, Collection<String> value) {
  546. return stringRedisTemplate.opsForList().rightPushAll(key, value);
  547. }
  548. /**
  549. * 为已存在的列表添加值
  550. *
  551. * @param key
  552. * @param value
  553. * @return
  554. */
  555. public Long lRightPushIfPresent(String key, String value) {
  556. return stringRedisTemplate.opsForList().rightPushIfPresent(key, value);
  557. }
  558. /**
  559. * 在pivot元素的右边添加值
  560. *
  561. * @param key
  562. * @param pivot
  563. * @param value
  564. * @return
  565. */
  566. public Long lRightPush(String key, String pivot, String value) {
  567. return stringRedisTemplate.opsForList().rightPush(key, pivot, value);
  568. }
  569. /**
  570. * 通过索引设置列表元素的值
  571. *
  572. * @param key
  573. * @param index
  574. * 位置
  575. * @param value
  576. */
  577. public void lSet(String key, long index, String value) {
  578. stringRedisTemplate.opsForList().set(key, index, value);
  579. }
  580. /**
  581. * 移出并获取列表的第一个元素
  582. *
  583. * @param key
  584. * @return 删除的元素
  585. */
  586. public String lLeftPop(String key) {
  587. return stringRedisTemplate.opsForList().leftPop(key);
  588. }
  589. /**
  590. * 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
  591. *
  592. * @param key
  593. * @param timeout
  594. * 等待时间
  595. * @param unit
  596. * 时间单位
  597. * @return
  598. */
  599. public String lBLeftPop(String key, long timeout, TimeUnit unit) {
  600. return stringRedisTemplate.opsForList().leftPop(key, timeout, unit);
  601. }
  602. /**
  603. * 移除并获取列表最后一个元素
  604. *
  605. * @param key
  606. * @return 删除的元素
  607. */
  608. public String lRightPop(String key) {
  609. return stringRedisTemplate.opsForList().rightPop(key);
  610. }
  611. /**
  612. * 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
  613. *
  614. * @param key
  615. * @param timeout
  616. * 等待时间
  617. * @param unit
  618. * 时间单位
  619. * @return
  620. */
  621. public String lBRightPop(String key, long timeout, TimeUnit unit) {
  622. return stringRedisTemplate.opsForList().rightPop(key, timeout, unit);
  623. }
  624. /**
  625. * 移除列表的最后一个元素,并将该元素添加到另一个列表并返回
  626. *
  627. * @param sourceKey
  628. * @param destinationKey
  629. * @return
  630. */
  631. public String lRightPopAndLeftPush(String sourceKey, String destinationKey) {
  632. return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
  633. destinationKey);
  634. }
  635. /**
  636. * 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
  637. *
  638. * @param sourceKey
  639. * @param destinationKey
  640. * @param timeout
  641. * @param unit
  642. * @return
  643. */
  644. public String lBRightPopAndLeftPush(String sourceKey, String destinationKey,
  645. long timeout, TimeUnit unit) {
  646. return stringRedisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
  647. destinationKey, timeout, unit);
  648. }
  649. /**
  650. * 删除集合中值等于value得元素
  651. *
  652. * @param key
  653. * @param index
  654. * index=0, 删除所有值等于value的元素; index>0, 从头部开始删除第一个值等于value的元素;
  655. * index<0, 从尾部开始删除第一个值等于value的元素;
  656. * @param value
  657. * @return
  658. */
  659. public Long lRemove(String key, long index, String value) {
  660. return stringRedisTemplate.opsForList().remove(key, index, value);
  661. }
  662. /**
  663. * 裁剪list
  664. *
  665. * @param key
  666. * @param start
  667. * @param end
  668. */
  669. public void lTrim(String key, long start, long end) {
  670. stringRedisTemplate.opsForList().trim(key, start, end);
  671. }
  672. /**
  673. * 获取列表长度
  674. *
  675. * @param key
  676. * @return
  677. */
  678. public Long lLen(String key) {
  679. return stringRedisTemplate.opsForList().size(key);
  680. }
  681. /** --------------------set相关操作-------------------------- */
  682. /**
  683. * set添加元素
  684. *
  685. * @param key
  686. * @param values
  687. * @return
  688. */
  689. public Long sAdd(String key, String... values) {
  690. return stringRedisTemplate.opsForSet().add(key, values);
  691. }
  692. /**
  693. * set移除元素
  694. *
  695. * @param key
  696. * @param values
  697. * @return
  698. */
  699. public Long sRemove(String key, Object... values) {
  700. return stringRedisTemplate.opsForSet().remove(key, values);
  701. }
  702. /**
  703. * 移除并返回集合的一个随机元素
  704. *
  705. * @param key
  706. * @return
  707. */
  708. public String sPop(String key) {
  709. return stringRedisTemplate.opsForSet().pop(key);
  710. }
  711. /**
  712. * 将元素value从一个集合移到另一个集合
  713. *
  714. * @param key
  715. * @param value
  716. * @param destKey
  717. * @return
  718. */
  719. public Boolean sMove(String key, String value, String destKey) {
  720. return stringRedisTemplate.opsForSet().move(key, value, destKey);
  721. }
  722. /**
  723. * 获取集合的大小
  724. *
  725. * @param key
  726. * @return
  727. */
  728. public Long sSize(String key) {
  729. return stringRedisTemplate.opsForSet().size(key);
  730. }
  731. /**
  732. * 判断集合是否包含value
  733. *
  734. * @param key
  735. * @param value
  736. * @return
  737. */
  738. public Boolean sIsMember(String key, Object value) {
  739. return stringRedisTemplate.opsForSet().isMember(key, value);
  740. }
  741. /**
  742. * 获取两个集合的交集
  743. *
  744. * @param key
  745. * @param otherKey
  746. * @return
  747. */
  748. public Set<String> sIntersect(String key, String otherKey) {
  749. return stringRedisTemplate.opsForSet().intersect(key, otherKey);
  750. }
  751. /**
  752. * 获取key集合与多个集合的交集
  753. *
  754. * @param key
  755. * @param otherKeys
  756. * @return
  757. */
  758. public Set<String> sIntersect(String key, Collection<String> otherKeys) {
  759. return stringRedisTemplate.opsForSet().intersect(key, otherKeys);
  760. }
  761. /**
  762. * key集合与otherKey集合的交集存储到destKey集合中
  763. *
  764. * @param key
  765. * @param otherKey
  766. * @param destKey
  767. * @return
  768. */
  769. public Long sIntersectAndStore(String key, String otherKey, String destKey) {
  770. return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKey,
  771. destKey);
  772. }
  773. /**
  774. * key集合与多个集合的交集存储到destKey集合中
  775. *
  776. * @param key
  777. * @param otherKeys
  778. * @param destKey
  779. * @return
  780. */
  781. public Long sIntersectAndStore(String key, Collection<String> otherKeys,
  782. String destKey) {
  783. return stringRedisTemplate.opsForSet().intersectAndStore(key, otherKeys,
  784. destKey);
  785. }
  786. /**
  787. * 获取两个集合的并集
  788. *
  789. * @param key
  790. * @param otherKeys
  791. * @return
  792. */
  793. public Set<String> sUnion(String key, String otherKeys) {
  794. return stringRedisTemplate.opsForSet().union(key, otherKeys);
  795. }
  796. /**
  797. * 获取key集合与多个集合的并集
  798. *
  799. * @param key
  800. * @param otherKeys
  801. * @return
  802. */
  803. public Set<String> sUnion(String key, Collection<String> otherKeys) {
  804. return stringRedisTemplate.opsForSet().union(key, otherKeys);
  805. }
  806. /**
  807. * key集合与otherKey集合的并集存储到destKey中
  808. *
  809. * @param key
  810. * @param otherKey
  811. * @param destKey
  812. * @return
  813. */
  814. public Long sUnionAndStore(String key, String otherKey, String destKey) {
  815. return stringRedisTemplate.opsForSet().unionAndStore(key, otherKey, destKey);
  816. }
  817. /**
  818. * key集合与多个集合的并集存储到destKey中
  819. *
  820. * @param key
  821. * @param otherKeys
  822. * @param destKey
  823. * @return
  824. */
  825. public Long sUnionAndStore(String key, Collection<String> otherKeys,
  826. String destKey) {
  827. return stringRedisTemplate.opsForSet().unionAndStore(key, otherKeys, destKey);
  828. }
  829. /**
  830. * 获取两个集合的差集
  831. *
  832. * @param key
  833. * @param otherKey
  834. * @return
  835. */
  836. public Set<String> sDifference(String key, String otherKey) {
  837. return stringRedisTemplate.opsForSet().difference(key, otherKey);
  838. }
  839. /**
  840. * 获取key集合与多个集合的差集
  841. *
  842. * @param key
  843. * @param otherKeys
  844. * @return
  845. */
  846. public Set<String> sDifference(String key, Collection<String> otherKeys) {
  847. return stringRedisTemplate.opsForSet().difference(key, otherKeys);
  848. }
  849. /**
  850. * key集合与otherKey集合的差集存储到destKey中
  851. *
  852. * @param key
  853. * @param otherKey
  854. * @param destKey
  855. * @return
  856. */
  857. public Long sDifference(String key, String otherKey, String destKey) {
  858. return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKey,
  859. destKey);
  860. }
  861. /**
  862. * key集合与多个集合的差集存储到destKey中
  863. *
  864. * @param key
  865. * @param otherKeys
  866. * @param destKey
  867. * @return
  868. */
  869. public Long sDifference(String key, Collection<String> otherKeys,
  870. String destKey) {
  871. return stringRedisTemplate.opsForSet().differenceAndStore(key, otherKeys,
  872. destKey);
  873. }
  874. /**
  875. * 获取集合所有元素
  876. *
  877. * @param key
  878. * @param
  879. * @param
  880. * @return
  881. */
  882. public Set<String> setMembers(String key) {
  883. return stringRedisTemplate.opsForSet().members(key);
  884. }
  885. /**
  886. * 随机获取集合中的一个元素
  887. *
  888. * @param key
  889. * @return
  890. */
  891. public String sRandomMember(String key) {
  892. return stringRedisTemplate.opsForSet().randomMember(key);
  893. }
  894. /**
  895. * 随机获取集合中count个元素
  896. *
  897. * @param key
  898. * @param count
  899. * @return
  900. */
  901. public List<String> sRandomMembers(String key, long count) {
  902. return stringRedisTemplate.opsForSet().randomMembers(key, count);
  903. }
  904. /**
  905. * 随机获取集合中count个元素并且去除重复的
  906. *
  907. * @param key
  908. * @param count
  909. * @return
  910. */
  911. public Set<String> sDistinctRandomMembers(String key, long count) {
  912. return stringRedisTemplate.opsForSet().distinctRandomMembers(key, count);
  913. }
  914. /**
  915. *
  916. * @param key
  917. * @param options
  918. * @return
  919. */
  920. public Cursor<String> sScan(String key, ScanOptions options) {
  921. return stringRedisTemplate.opsForSet().scan(key, options);
  922. }
  923. /**------------------zSet相关操作--------------------------------*/
  924. /**
  925. * 添加元素,有序集合是按照元素的score值由小到大排列
  926. *
  927. * @param key
  928. * @param value
  929. * @param score
  930. * @return
  931. */
  932. public Boolean zAdd(String key, String value, double score) {
  933. return stringRedisTemplate.opsForZSet().add(key, value, score);
  934. }
  935. /**
  936. *
  937. * @param key
  938. * @param values
  939. * @return
  940. */
  941. public Long zAdd(String key, Set<TypedTuple<String>> values) {
  942. return stringRedisTemplate.opsForZSet().add(key, values);
  943. }
  944. /**
  945. *
  946. * @param key
  947. * @param values
  948. * @return
  949. */
  950. public Long zRemove(String key, Object... values) {
  951. return stringRedisTemplate.opsForZSet().remove(key, values);
  952. }
  953. public Long zRemove(String key, Collection<String> values) {
  954. if(values!=null&&!values.isEmpty()){
  955. Object[] objs = values.toArray(new Object[values.size()]);
  956. return stringRedisTemplate.opsForZSet().remove(key, objs);
  957. }
  958. return 0L;
  959. }
  960. /**
  961. * 增加元素的score值,并返回增加后的值
  962. *
  963. * @param key
  964. * @param value
  965. * @param delta
  966. * @return
  967. */
  968. public Double zIncrementScore(String key, String value, double delta) {
  969. return stringRedisTemplate.opsForZSet().incrementScore(key, value, delta);
  970. }
  971. /**
  972. * 返回元素在集合的排名,有序集合是按照元素的score值由小到大排列
  973. *
  974. * @param key
  975. * @param value
  976. * @return 0表示第一位
  977. */
  978. public Long zRank(String key, Object value) {
  979. return stringRedisTemplate.opsForZSet().rank(key, value);
  980. }
  981. /**
  982. * 返回元素在集合的排名,按元素的score值由大到小排列
  983. *
  984. * @param key
  985. * @param value
  986. * @return
  987. */
  988. public Long zReverseRank(String key, Object value) {
  989. return stringRedisTemplate.opsForZSet().reverseRank(key, value);
  990. }
  991. /**
  992. * 获取集合的元素, 从小到大排序
  993. *
  994. * @param key
  995. * @param start
  996. * 开始位置
  997. * @param end
  998. * 结束位置, -1查询所有
  999. * @return
  1000. */
  1001. public Set<String> zRange(String key, long start, long end) {
  1002. return stringRedisTemplate.opsForZSet().range(key, start, end);
  1003. }
  1004. /**
  1005. * 获取zset集合的所有元素, 从小到大排序
  1006. *
  1007. */
  1008. public Set<String> zRangeAll(String key) {
  1009. return zRange(key,0,-1);
  1010. }
  1011. /**
  1012. * 获取集合元素, 并且把score值也获取
  1013. *
  1014. * @param key
  1015. * @param start
  1016. * @param end
  1017. * @return
  1018. */
  1019. public Set<TypedTuple<String>> zRangeWithScores(String key, long start,
  1020. long end) {
  1021. return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end);
  1022. }
  1023. /**
  1024. * 根据Score值查询集合元素
  1025. *
  1026. * @param key
  1027. * @param min
  1028. * 最小值
  1029. * @param max
  1030. * 最大值
  1031. * @return
  1032. */
  1033. public Set<String> zRangeByScore(String key, double min, double max) {
  1034. return stringRedisTemplate.opsForZSet().rangeByScore(key, min, max);
  1035. }
  1036. /**
  1037. * 根据Score值查询集合元素, 从小到大排序
  1038. *
  1039. * @param key
  1040. * @param min
  1041. * 最小值
  1042. * @param max
  1043. * 最大值
  1044. * @return
  1045. */
  1046. public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,
  1047. double min, double max) {
  1048. return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max);
  1049. }
  1050. /**
  1051. *
  1052. * @param key
  1053. * @param min
  1054. * @param max
  1055. * @param start
  1056. * @param end
  1057. * @return
  1058. */
  1059. public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,
  1060. double min, double max, long start, long end) {
  1061. return stringRedisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max,
  1062. start, end);
  1063. }
  1064. /**
  1065. * 获取集合的元素, 从大到小排序
  1066. *
  1067. * @param key
  1068. * @param start
  1069. * @param end
  1070. * @return
  1071. */
  1072. public Set<String> zReverseRange(String key, long start, long end) {
  1073. return stringRedisTemplate.opsForZSet().reverseRange(key, start, end);
  1074. }
  1075. public Set<String> zReverseRangeByScore(String key, long min, long max) {
  1076. return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
  1077. }
  1078. /**
  1079. * 获取集合的元素, 从大到小排序, 并返回score值
  1080. *
  1081. * @param key
  1082. * @param start
  1083. * @param end
  1084. * @return
  1085. */
  1086. public Set<TypedTuple<String>> zReverseRangeWithScores(String key,
  1087. long start, long end) {
  1088. return stringRedisTemplate.opsForZSet().reverseRangeWithScores(key, start,
  1089. end);
  1090. }
  1091. /**
  1092. * 根据Score值查询集合元素, 从大到小排序
  1093. *
  1094. * @param key
  1095. * @param min
  1096. * @param max
  1097. * @return
  1098. */
  1099. public Set<String> zReverseRangeByScore(String key, double min,
  1100. double max) {
  1101. return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
  1102. }
  1103. /**
  1104. * 根据Score值查询集合元素, 从大到小排序
  1105. *
  1106. * @param key
  1107. * @param min
  1108. * @param max
  1109. * @return
  1110. */
  1111. public Set<TypedTuple<String>> zReverseRangeByScoreWithScores(
  1112. String key, double min, double max) {
  1113. return stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key,
  1114. min, max);
  1115. }
  1116. /**
  1117. *
  1118. * @param key
  1119. * @param min
  1120. * @param max
  1121. * @param start
  1122. * @param end
  1123. * @return
  1124. */
  1125. public Set<String> zReverseRangeByScore(String key, double min,
  1126. double max, long start, long end) {
  1127. return stringRedisTemplate.opsForZSet().reverseRangeByScore(key, min, max,
  1128. start, end);
  1129. }
  1130. /**
  1131. * 根据score值获取集合元素数量
  1132. *
  1133. * @param key
  1134. * @param min
  1135. * @param max
  1136. * @return
  1137. */
  1138. public Long zCount(String key, double min, double max) {
  1139. return stringRedisTemplate.opsForZSet().count(key, min, max);
  1140. }
  1141. /**
  1142. * 获取集合大小
  1143. *
  1144. * @param key
  1145. * @return
  1146. */
  1147. public Long zSize(String key) {
  1148. return stringRedisTemplate.opsForZSet().size(key);
  1149. }
  1150. /**
  1151. * 获取集合大小
  1152. *
  1153. * @param key
  1154. * @return
  1155. */
  1156. public Long zZCard(String key) {
  1157. return stringRedisTemplate.opsForZSet().zCard(key);
  1158. }
  1159. /**
  1160. * 获取集合中value元素的score值
  1161. *
  1162. * @param key
  1163. * @param value
  1164. * @return
  1165. */
  1166. public Double zScore(String key, Object value) {
  1167. return stringRedisTemplate.opsForZSet().score(key, value);
  1168. }
  1169. /**
  1170. * 移除指定索引位置的成员
  1171. *
  1172. * @param key
  1173. * @param start
  1174. * @param end
  1175. * @return
  1176. */
  1177. public Long zRemoveRange(String key, long start, long end) {
  1178. return stringRedisTemplate.opsForZSet().removeRange(key, start, end);
  1179. }
  1180. /**
  1181. * 根据指定的score值的范围来移除成员
  1182. *
  1183. * @param key
  1184. * @param min
  1185. * @param max
  1186. * @return
  1187. */
  1188. public Long zRemoveRangeByScore(String key, double min, double max) {
  1189. return stringRedisTemplate.opsForZSet().removeRangeByScore(key, min, max);
  1190. }
  1191. /**
  1192. * 获取key和otherKey的并集并存储在destKey中
  1193. *
  1194. * @param key
  1195. * @param otherKey
  1196. * @param destKey
  1197. * @return
  1198. */
  1199. public Long zUnionAndStore(String key, String otherKey, String destKey) {
  1200. return stringRedisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey);
  1201. }
  1202. /**
  1203. *
  1204. * @param key
  1205. * @param otherKeys
  1206. * @param destKey
  1207. * @return
  1208. */
  1209. public Long zUnionAndStore(String key, Collection<String> otherKeys,
  1210. String destKey) {
  1211. return stringRedisTemplate.opsForZSet()
  1212. .unionAndStore(key, otherKeys, destKey);
  1213. }
  1214. /**
  1215. * 交集
  1216. *
  1217. * @param key
  1218. * @param otherKey
  1219. * @param destKey
  1220. * @return
  1221. */
  1222. public Long zIntersectAndStore(String key, String otherKey,
  1223. String destKey) {
  1224. return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKey,
  1225. destKey);
  1226. }
  1227. /**
  1228. * 交集
  1229. *
  1230. * @param key
  1231. * @param otherKeys
  1232. * @param destKey
  1233. * @return
  1234. */
  1235. public Long zIntersectAndStore(String key, Collection<String> otherKeys,
  1236. String destKey) {
  1237. return stringRedisTemplate.opsForZSet().intersectAndStore(key, otherKeys,
  1238. destKey);
  1239. }
  1240. /**
  1241. *
  1242. * @param key
  1243. * @param options
  1244. * @return
  1245. */
  1246. public Cursor<TypedTuple<String>> zScan(String key, ScanOptions options) {
  1247. return stringRedisTemplate.opsForZSet().scan(key, options);
  1248. }
  1249. /**
  1250. * 扫描主键,建议使用
  1251. * @param patten
  1252. * @return
  1253. */
  1254. public Set<String> scan(String patten){
  1255. Set<String> keys = stringRedisTemplate.execute((RedisCallback<Set<String>>) connection -> {
  1256. Set<String> result = new HashSet<>();
  1257. try (Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder()
  1258. .match(patten).count(10000).build())) {
  1259. while (cursor.hasNext()) {
  1260. result.add(new String(cursor.next()));
  1261. }
  1262. } catch (IOException e) {
  1263. e.printStackTrace();
  1264. }
  1265. return result;
  1266. });
  1267. return keys;
  1268. }
  1269. /**
  1270. * 管道技术,提高性能
  1271. * @param type
  1272. * @param values
  1273. * @return
  1274. */
  1275. public List<Object> lRightPushPipeline(String type,Collection<String> values){
  1276. List<Object> results = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
  1277. public Object doInRedis(RedisConnection connection) throws DataAccessException {
  1278. StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
  1279. //集合转换数组
  1280. String[] strings = values.toArray(new String[values.size()]);
  1281. //直接批量发送
  1282. stringRedisConn.rPush(type, strings);
  1283. return null;
  1284. }
  1285. });
  1286. return results;
  1287. }
  1288. public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){
  1289. List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
  1290. @Nullable
  1291. @Override
  1292. public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
  1293. StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;
  1294. String[] strings = values.toArray(new String[values.size()]);
  1295. stringRedisConnection.rPush(topic_key,strings);
  1296. stringRedisConnection.zRem(future_key,strings);
  1297. return null;
  1298. }
  1299. });
  1300. return objects;
  1301. }
  1302. /**
  1303. * 加锁
  1304. *
  1305. * @param name
  1306. * @param expire
  1307. * @return
  1308. */
  1309. public String tryLock(String name, long expire) {
  1310. name = name + "_lock";
  1311. String token = UUID.randomUUID().toString();
  1312. RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
  1313. RedisConnection conn = factory.getConnection();
  1314. try {
  1315. //参考redis命令:
  1316. //set key value [EX seconds] [PX milliseconds] [NX|XX]
  1317. Boolean result = conn.set(
  1318. name.getBytes(), //锁的名称
  1319. token.getBytes(),
  1320. Expiration.from(expire, TimeUnit.MILLISECONDS),//过期时间,自己设置30s,60s过期
  1321. RedisStringCommands.SetOption.SET_IF_ABSENT //NX
  1322. );
  1323. if (result != null && result)
  1324. return token;
  1325. } finally {
  1326. RedisConnectionUtils.releaseConnection(conn, factory,false);
  1327. }
  1328. return null;
  1329. }
  1330. }

④:测试

  1. package com.heima.schedule.test;
  2. import com.heima.common.redis.CacheService;
  3. import com.heima.schedule.ScheduleApplication;
  4. import org.junit.Test;
  5. import org.junit.runner.RunWith;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. import org.springframework.test.context.junit4.SpringRunner;
  9. import java.util.Set;
  10. @SpringBootTest(classes = ScheduleApplication.class)
  11. @RunWith(SpringRunner.class)
  12. public class RedisTest {
  13. @Autowired
  14. private CacheService cacheService;
  15. @Test
  16. public void testList(){
  17. //在list的左边添加元素
  18. // cacheService.lLeftPush("list_001","hello,redis");
  19. //在list的右边获取元素,并删除
  20. String list_001 = cacheService.lRightPop("list_001");
  21. System.out.println(list_001);
  22. }
  23. @Test
  24. public void testZset(){
  25. //添加数据到zset中 分值
  26. /*cacheService.zAdd("zset_key_001","hello zset 001",1000);
  27. cacheService.zAdd("zset_key_001","hello zset 002",8888);
  28. cacheService.zAdd("zset_key_001","hello zset 003",7777);
  29. cacheService.zAdd("zset_key_001","hello zset 004",999999);*/
  30. //按照分值获取数据
  31. Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
  32. System.out.println(zset_key_001);
  33. }
  34. }

二.redis延迟任务实现

1.添加taskinfoMapper

  1. package com.heima.schedule.mapper;
  2. import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  3. import com.heima.model.schedule.pojos.Taskinfo;
  4. import org.apache.ibatis.annotations.Mapper;
  5. import org.apache.ibatis.annotations.Param;
  6. import org.apache.ibatis.annotations.Select;
  7. import java.util.Date;
  8. import java.util.List;
  9. /**
  10. * <p>
  11. * Mapper 接口
  12. * </p>
  13. *
  14. * @author itheima
  15. */
  16. @Mapper
  17. public interface TaskinfoMapper extends BaseMapper<Taskinfo> {
  18. public List<Taskinfo> queryFutureTime(@Param("taskType")int type, @Param("priority")int priority, @Param("future")Date future);
  19. }

 以及taskinfoMapper.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
  3. <mapper namespace="com.heima.schedule.mapper.TaskinfoMapper">
  4. <select id="queryFutureTime" resultType="com.heima.model.schedule.pojos.Taskinfo">
  5. select *
  6. from taskinfo
  7. where task_type = #{taskType}
  8. and priority = #{priority}
  9. and execute_time <![CDATA[<]]> #{future,javaType=java.util.Date}
  10. </select>
  11. </mapper>

2.添加taskinfoLogsMapper

  1. package com.heima.schedule.mapper;
  2. import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  3. import com.heima.model.schedule.pojos.TaskinfoLogs;
  4. import org.apache.ibatis.annotations.Mapper;
  5. /**
  6. * <p>
  7. * Mapper 接口
  8. * </p>
  9. *
  10. * @author itheima
  11. */
  12. @Mapper
  13. public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {
  14. }

3.创建task类,用于创建添加任务参数

  1. package com.heima.model.schedule.dtos;
  2. import lombok.Data;
  3. import java.io.Serializable;
  4. @Data
  5. public class Task implements Serializable {
  6. /**
  7. * 任务id
  8. */
  9. private Long taskId;
  10. /**
  11. * 类型
  12. */
  13. private Integer taskType;
  14. /**
  15. * 优先级
  16. */
  17. private Integer priority;
  18. /**
  19. * 执行id
  20. */
  21. private long executeTime;
  22. /**
  23. * task参数
  24. */
  25. private byte[] parameters;
  26. }

4.创建taskService

  1. package com.heima.schedule.service;
  2. import com.heima.model.schedule.dtos.Task;
  3. /**
  4. * 对外访问接口
  5. */
  6. public interface TaskService {
  7. /**
  8. * 添加任务
  9. * @param task 任务对象
  10. * @return 任务id
  11. */
  12. public long addTask(Task task) ;
  13. }

实现:

  1. package com.heima.schedule.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.heima.common.constants.ScheduleConstants;
  4. import com.heima.common.redis.CacheService;
  5. import com.heima.model.schedule.dtos.Task;
  6. import com.heima.model.schedule.pojos.Taskinfo;
  7. import com.heima.model.schedule.pojos.TaskinfoLogs;
  8. import com.heima.schedule.mapper.TaskinfoLogsMapper;
  9. import com.heima.schedule.mapper.TaskinfoMapper;
  10. import com.heima.schedule.service.TaskService;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.springframework.beans.BeanUtils;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.stereotype.Service;
  15. import org.springframework.transaction.annotation.Transactional;
  16. import java.util.Calendar;
  17. import java.util.Date;
  18. @Service
  19. @Transactional
  20. @Slf4j
  21. public class TaskServiceImpl implements TaskService {
  22. /**
  23. * 添加延迟任务
  24. *
  25. * @param task
  26. * @return
  27. */
  28. @Override
  29. public long addTask(Task task) {
  30. //1.添加任务到数据库中
  31. boolean success = addTaskToDb(task);
  32. if (success) {
  33. //2.添加任务到redis
  34. addTaskToCache(task);
  35. }
  36. return task.getTaskId();
  37. }
  38. @Autowired
  39. private CacheService cacheService;
  40. /**
  41. * 把任务添加到redis中
  42. *
  43. * @param task
  44. */
  45. private void addTaskToCache(Task task) {
  46. String key = task.getTaskType() + "_" + task.getPriority();
  47. //获取5分钟之后的时间 毫秒值
  48. Calendar calendar = Calendar.getInstance();
  49. calendar.add(Calendar.MINUTE, 5);
  50. long nextScheduleTime = calendar.getTimeInMillis();
  51. //2.1 如果任务的执行时间小于等于当前时间,存入list
  52. if (task.getExecuteTime() <= System.currentTimeMillis()) {
  53. cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
  54. } else if (task.getExecuteTime() <= nextScheduleTime) {
  55. //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
  56. cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
  57. }
  58. }
  59. @Autowired
  60. private TaskinfoMapper taskinfoMapper;
  61. @Autowired
  62. private TaskinfoLogsMapper taskinfoLogsMapper;
  63. /**
  64. * 添加任务到数据库中
  65. *
  66. * @param task
  67. * @return
  68. */
  69. private boolean addTaskToDb(Task task) {
  70. boolean flag = false;
  71. try {
  72. //保存任务表
  73. Taskinfo taskinfo = new Taskinfo();
  74. BeanUtils.copyProperties(task, taskinfo);
  75. taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
  76. taskinfoMapper.insert(taskinfo);
  77. //设置taskID
  78. task.setTaskId(taskinfo.getTaskId());
  79. //保存任务日志数据
  80. TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
  81. BeanUtils.copyProperties(taskinfo, taskinfoLogs);
  82. taskinfoLogs.setVersion(1);
  83. taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
  84. taskinfoLogsMapper.insert(taskinfoLogs);
  85. flag = true;
  86. } catch (Exception e) {
  87. e.printStackTrace();
  88. }
  89. return flag;
  90. }
  91. }

ScheduleConstants常量类

  1. package com.heima.common.constants;
  2. public class ScheduleConstants {
  3. //task状态
  4. public static final int SCHEDULED=0; //初始化状态
  5. public static final int EXECUTED=1; //已执行状态
  6. public static final int CANCELLED=2; //已取消状态
  7. public static String FUTURE="future_"; //未来数据key前缀
  8. public static String TOPIC="topic_"; //当前数据key前缀
  9. }

5.取消任务

例如:第三接口网络不通,使用延迟任务进行重试,当达到阈值以后,取消任务.

在TaskService中添加方法

  1. /**
  2. * 取消任务
  3. * @param taskId 任务id
  4. * @return 取消结果
  5. */
  6. public boolean cancelTask(long taskId);

 实现

  1. /**
  2. * 取消任务
  3. * @param taskId
  4. * @return
  5. */
  6. @Override
  7. public boolean cancelTask(long taskId) {
  8. boolean flag = false;
  9. //删除任务,更新日志
  10. Task task = updateDb(taskId,ScheduleConstants.EXECUTED);
  11. //删除redis的数据
  12. if(task != null){
  13. removeTaskFromCache(task);
  14. flag = true;
  15. }
  16. return false;
  17. }
  18. /**
  19. * 删除redis中的任务数据
  20. * @param task
  21. */
  22. private void removeTaskFromCache(Task task) {
  23. String key = task.getTaskType()+"_"+task.getPriority();
  24. if(task.getExecuteTime()<=System.currentTimeMillis()){
  25. cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
  26. }else {
  27. cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
  28. }
  29. }
  30. /**
  31. * 删除任务,更新任务日志状态
  32. * @param taskId
  33. * @param status
  34. * @return
  35. */
  36. private Task updateDb(long taskId, int status) {
  37. Task task = null;
  38. try {
  39. //删除任务
  40. taskinfoMapper.deleteById(taskId);
  41. TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
  42. taskinfoLogs.setStatus(status);
  43. taskinfoLogsMapper.updateById(taskinfoLogs);
  44. task = new Task();
  45. BeanUtils.copyProperties(taskinfoLogs,task);
  46. task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
  47. }catch (Exception e){
  48. log.error("task cancel exception taskid={}",taskId);
  49. }
  50. return task;
  51. }

6.消费任务

实现思路

在TaskService中添加方法

  1. /**
  2. * 按照类型和优先级来拉取任务
  3. * @param type
  4. * @param priority
  5. * @return
  6. */
  7. public Task poll(int type,int priority);

 实现

  1. /**
  2. * 按照类型和优先级拉取任务
  3. * @return
  4. */
  5. @Override
  6. public Task poll(int type,int priority) {
  7. Task task = null;
  8. try {
  9. String key = type+"_"+priority;
  10. String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
  11. if(StringUtils.isNotBlank(task_json)){
  12. task = JSON.parseObject(task_json, Task.class);
  13. //更新数据库信息
  14. updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
  15. }
  16. }catch (Exception e){
  17. e.printStackTrace();
  18. log.error("poll task exception");
  19. }
  20. return task;
  21. }

7.定时任务刷新未来数据

 获取zset中所有key的两种方案

代码案例:

  1. @Test
  2. public void testKeys(){
  3. //第一种方式,不建议使用
  4. Set<String> keys = cacheService.keys("future_*");
  5. System.out.println(keys);
  6. //第二种方式,一次获取十个
  7. Set<String> scan = cacheService.scan("future_*");
  8. System.out.println(scan);
  9. }

返回结果

8.如何同步数据?

未来数据刷新-redis管道 性能很差,因为客户端要不断的创建连接

 redis管道技术

 测试案例对比:

  1. //耗时6151
  2. @Test
  3. public void testPiple1(){
  4. long start =System.currentTimeMillis();
  5. for (int i = 0; i <10000 ; i++) {
  6. Task task = new Task();
  7. task.setTaskType(1001);
  8. task.setPriority(1);
  9. task.setExecuteTime(new Date().getTime());
  10. cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
  11. }
  12. System.out.println("耗时"+(System.currentTimeMillis()- start));
  13. }
  14. @Test
  15. public void testPiple2(){
  16. long start = System.currentTimeMillis();
  17. //使用管道技术
  18. List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
  19. @Nullable
  20. @Override
  21. public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
  22. for (int i = 0; i <10000 ; i++) {
  23. Task task = new Task();
  24. task.setTaskType(1001);
  25. task.setPriority(1);
  26. task.setExecuteTime(new Date().getTime());
  27. redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
  28. }
  29. return null;
  30. }
  31. });
  32. System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
  33. }

9.使用定时任务刷新未来数据

此处用的单体定时任务,具体可根据项目使用分布式定时任务同步

在TaskService中添加方法

  1. @Scheduled(cron = "0 */1 * * * ?")
  2. public void refresh() {
  3. System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
  4. // 获取所有未来数据集合的key值
  5. Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
  6. for (String futureKey : futureKeys) { // future_250_250
  7. String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
  8. //获取该组key下当前需要消费的任务数据
  9. Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
  10. if (!tasks.isEmpty()) {
  11. //将这些任务数据添加到消费者队列中
  12. cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
  13. System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
  14. }
  15. }
  16. }

 10.分布式锁解决集群下单体定时任务方法抢占执行

启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法

分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。

解决方案:  

redis分布式锁

sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功

  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败

  • 客户端A执行代码完成,删除锁

  • 客户端B在等待一段时间后再去请求设置key的值,设置成功

  • 客户端B执行代码完成,删除锁

此加锁方法在cacheService工具类中

修改未来数据定时刷新的方法,如下:

  1. /**
  2. * 未来数据定时刷新
  3. */
  4. @Scheduled(cron = "0 */1 * * * ?")
  5. public void refresh(){
  6. String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
  7. if(StringUtils.isNotBlank(token)){
  8. log.info("未来数据定时刷新---定时任务");
  9. //获取所有未来数据的集合key
  10. Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
  11. for (String futureKey : futureKeys) {//future_100_50
  12. //获取当前数据的key topic
  13. String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
  14. //按照key和分值查询符合条件的数据
  15. Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
  16. //同步数据
  17. if(!tasks.isEmpty()){
  18. cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
  19. log.info("成功的将"+futureKey+"刷新到了"+topicKey);
  20. }
  21. }
  22. }
  23. }

10.数据库任务定时同步到redis

 第一步:

  1. @Scheduled(cron = "0 */5 * * * ?")
  2. @PostConstruct //只要微服务启动就会做一个同步
  3. public void reloadData() {
  4. clearCache();
  5. log.info("数据库数据同步到缓存");
  6. Calendar calendar = Calendar.getInstance();
  7. calendar.add(Calendar.MINUTE, 5);
  8. //查看小于未来5分钟的所有任务
  9. List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
  10. if(allTasks != null && allTasks.size() > 0){
  11. for (Taskinfo taskinfo : allTasks) {
  12. Task task = new Task();
  13. BeanUtils.copyProperties(taskinfo,task);
  14. task.setExecuteTime(taskinfo.getExecuteTime().getTime());
  15. addTaskToCache(task);
  16. }
  17. }
  18. }
  19. private void clearCache(){
  20. // 删除缓存中未来数据集合和当前消费者队列的所有key
  21. Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
  22. Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
  23. cacheService.delete(futurekeys);
  24. cacheService.delete(topickeys);
  25. }

11.延迟任务对外提供接口,提供远程fegin接口

提供远程的feign接口,在heima-leadnews-feign-api编写类如下:

  1. package com.heima.apis.schedule;
  2. import com.heima.model.common.dtos.ResponseResult;
  3. import com.heima.model.schedule.dtos.Task;
  4. import org.springframework.cloud.openfeign.FeignClient;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.PathVariable;
  7. import org.springframework.web.bind.annotation.PostMapping;
  8. import org.springframework.web.bind.annotation.RequestBody;
  9. @FeignClient("leadnews-schedule")
  10. public interface IScheduleClient {
  11. /**
  12. * 添加任务
  13. * @param task 任务对象
  14. * @return 任务id
  15. */
  16. @PostMapping("/api/v1/task/add")
  17. public ResponseResult addTask(@RequestBody Task task);
  18. /**
  19. * 取消任务
  20. * @param taskId 任务id
  21. * @return 取消结果
  22. */
  23. @GetMapping("/api/v1/task/cancel/{taskId}")
  24. public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
  25. /**
  26. * 按照类型和优先级来拉取任务
  27. * @param type
  28. * @param priority
  29. * @return
  30. */
  31. @GetMapping("/api/v1/task/poll/{type}/{priority}")
  32. public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority);
  33. }

在heima-leadnews-schedule微服务下提供对应的实现

  1. package com.heima.schedule.feign;
  2. import com.heima.apis.schedule.IScheduleClient;
  3. import com.heima.model.common.dtos.ResponseResult;
  4. import com.heima.model.schedule.dtos.Task;
  5. import com.heima.schedule.service.TaskService;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.*;
  8. @RestController
  9. public class ScheduleClient implements IScheduleClient {
  10. @Autowired
  11. private TaskService taskService;
  12. /**
  13. * 添加任务
  14. * @param task 任务对象
  15. * @return 任务id
  16. */
  17. @PostMapping("/api/v1/task/add")
  18. @Override
  19. public ResponseResult addTask(@RequestBody Task task) {
  20. return ResponseResult.okResult(taskService.addTask(task));
  21. }
  22. /**
  23. * 取消任务
  24. * @param taskId 任务id
  25. * @return 取消结果
  26. */
  27. @GetMapping("/api/v1/task/cancel/{taskId}")
  28. @Override
  29. public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
  30. return ResponseResult.okResult(taskService.cancelTask(taskId));
  31. }
  32. /**
  33. * 按照类型和优先级来拉取任务
  34. * @param type
  35. * @param priority
  36. * @return
  37. */
  38. @GetMapping("/api/v1/task/poll/{type}/{priority}")
  39. @Override
  40. public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
  41. return ResponseResult.okResult(taskService.poll(type,priority));
  42. }
  43. }

12.发布文章集成添加延迟队列接口

在创建WmNewsTaskService

  1. package com.heima.wemedia.service;
  2. import com.heima.model.wemedia.pojos.WmNews;
  3. public interface WmNewsTaskService {
  4. /**
  5. * 添加任务到延迟队列中
  6. * @param id 文章的id
  7. * @param publishTime 发布的时间 可以做为任务的执行时间
  8. */
  9. public void addNewsToTask(Integer id, Date publishTime);
  10. }

实现:

  1. package com.heima.wemedia.service.impl;
  2. import com.heima.apis.schedule.IScheduleClient;
  3. import com.heima.model.common.enums.TaskTypeEnum;
  4. import com.heima.model.schedule.dtos.Task;
  5. import com.heima.model.wemedia.pojos.WmNews;
  6. import com.heima.utils.common.ProtostuffUtil;
  7. import com.heima.wemedia.service.WmNewsTaskService;
  8. import lombok.SneakyThrows;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.scheduling.annotation.Async;
  12. import org.springframework.stereotype.Service;
  13. @Service
  14. @Slf4j
  15. public class WmNewsTaskServiceImpl implements WmNewsTaskService {
  16. @Autowired
  17. private IScheduleClient scheduleClient;
  18. /**
  19. * 添加任务到延迟队列中
  20. * @param id 文章的id
  21. * @param publishTime 发布的时间 可以做为任务的执行时间
  22. */
  23. @Override
  24. @Async
  25. public void addNewsToTask(Integer id, Date publishTime) {
  26. log.info("添加任务到延迟服务中----begin");
  27. Task task = new Task();
  28. task.setExecuteTime(publishTime.getTime());
  29. task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
  30. task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
  31. WmNews wmNews = new WmNews();
  32. wmNews.setId(id);
  33. task.setParameters(ProtostuffUtil.serialize(wmNews));
  34. scheduleClient.addTask(task);
  35. log.info("添加任务到延迟服务中----end");
  36. }
  37. }

枚举类:

  1. package com.heima.model.common.enums;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Getter;
  4. @Getter
  5. @AllArgsConstructor
  6. public enum TaskTypeEnum {
  7. NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
  8. REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
  9. private final int taskType; //对应具体业务
  10. private final int priority; //业务不同级别
  11. private final String desc; //描述信息
  12. }

序列化工具对比

  • JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组

  • Protostuff:google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo类

拷贝资料中的两个类到heima-leadnews-utils下

Protostuff需要引导依赖:

  1. <dependency>
  2. <groupId>io.protostuff</groupId>
  3. <artifactId>protostuff-core</artifactId>
  4. <version>1.6.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>io.protostuff</groupId>
  8. <artifactId>protostuff-runtime</artifactId>
  9. <version>1.6.0</version>
  10. </dependency>

修改发布文章代码:

把之前的异步调用修改为调用延迟任务

  1. @Autowired
  2. private WmNewsTaskService wmNewsTaskService;
  3. /**
  4. * 发布修改文章或保存为草稿
  5. * @param dto
  6. * @return
  7. */
  8. @Override
  9. public ResponseResult submitNews(WmNewsDto dto) {
  10. //0.条件判断
  11. if(dto == null || dto.getContent() == null){
  12. return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
  13. }
  14. //1.保存或修改文章
  15. WmNews wmNews = new WmNews();
  16. //属性拷贝 属性名词和类型相同才能拷贝
  17. BeanUtils.copyProperties(dto,wmNews);
  18. //封面图片 list---> string
  19. if(dto.getImages() != null && dto.getImages().size() > 0){
  20. //[1dddfsd.jpg,sdlfjldk.jpg]--> 1dddfsd.jpg,sdlfjldk.jpg
  21. String imageStr = StringUtils.join(dto.getImages(), ",");
  22. wmNews.setImages(imageStr);
  23. }
  24. //如果当前封面类型为自动 -1
  25. if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
  26. wmNews.setType(null);
  27. }
  28. saveOrUpdateWmNews(wmNews);
  29. //2.判断是否为草稿 如果为草稿结束当前方法
  30. if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){
  31. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  32. }
  33. //3.不是草稿,保存文章内容图片与素材的关系
  34. //获取到文章内容中的图片信息
  35. List<String> materials = ectractUrlInfo(dto.getContent());
  36. saveRelativeInfoForContent(materials,wmNews.getId());
  37. //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片
  38. saveRelativeInfoForCover(dto,wmNews,materials);
  39. //审核文章
  40. // wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
  41. wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
  42. return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
  43. }

消费任务进行审核文章

WmNewsTaskService中添加方法

  1. /**
  2. * 消费延迟队列数据
  3. */
  4. public void scanNewsByTask();

实现

  1. @Autowired
  2. private WmNewsAutoScanServiceImpl wmNewsAutoScanService;
  3. /**
  4. * 消费延迟队列数据
  5. */
  6. @Scheduled(fixedRate = 1000)
  7. @Override
  8. @SneakyThrows
  9. public void scanNewsByTask() {
  10. log.info("文章审核---消费任务执行---begin---");
  11. ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
  12. if(responseResult.getCode().equals(200) && responseResult.getData() != null){
  13. String json_str = JSON.toJSONString(responseResult.getData());
  14. Task task = JSON.parseObject(json_str, Task.class);
  15. byte[] parameters = task.getParameters();
  16. WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
  17. System.out.println(wmNews.getId()+"-----------");
  18. wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
  19. }
  20. log.info("文章审核---消费任务执行---end---");
  21. }

在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/125696?site
推荐阅读
相关标签
  

闽ICP备14008679号