AsyncConsumeTask.java 3.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package com.qs.mp.quartz.task;
  2. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  3. import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
  4. import com.baomidou.mybatisplus.core.toolkit.Assert;
  5. import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
  6. import com.qs.mp.common.core.redis.DistributedLocker;
  7. import com.qs.mp.common.enums.AsyncTaskTypeEnum;
  8. import com.qs.mp.common.utils.LogUtil;
  9. import com.qs.mp.framework.domain.AsyncTask;
  10. import com.qs.mp.framework.redis.RedisKey;
  11. import com.qs.mp.framework.redis.RedisLockKey;
  12. import com.qs.mp.framework.service.IAsyncTaskService;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import org.springframework.aop.framework.AopContext;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.stereotype.Component;
  18. import org.springframework.transaction.annotation.Transactional;
  19. import java.util.List;
  20. import java.util.concurrent.TimeUnit;
  21. import java.util.concurrent.locks.ReentrantLock;
  22. /**
  23. * 异步消息消费任务
  24. * @author Cup
  25. * @date 2022/6/7
  26. */
  27. @Component("asyncConsumeTask")
  28. public class AsyncConsumeTask {
  29. protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
  30. @Autowired
  31. private IAsyncTaskService asyncTaskService;
  32. @Autowired
  33. private DistributedLocker distributedLocker;
  34. public void processingTask(){
  35. List<AsyncTask> asyncTaskList = asyncTaskService.list(new LambdaQueryWrapper<AsyncTask>().eq(AsyncTask::getStatus, 0));
  36. if (CollectionUtils.isEmpty(asyncTaskList)) {
  37. return;
  38. }
  39. for (AsyncTask asyncTask : asyncTaskList) {
  40. String lockKey = RedisLockKey.build(RedisLockKey.ASYNC_TASK_KEY, asyncTask.getId());
  41. // 加锁,自动续期
  42. if (!distributedLocker.tryLock(lockKey,0,-1, TimeUnit.SECONDS)) {
  43. continue;
  44. }
  45. try {
  46. AsyncConsumeTask proxy = (AsyncConsumeTask)AopContext.currentProxy();
  47. proxy.execute(asyncTask);
  48. } catch (Exception e) {
  49. // 更新异步任务执行次数
  50. asyncTaskService.update(new LambdaUpdateWrapper<AsyncTask>()
  51. .set(AsyncTask::getOperateNum, asyncTask.getOperateNum() + 1)
  52. .eq(AsyncTask::getId, asyncTask.getId()));
  53. LogUtil.error(logger, e, "异步任务处理异常。任务id:{0}", asyncTask.getId());
  54. } finally {
  55. // 释放锁
  56. distributedLocker.unlock(lockKey);
  57. }
  58. }
  59. }
  60. @Transactional(rollbackFor = Exception.class)
  61. public void execute(AsyncTask asyncTask){
  62. if (AsyncTaskTypeEnum.CHANNEL_CONFIRM_RECEIPT.getValue().equals(asyncTask.getType())) {
  63. // 确认收货后相关任务处理
  64. asyncTaskService.channelConfirmReceipt(asyncTask);
  65. } else if (AsyncTaskTypeEnum.TICKET_PAY.getValue().equals(asyncTask.getType())) {
  66. // 盲票支付后相关任务处理
  67. asyncTaskService.ticketPay(asyncTask);
  68. } else if (AsyncTaskTypeEnum.TICKET_GENERATE.getValue().equals(asyncTask.getType())) {
  69. // 盲票创建后相关任务处理
  70. asyncTaskService.ticketGenerate(asyncTask);
  71. } else if (AsyncTaskTypeEnum.CARD_ORDER_DELIVER.getValue().equals(asyncTask.getType())) {
  72. // 卡密订单发货任务
  73. asyncTaskService.cardOrderDeliver(asyncTask);
  74. } else {
  75. // 都没匹配到则忽略不处理任务
  76. return;
  77. }
  78. // 更新异步任务状态
  79. boolean rtn = asyncTaskService.update(new LambdaUpdateWrapper<AsyncTask>()
  80. .set(AsyncTask::getOperateNum, asyncTask.getOperateNum() + 1)
  81. .set(AsyncTask::getStatus, 1)
  82. .eq(AsyncTask::getId, asyncTask.getId()));
  83. Assert.isTrue(rtn,"异步任务状态更新失败, id:{0}", asyncTask.getId());
  84. }
  85. }