1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- package com.qs.mp.quartz.task;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
- import com.baomidou.mybatisplus.core.toolkit.Assert;
- import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
- import com.qs.mp.common.core.redis.DistributedLocker;
- import com.qs.mp.common.enums.AsyncTaskTypeEnum;
- import com.qs.mp.common.utils.LogUtil;
- import com.qs.mp.framework.domain.AsyncTask;
- import com.qs.mp.framework.redis.RedisKey;
- import com.qs.mp.framework.redis.RedisLockKey;
- import com.qs.mp.framework.service.IAsyncTaskService;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.aop.framework.AopContext;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.springframework.transaction.annotation.Transactional;
- import java.util.List;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.ReentrantLock;
- /**
- * 异步消息消费任务
- * @author Cup
- * @date 2022/6/7
- */
- @Component("asyncConsumeTask")
- public class AsyncConsumeTask {
- protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
- @Autowired
- private IAsyncTaskService asyncTaskService;
- @Autowired
- private DistributedLocker distributedLocker;
- public void processingTask(){
- List<AsyncTask> asyncTaskList = asyncTaskService.list(new LambdaQueryWrapper<AsyncTask>().eq(AsyncTask::getStatus, 0));
- if (CollectionUtils.isEmpty(asyncTaskList)) {
- return;
- }
- for (AsyncTask asyncTask : asyncTaskList) {
- String lockKey = RedisLockKey.build(RedisLockKey.ASYNC_TASK_KEY, asyncTask.getId());
- // 加锁,自动续期
- if (!distributedLocker.tryLock(lockKey,0,-1, TimeUnit.SECONDS)) {
- continue;
- }
- try {
- AsyncConsumeTask proxy = (AsyncConsumeTask)AopContext.currentProxy();
- proxy.execute(asyncTask);
- } catch (Exception e) {
- // 更新异步任务执行次数
- asyncTaskService.update(new LambdaUpdateWrapper<AsyncTask>()
- .set(AsyncTask::getOperateNum, asyncTask.getOperateNum() + 1)
- .eq(AsyncTask::getId, asyncTask.getId()));
- LogUtil.error(logger, e, "异步任务处理异常。任务id:{0}", asyncTask.getId());
- } finally {
- // 释放锁
- distributedLocker.unlock(lockKey);
- }
- }
- }
- @Transactional(rollbackFor = Exception.class)
- public void execute(AsyncTask asyncTask){
- if (AsyncTaskTypeEnum.CHANNEL_CONFIRM_RECEIPT.getValue().equals(asyncTask.getType())) {
- // 确认收货后相关任务处理
- asyncTaskService.channelConfirmReceipt(asyncTask);
- } else if (AsyncTaskTypeEnum.TICKET_PAY.getValue().equals(asyncTask.getType())) {
- // 盲票支付后相关任务处理
- asyncTaskService.ticketPay(asyncTask);
- } else if (AsyncTaskTypeEnum.TICKET_GENERATE.getValue().equals(asyncTask.getType())) {
- // 盲票创建后相关任务处理
- asyncTaskService.ticketGenerate(asyncTask);
- } else if (AsyncTaskTypeEnum.CARD_ORDER_DELIVER.getValue().equals(asyncTask.getType())) {
- // 卡密订单发货任务
- asyncTaskService.cardOrderDeliver(asyncTask);
- } else {
- // 都没匹配到则忽略不处理任务
- return;
- }
- // 更新异步任务状态
- boolean rtn = asyncTaskService.update(new LambdaUpdateWrapper<AsyncTask>()
- .set(AsyncTask::getOperateNum, asyncTask.getOperateNum() + 1)
- .set(AsyncTask::getStatus, 1)
- .eq(AsyncTask::getId, asyncTask.getId()));
- Assert.isTrue(rtn,"异步任务状态更新失败, id:{0}", asyncTask.getId());
- }
- }
|