package com.qs.mp.quartz.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.toolkit.Assert; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.qs.mp.common.core.redis.DistributedLocker; import com.qs.mp.common.enums.AsyncTaskTypeEnum; import com.qs.mp.common.utils.LogUtil; import com.qs.mp.framework.domain.AsyncTask; import com.qs.mp.framework.redis.RedisKey; import com.qs.mp.framework.redis.RedisLockKey; import com.qs.mp.framework.service.IAsyncTaskService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** * 异步消息消费任务 * @author Cup * @date 2022/6/7 */ @Component("asyncConsumeTask") public class AsyncConsumeTask { protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName()); @Autowired private IAsyncTaskService asyncTaskService; @Autowired private DistributedLocker distributedLocker; public void processingTask(){ List asyncTaskList = asyncTaskService.list(new LambdaQueryWrapper().eq(AsyncTask::getStatus, 0)); if (CollectionUtils.isEmpty(asyncTaskList)) { return; } for (AsyncTask asyncTask : asyncTaskList) { String lockKey = RedisLockKey.build(RedisLockKey.ASYNC_TASK_KEY, asyncTask.getId()); // 加锁,自动续期 if (!distributedLocker.tryLock(lockKey,0,-1, TimeUnit.SECONDS)) { continue; } try { AsyncConsumeTask proxy = (AsyncConsumeTask)AopContext.currentProxy(); proxy.execute(asyncTask); } catch (Exception e) { // 更新异步任务执行次数 asyncTaskService.update(new LambdaUpdateWrapper() .set(AsyncTask::getOperateNum, asyncTask.getOperateNum() + 1) .eq(AsyncTask::getId, asyncTask.getId())); LogUtil.error(logger, e, "异步任务处理异常。任务id:{0}", asyncTask.getId()); } finally { // 释放锁 distributedLocker.unlock(lockKey); } } } @Transactional(rollbackFor = Exception.class) public void execute(AsyncTask asyncTask){ if (AsyncTaskTypeEnum.CHANNEL_CONFIRM_RECEIPT.getValue().equals(asyncTask.getType())) { // 确认收货后相关任务处理 asyncTaskService.channelConfirmReceipt(asyncTask); } else if (AsyncTaskTypeEnum.TICKET_PAY.getValue().equals(asyncTask.getType())) { // 盲票支付后相关任务处理 asyncTaskService.ticketPay(asyncTask); } else if (AsyncTaskTypeEnum.TICKET_GENERATE.getValue().equals(asyncTask.getType())) { // 盲票创建后相关任务处理 asyncTaskService.ticketGenerate(asyncTask); } else if (AsyncTaskTypeEnum.CARD_ORDER_DELIVER.getValue().equals(asyncTask.getType())) { // 卡密订单发货任务 asyncTaskService.cardOrderDeliver(asyncTask); } else { // 都没匹配到则忽略不处理任务 return; } // 更新异步任务状态 boolean rtn = asyncTaskService.update(new LambdaUpdateWrapper() .set(AsyncTask::getOperateNum, asyncTask.getOperateNum() + 1) .set(AsyncTask::getStatus, 1) .eq(AsyncTask::getId, asyncTask.getId())); Assert.isTrue(rtn,"异步任务状态更新失败, id:{0}", asyncTask.getId()); } }