|
@@ -0,0 +1,84 @@
|
|
|
+package com.qs.mp.quartz.task;
|
|
|
+
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.Assert;
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
|
|
+import com.qs.mp.common.core.redis.DistributedLocker;
|
|
|
+import com.qs.mp.common.enums.AsyncTaskTypeEnum;
|
|
|
+import com.qs.mp.common.utils.LogUtil;
|
|
|
+import com.qs.mp.framework.domain.AsyncTask;
|
|
|
+import com.qs.mp.framework.redis.RedisKey;
|
|
|
+import com.qs.mp.framework.redis.RedisLockKey;
|
|
|
+import com.qs.mp.framework.service.IAsyncTaskService;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
+
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.locks.ReentrantLock;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 异步消息消费任务
|
|
|
+ * @author Cup
|
|
|
+ * @date 2022/6/7
|
|
|
+ */
|
|
|
+@Component("asyncConsumeTask")
|
|
|
+public class AsyncConsumeTask {
|
|
|
+
|
|
|
+ protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IAsyncTaskService asyncTaskService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private DistributedLocker distributedLocker;
|
|
|
+
|
|
|
+ public void processingTask(){
|
|
|
+
|
|
|
+ List<AsyncTask> asyncTaskList = asyncTaskService.list(new LambdaQueryWrapper<AsyncTask>().eq(AsyncTask::getStatus, 0));
|
|
|
+ if (CollectionUtils.isEmpty(asyncTaskList)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (AsyncTask asyncTask : asyncTaskList) {
|
|
|
+ String lockKey = RedisLockKey.build(RedisLockKey.ASYNC_TASK_KEY, asyncTask.getId());
|
|
|
+ // 加锁,自动续期
|
|
|
+ if (!distributedLocker.tryLock(lockKey,0,-1, TimeUnit.SECONDS)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ this.execute(asyncTask);
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 更新异步任务执行次数
|
|
|
+ asyncTaskService.update(new LambdaUpdateWrapper<AsyncTask>()
|
|
|
+ .set(AsyncTask::getOperateNum, asyncTask.getOperateNum() + 1)
|
|
|
+ .eq(AsyncTask::getId, asyncTask.getId()));
|
|
|
+ LogUtil.error(logger, e, "异步任务处理异常。任务id:{0}", asyncTask.getId());
|
|
|
+ } finally {
|
|
|
+ // 释放锁
|
|
|
+ distributedLocker.unlock(lockKey);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Transactional(rollbackFor = Exception.class)
|
|
|
+ public void execute(AsyncTask asyncTask){
|
|
|
+ if (AsyncTaskTypeEnum.CHANNEL_CONFIRM_RECEIPT.getValue().equals(asyncTask.getType())) {
|
|
|
+ // 确认收货后相关任务处理
|
|
|
+ asyncTaskService.channelConfirmReceipt(asyncTask);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 更新异步任务状态
|
|
|
+ boolean rtn = asyncTaskService.update(new LambdaUpdateWrapper<AsyncTask>()
|
|
|
+ .set(AsyncTask::getOperateNum, asyncTask.getOperateNum() + 1)
|
|
|
+ .set(AsyncTask::getStatus, 1)
|
|
|
+ .eq(AsyncTask::getId, asyncTask.getId()));
|
|
|
+
|
|
|
+ Assert.isFalse(rtn,"异步任务状态更新失败, id:{0}", asyncTask.getId());
|
|
|
+ }
|
|
|
+
|
|
|
+}
|