浏览代码

异步任务改造

cup 2 年之前
父节点
当前提交
581ac8bf50

+ 0 - 5
mp-admin/src/main/java/com/qs/mp/web/controller/api/admin/TicketBoxMgrController.java

@@ -201,11 +201,6 @@ public class TicketBoxMgrController extends BaseApiController {
 		}
 		String boxId = ticketBoxService.createTicketBox(param);
 		if (StringUtils.isNotBlank(boxId)) {
-			try {
-				pulsarClientService.producer(MqTopicType.ticket_generate, boxId);
-			} catch (PulsarClientException e) {
-				LogUtil.error(logger, e, "盲票组保存成功,发送异步消息失败. {0}", boxId);
-			}
 			return AjaxResult.success();
 		}
 		return AjaxResult.error("创建失败");

+ 0 - 5
mp-admin/src/main/java/com/qs/mp/web/controller/api/user/UserTicketOrderController.java

@@ -177,11 +177,6 @@ public class UserTicketOrderController extends BaseApiController {
       jsonObject.put("needPay", 1);
     } else {
       jsonObject.put("needPay", 0);
-      try {
-          pulsarClientService.producer(MqTopicType.ticket_pay, orderId);
-      } catch (PulsarClientException e) {
-        LogUtil.error(logger, e, "盲票支付成功,发送异步消息失败. orderId:{0}", orderId);
-      }
     }
     // 清除缓存的订单
     redisCache.deleteObject(RedisKey.build(RedisKey.USER_TICKET_ORDER_KEY, userId));

+ 3 - 1
mp-common/src/main/java/com/qs/mp/common/enums/AsyncTaskTypeEnum.java

@@ -11,7 +11,9 @@ import io.swagger.annotations.ApiModel;
  */
 @ApiModel("异步任务类型枚举")
 public enum AsyncTaskTypeEnum implements IEnum<Integer> {
-    CHANNEL_CONFIRM_RECEIPT(1,"经销商确认收货任务");
+    CHANNEL_CONFIRM_RECEIPT(1,"经销商确认收货任务"),
+    TICKET_PAY(2,"盲票付款任务"),
+    TICKET_GENERATE(3, "盲票生成任务");
 
     private Integer value;
     private String desc;

+ 6 - 0
mp-quartz/src/main/java/com/qs/mp/quartz/task/AsyncConsumeTask.java

@@ -72,6 +72,12 @@ public class AsyncConsumeTask {
         if (AsyncTaskTypeEnum.CHANNEL_CONFIRM_RECEIPT.getValue().equals(asyncTask.getType())) {
             // 确认收货后相关任务处理
             asyncTaskService.channelConfirmReceipt(asyncTask);
+        } else if (AsyncTaskTypeEnum.TICKET_PAY.getValue().equals(asyncTask.getType())) {
+            // 盲票支付后相关任务处理
+            asyncTaskService.ticketPay(asyncTask);
+        } else if (AsyncTaskTypeEnum.TICKET_GENERATE.getValue().equals(asyncTask.getType())) {
+            // 盲票支付后相关任务处理
+            asyncTaskService.ticketGenerate(asyncTask);
         }
 
         // 更新异步任务状态

+ 1 - 0
mp-quartz/src/main/java/com/qs/mp/quartz/task/OperateToolTask.java

@@ -3,6 +3,7 @@ package com.qs.mp.quartz.task;
 import com.qs.mp.common.enums.MqTopicType;
 import com.qs.mp.common.pulsar.PulsarClientService;
 import com.qs.mp.common.utils.LogUtil;
+import com.qs.mp.framework.service.IAsyncTaskService;
 import com.qs.mp.pay.service.IWalletService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+ 12 - 2
mp-service/src/main/java/com/qs/mp/admin/service/impl/TicketBoxServiceImpl.java

@@ -23,6 +23,7 @@ import com.qs.mp.common.pulsar.PulsarClientService;
 import com.qs.mp.common.utils.LogUtil;
 import com.qs.mp.common.utils.RSAUtil;
 import com.qs.mp.common.utils.StringUtils;
+import com.qs.mp.framework.service.IAsyncTaskService;
 import com.qs.mp.system.service.id.BizIdGenerator;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
@@ -90,6 +91,9 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
   @Autowired
   private IChannelService channelService;
 
+  @Autowired
+  private IAsyncTaskService asyncTaskService;
+
 
   @Override
   public List<TicketBox> listBySaleChannel(QueryWrapper<TicketBox> queryWrapper) {
@@ -401,11 +405,17 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
     }
     ticketAwardsService.saveBatch(ticketAwardsList);
     ticketAwardsPrizeService.saveBatch(awardsPrizeList);
+
+    if (StringUtils.isNotBlank(ticketBox.getBoxId())) {
+      // 插入盲票生成异步任务
+      Assert.isTrue(asyncTaskService.insertAsyncTask(AsyncTaskTypeEnum.TICKET_GENERATE, ticketBox.getBoxId()),"盲票组保存,创建异步任务失败. boxId:" +  ticketBox.getBoxId());
+    }
+
     return ticketBox.getBoxId();
   }
 
   @Override
-  @Transactional
+  @Transactional(rollbackFor = Exception.class)
   public void removeTicketBox(String boxId) {
     removeById(boxId);
     ticketPackageService.remove(new LambdaQueryWrapper<TicketPackage>().eq(TicketPackage::getBoxId, boxId));
@@ -415,7 +425,7 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
   }
 
   @Override
-  @Transactional
+  @Transactional(rollbackFor = Exception.class)
   public void generateTicket(String boxId) {
     TicketBox ticketBox = getById(boxId);
     Assert.isTrue(ticketBox.getStatus() == TicketBoxStatusEnum.DOING,

+ 21 - 0
mp-service/src/main/java/com/qs/mp/framework/service/IAsyncTaskService.java

@@ -1,5 +1,6 @@
 package com.qs.mp.framework.service;
 
+import com.qs.mp.common.enums.AsyncTaskTypeEnum;
 import com.qs.mp.framework.domain.AsyncTask;
 import com.baomidou.mybatisplus.extension.service.IService;
 
@@ -13,9 +14,29 @@ import com.baomidou.mybatisplus.extension.service.IService;
  */
 public interface IAsyncTaskService extends IService<AsyncTask> {
 
+    /**
+     * 插入异步任务
+     * @param taskType
+     * @param bizId
+     * @return
+     */
+    boolean insertAsyncTask(AsyncTaskTypeEnum taskType, String bizId);
+
     /**
      * 经销商确认收货后相关任务处理
      * @param asyncTask
      */
     void channelConfirmReceipt(AsyncTask asyncTask);
+
+    /**
+     * 盲票支付相关任务处理
+     * @param asyncTask
+     */
+    void ticketPay(AsyncTask asyncTask);
+
+    /**
+     * 盲票生成相关任务处理
+     * @param asyncTask
+     */
+    void ticketGenerate(AsyncTask asyncTask);
 }

+ 112 - 2
mp-service/src/main/java/com/qs/mp/framework/service/impl/AsyncTaskServiceImpl.java

@@ -1,21 +1,34 @@
 package com.qs.mp.framework.service.impl;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.qs.mp.admin.domain.Ticket;
+import com.qs.mp.admin.domain.TicketBox;
+import com.qs.mp.admin.domain.TicketPackage;
+import com.qs.mp.admin.service.ITicketBoxService;
+import com.qs.mp.admin.service.ITicketPackageService;
+import com.qs.mp.admin.service.ITicketService;
 import com.qs.mp.channel.domain.ChannelOrder;
 import com.qs.mp.channel.service.IChannelOrderService;
 import com.qs.mp.channel.service.IChannelUserRelService;
-import com.qs.mp.common.enums.ChannelCommissionResourceEnum;
-import com.qs.mp.common.enums.ChannelOrderTypeEnum;
+import com.qs.mp.common.enums.*;
 import com.qs.mp.common.exception.ServiceException;
+import com.qs.mp.common.utils.LogUtil;
 import com.qs.mp.framework.domain.AsyncTask;
 import com.qs.mp.framework.mapper.AsyncTaskMapper;
 import com.qs.mp.framework.service.IAsyncTaskService;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.qs.mp.user.domain.UserTicketOrder;
+import com.qs.mp.user.domain.UserTicketOrderItem;
+import com.qs.mp.user.service.IUserTicketOrderItemService;
 import com.qs.mp.user.service.IUserTicketOrderService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -29,12 +42,41 @@ import java.util.Objects;
 @Service
 public class AsyncTaskServiceImpl extends ServiceImpl<AsyncTaskMapper, AsyncTask> implements IAsyncTaskService {
 
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
+
     @Autowired
     private IChannelOrderService channelOrderService;
 
     @Autowired
     private IUserTicketOrderService userTicketService;;
 
+    @Autowired
+    private IUserTicketOrderService userTicketOrderService;
+
+    @Autowired
+    private ITicketBoxService ticketBoxService;
+
+    @Autowired
+    private IUserTicketOrderItemService userTicketOrderItemService;
+
+    @Autowired
+    private ITicketService ticketService;
+
+    @Autowired
+    private ITicketPackageService ticketPackageService;
+
+    @Autowired
+    private IChannelUserRelService channelUserRelService;
+
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public boolean insertAsyncTask(AsyncTaskTypeEnum taskType, String bizId) {
+        AsyncTask asyncTask = new AsyncTask();
+        asyncTask.setType(taskType.getValue());
+        asyncTask.setBizId(bizId);
+        return this.save(asyncTask);
+    }
+
     @Override
     @Transactional(rollbackFor = Exception.class)
     public void channelConfirmReceipt(AsyncTask asyncTask) {
@@ -54,4 +96,72 @@ public class AsyncTaskServiceImpl extends ServiceImpl<AsyncTaskMapper, AsyncTask
         // 分佣
         userTicketService.commToChannel(orderId, ChannelCommissionResourceEnum.CHANNEL.getValue());
     }
+
+    @Override
+    public void ticketPay(AsyncTask asyncTask) {
+        String orderId = asyncTask.getBizId();
+        UserTicketOrder ticketOrder = userTicketOrderService.getById(orderId);
+
+        TicketBox ticketBox = ticketBoxService.getById(ticketOrder.getBoxId());
+        if (ticketBox.getType() == TicketTypeEnum.OFFLINE) {
+            // 线下票更新销量,此处不做乐观锁控制,因为不用控制库存
+            ticketBoxService.update(
+                    new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getSaleQty,
+                                    ticketBox.getSaleQty() + 1)
+                            .eq(TicketBox::getBoxId, ticketBox.getBoxId()));
+        }
+
+
+        // 更新票组销售数量,此处只做累计,允许并发容错
+        List<UserTicketOrderItem> ticketOrderItemList = userTicketOrderItemService.list(new LambdaQueryWrapper<UserTicketOrderItem>()
+                .eq(UserTicketOrderItem::getOrderId, orderId));
+        for (UserTicketOrderItem orderItem : ticketOrderItemList) {
+            Ticket ticket = ticketService.getById(orderItem.getTicketId());
+            TicketPackage ticketPackage = ticketPackageService.getById(ticket.getPkgId());
+            ticketPackageService.update(new LambdaUpdateWrapper<TicketPackage>()
+                    .set(TicketPkgSaleStatusEnum.WAIT_SALE == ticketPackage.getSaleStatus(),
+                            TicketPackage::getSaleStatus, TicketPkgSaleStatusEnum.ON_SALE)
+                    .set(TicketPkgSaleStatusEnum.ON_SALE == ticketPackage.getSaleStatus()
+                                    && ticketPackage.getSaleQty() + 1 >= ticketPackage.getPkgUnit(),
+                            TicketPackage::getSaleStatus, TicketPkgSaleStatusEnum.SALE_OUT)
+                    .set(TicketPackage::getSaleQty, ticketPackage.getSaleQty() + 1)
+                    .eq(TicketPackage::getPkgId, ticketPackage.getPkgId()));
+        }
+
+        // 用户关系绑定
+        channelUserRelService.bindUser(ticketOrder.getUserId(), ticketOrder.getChannelId());
+
+        if (ticketOrder.getCommStatus() != CommStatusEnum.NO) {
+            LogUtil.warn(logger, "收到盲票支付成功任务,订单结佣状态不是未结佣,忽略任务。orderId=" + orderId);
+            return;
+        }
+        // 先更新状态,防并发。
+        boolean rst = userTicketOrderService.update(new LambdaUpdateWrapper<UserTicketOrder>().set(UserTicketOrder::getCommStatus, CommStatusEnum.DOING)
+                .eq(UserTicketOrder::getOrderId, orderId).eq(UserTicketOrder::getCommStatus, CommStatusEnum.NO));
+        if (!rst) {
+            LogUtil.error(logger, "收到盲票支付成功任务,更新订单结佣状态为结佣中失败。orderId=" + orderId);
+            return;
+        }
+        userTicketOrderService.commToChannel(orderId, ChannelCommissionResourceEnum.USER.getValue());
+
+    }
+
+
+    @Override
+    public void ticketGenerate(AsyncTask asyncTask) {
+        String boxId = asyncTask.getBizId();
+        TicketBox ticketBox = ticketBoxService.getById(boxId);
+        if (ticketBox.getStatus() != TicketBoxStatusEnum.WAIT ) {
+            LogUtil.error(logger, "收到盲票生成任务,票组状态不是待出票,忽略任务。boxId=" + boxId);
+            return;
+        }
+        // 先更新状态,防并发。如果最后生成失败了,暂时人工重新触发
+        boolean rst = ticketBoxService.update(new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getStatus, TicketBoxStatusEnum.DOING)
+                .eq(TicketBox::getBoxId, boxId).eq(TicketBox::getStatus, TicketBoxStatusEnum.WAIT));
+        if (!rst) {
+            LogUtil.error(logger, "收到盲票生成任务,更新票组状态为出票中失败。boxId=" + boxId);
+            return;
+        }
+        ticketBoxService.generateTicket(boxId);
+    }
 }

+ 7 - 8
mp-service/src/main/java/com/qs/mp/pay/service/impl/WalletServiceImpl.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.qs.mp.channel.service.IChannelOrderService;
+import com.qs.mp.common.enums.AsyncTaskTypeEnum;
 import com.qs.mp.common.enums.BizTypeEnum;
 import com.qs.mp.common.enums.MqTopicType;
 import com.qs.mp.common.enums.PayOrderStatusEnum;
@@ -13,6 +14,8 @@ import com.qs.mp.common.utils.DateUtils;
 import com.qs.mp.common.utils.LogUtil;
 import com.qs.mp.common.utils.StringUtils;
 import com.qs.mp.common.utils.http.OkHttpUtil;
+import com.qs.mp.framework.domain.AsyncTask;
+import com.qs.mp.framework.service.IAsyncTaskService;
 import com.qs.mp.pay.domain.PayOrder;
 import com.qs.mp.pay.service.IPayOrderService;
 import com.qs.mp.pay.service.IWalletService;
@@ -56,6 +59,9 @@ public class WalletServiceImpl implements IWalletService {
   @Autowired
   private BizIdGenerator bizIdGenerator;
 
+  @Autowired
+  private IAsyncTaskService asyncTaskService;
+
   @Value("${pay.notifyUrl}")
   private String notifyUrl;  //支付成功前端跳转地址
   @Value("${pay.callbackUrl}")
@@ -186,14 +192,7 @@ public class WalletServiceImpl implements IWalletService {
       channelOrderService.paySuccess(order);
     } else if (BizTypeEnum.TICKET_ORDER == order.getBizType()) {
       // 用户盲票购买订单支付成功,调用业务订单服务处理
-      boolean rst = userTicketOrderService.paySuccess(order);
-      try {
-        if (rst) {
-          pulsarClientService.producer(MqTopicType.ticket_pay, order.getBizId());
-        }
-      } catch (PulsarClientException e) {
-        LogUtil.error(logger, e, "盲票支付成功,发送异步消息失败. orderId:{0}", order.getBizId());
-      }
+      userTicketOrderService.paySuccess(order);
     } else if (BizTypeEnum.DELIVER_ORDER == order.getBizType()) {
       // 用户提货订单支付成功,调用业务订单服务处理
       userDeliverOrderService.paySuccess(order);

+ 14 - 1
mp-service/src/main/java/com/qs/mp/user/service/impl/UserTicketOrderServiceImpl.java

@@ -27,6 +27,7 @@ import com.qs.mp.common.utils.LogUtil;
 import com.qs.mp.common.utils.RSAUtil;
 import com.qs.mp.common.utils.StringUtils;
 import com.qs.mp.framework.redis.RedisKey;
+import com.qs.mp.framework.service.IAsyncTaskService;
 import com.qs.mp.pay.domain.PayOrder;
 import com.qs.mp.system.service.id.BizIdGenerator;
 import com.qs.mp.user.domain.UserCoupon;
@@ -110,6 +111,9 @@ public class UserTicketOrderServiceImpl extends
   @Autowired
   private RedisCache redisCache;
 
+  @Autowired
+  private IAsyncTaskService asyncTaskService;
+
   @Override
   @Transactional
   public String submitOrder(Long userId, TicketOrderSettleVO orderSettleVO,
@@ -212,6 +216,8 @@ public class UserTicketOrderServiceImpl extends
     if (orderSettleVO.getPayAmt() == 0) {
       // 无需支付的,直接置为成功
       processTicketOrder(userTicketOrder);
+      // 插入付款异步任务
+      Assert.isTrue(asyncTaskService.insertAsyncTask(AsyncTaskTypeEnum.TICKET_PAY, userTicketOrder.getOrderId()),"盲票支付,创建异步任务失败:" + userTicketOrder.getOrderId());
     }
 
     return userTicketOrder.getOrderId();
@@ -278,6 +284,10 @@ public class UserTicketOrderServiceImpl extends
     if (orderSettleVO.getPayAmt() == 0) {
       // 无需支付的,直接置为成功
       processTicketOrder(ticketOrder);
+
+      // 插入付款异步任务
+      Assert.isTrue(asyncTaskService.insertAsyncTask(AsyncTaskTypeEnum.TICKET_PAY, ticketOrder.getOrderId()),"盲票支付,创建异步任务失败:" + ticketOrder.getOrderId());
+
     }
     return ticketOrder.getOrderId();
   }
@@ -373,7 +383,7 @@ public class UserTicketOrderServiceImpl extends
   }
 
   @Override
-  @Transactional
+  @Transactional(rollbackFor = Exception.class)
   public boolean paySuccess(PayOrder payOrder) {
     UserTicketOrder ticketOrder = getById(payOrder.getBizId());
     if (null == ticketOrder || ticketOrder.getStatus() != UserTicketOrderStatusEnum.NOT_PAY) {
@@ -382,6 +392,9 @@ public class UserTicketOrderServiceImpl extends
       throw new ServiceException("支付回调用户购票订单处理,订单状态异常,不是待支付。orderId:" + payOrder.getBizId());
     }
     ticketOrder.setPayTime(DateUtils.parseDate(payOrder.getCompleteDate()));
+
+    // 插入异步任务
+    Assert.isTrue(asyncTaskService.insertAsyncTask(AsyncTaskTypeEnum.TICKET_PAY, payOrder.getOrderId()), "创建支付成功异步任务失败. orderId:" + payOrder.getOrderId());
     return processTicketOrder(ticketOrder);
   }