Browse Source

活动订阅消息调整

cup 2 years ago
parent
commit
c1879c8b08

+ 96 - 23
mp-quartz/src/main/java/com/qs/mp/quartz/task/MarketingTask.java

@@ -6,7 +6,9 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.qs.mp.admin.domain.Marketing;
+import com.qs.mp.admin.domain.MarketingMsg;
 import com.qs.mp.admin.service.IMarketingAwardsService;
+import com.qs.mp.admin.service.IMarketingMsgService;
 import com.qs.mp.admin.service.IMarketingService;
 import com.qs.mp.common.core.redis.DistributedLocker;
 import com.qs.mp.common.core.redis.RedisCache;
@@ -20,10 +22,13 @@ import com.qs.mp.framework.redis.RedisLockKey;
 import com.qs.mp.framework.service.IWxSubscribeMessage;
 import com.qs.mp.user.domain.MarketingUserCode;
 import com.qs.mp.user.service.IMarketingUserCodeService;
+import java.util.ArrayList;
+import java.util.stream.Collectors;
 import org.redisson.Redisson;
 import org.redisson.api.RedissonClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.aop.framework.AopContext;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -33,6 +38,8 @@ import java.util.Date;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.Assert;
 
 /**
  * 营销活动相关任务
@@ -56,6 +63,10 @@ public class MarketingTask {
     @Autowired
     private IMarketingUserCodeService marketingUserCodeService;
 
+    @Autowired
+    private IMarketingMsgService marketingMsgService;
+
+
     protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
 
 
@@ -82,17 +93,6 @@ public class MarketingTask {
             try {
                 // 开奖
                 marketingService.lottery(marketing);
-
-                // 获取所有的参与用户的用户id去重
-                List<MarketingUserCode> userCodeList = marketingUserCodeService.list(new LambdaQueryWrapper<MarketingUserCode>()
-                        .select(MarketingUserCode::getUserId)
-                        .eq(MarketingUserCode::getMarketingId, marketing.getId())
-                        .eq(MarketingUserCode::getUserType, UserTypeEnum.ORDINARY.getValue())
-                        .groupBy(MarketingUserCode::getUserId));
-                for (MarketingUserCode marketingUserCode : userCodeList) {
-                    // 发送开奖订阅通知
-                    wxSubscribeMessage.sendMarketingLottery(marketingUserCode.getUserId(), marketing);
-                }
             } catch (Exception e) {
                 LogUtil.error(logger, e, "活动开奖异常。marketingId:{0}", marketing.getId());
             } finally {
@@ -108,37 +108,68 @@ public class MarketingTask {
      * 发送活动开始通知
      */
     public void sendMessage() {
+        MarketingTask proxy = (MarketingTask) AopContext.currentProxy();
+        proxy.saveMarketingMsg();
+
+    }
+
+
+    @Transactional(rollbackFor = Exception.class)
+    public void saveMarketingMsg() {
         // 获取开始时间小于等于当前时间且未发开始通知的活动
         Date now = DateUtils.getNowDate();
         List<Marketing> marketingList = marketingService.list(new LambdaQueryWrapper<Marketing>()
-                .eq(Marketing::getTriggerStatus, 0)
-                .eq(Marketing::getIsOn, MarketingStatusEnum.ON.getValue())
-                .eq(Marketing::getIsSend, 0)
-                .le(Marketing::getStartTime, now));
+            .eq(Marketing::getTriggerStatus, 0)
+            .eq(Marketing::getIsOn, MarketingStatusEnum.ON.getValue())
+            .eq(Marketing::getIsSend, 0)
+            .le(Marketing::getStartTime, now));
         if (CollectionUtils.isEmpty(marketingList)) {
             return;
         }
 
         // 获取所有参与过活动的普通用户根据用户id去重
         List<MarketingUserCode> userCodeList = marketingUserCodeService.list(new LambdaQueryWrapper<MarketingUserCode>()
-                .select(MarketingUserCode::getUserId)
-                .eq(MarketingUserCode::getUserType, UserTypeEnum.ORDINARY.getValue())
-                .groupBy(MarketingUserCode::getUserId));
+            .select(MarketingUserCode::getUserId)
+            .eq(MarketingUserCode::getUserType, UserTypeEnum.ORDINARY.getValue())
+            .groupBy(MarketingUserCode::getUserId));
         if (CollectionUtils.isEmpty(userCodeList)) {
             return;
         }
 
+        List<Marketing> updateMarketingList = new ArrayList<>();
+        List<MarketingMsg> marketingMsgList = new ArrayList<>();
         for (Marketing marketing : marketingList) {
             for (MarketingUserCode marketingUserCode : userCodeList) {
-                // 发送开始订阅通知
-                wxSubscribeMessage.sendMarketingStart(marketingUserCode.getUserId(), marketing);
+
+                MarketingMsg marketingMsg = new MarketingMsg();
+                marketingMsg.setMarketingId(marketing.getId());
+                marketingMsg.setContext(marketing.getTitle());
+                marketingMsg.setType(2);
+                marketingMsg.setUserId(marketingUserCode.getUserId());
+
+                marketingMsgList.add(marketingMsg);
             }
 
-            // 更新活动通知发送状态
+            Marketing updateMarketing = new Marketing();
+            updateMarketing.setIsSend(1);
+            updateMarketing.setId(marketing.getId());
+            updateMarketingList.add(updateMarketing);
+
             marketingService.update(new LambdaUpdateWrapper<Marketing>()
-                    .set(Marketing::getIsSend, 1)
-                    .eq(Marketing::getId, marketing.getId()));
+                .set(Marketing::getIsSend, 1)
+                .eq(Marketing::getId, marketing.getId()));
+        }
+        if (CollectionUtils.isNotEmpty(marketingMsgList)) {
+            boolean rtn = marketingMsgService.saveBatch(marketingMsgList);
+            Assert.isTrue(rtn, "保存活动开始通知消息失败");
+        }
+
+        // 更新活动通知发送状态
+        if (CollectionUtils.isNotEmpty(updateMarketingList)) {
+            boolean rtn = marketingService.updateBatchById(updateMarketingList);
+            Assert.isTrue(rtn, "更新活动通知发送状态失败");
         }
+
     }
 
     /**
@@ -164,4 +195,46 @@ public class MarketingTask {
         }
     }
 
+    /**
+     * 定时发送一定数量的活动订阅消息
+     * @param limit
+     */
+    public void sendMsgByLimit(Integer limit) {
+
+        String lockKey = "MARKETING_SEND_MSG_LIMIT_KEY";
+        // 同时只能有一个线程发送活动通知任务,自动续期
+        if (!distributedLocker.tryLock(lockKey,0, -1, TimeUnit.SECONDS)) {
+            return;
+        }
+
+        try {
+            List<MarketingMsg> marketingMsgList = marketingMsgService.list(new LambdaQueryWrapper<MarketingMsg>()
+                .last("limit " + limit));
+
+            if (CollectionUtils.isEmpty(marketingMsgList)) {
+                return;
+            }
+
+            for (MarketingMsg marketingMsg : marketingMsgList) {
+                Marketing marketing = new Marketing();
+                marketing.setId(marketingMsg.getMarketingId());
+                marketing.setTitle(marketingMsg.getContext());
+                if (marketingMsg.getType() == 1) {
+                    // 发送活动开奖通知
+                    wxSubscribeMessage.sendMarketingLottery(marketingMsg.getUserId(), marketing);
+                } else {
+                    // 发送活动开始通知
+                    wxSubscribeMessage.sendMarketingStart(marketingMsg.getUserId(), marketing);
+                }
+            }
+
+            // 删除已发送的消息
+            marketingMsgService.removeByIds(marketingMsgList.stream().map(MarketingMsg::getId).collect(Collectors.toList()));
+        } catch (Exception e) {
+            LogUtil.error(logger, e, "发送活动订阅通知异常");
+        } finally {
+            distributedLocker.unlock(lockKey);
+        }
+    }
+
 }

+ 28 - 0
mp-service/src/main/java/com/qs/mp/admin/service/impl/MarketingServiceImpl.java

@@ -111,6 +111,9 @@ public class MarketingServiceImpl extends ServiceImpl<MarketingMapper, Marketing
     @Autowired
     private DistributedLocker distributedLocker;
 
+    @Autowired
+    private IMarketingMsgService marketingMsgService;
+
 
     @Override
     @Transactional(rollbackFor = Exception.class)
@@ -399,6 +402,31 @@ public class MarketingServiceImpl extends ServiceImpl<MarketingMapper, Marketing
         redisCache.deleteObject(userLowLotteryPool);
         redisCache.deleteObject(insideLotteryPool);
 
+
+
+        // 保存开奖消息
+        // 获取所有的参与用户的用户id去重
+        List<MarketingUserCode> sendMsgUserList = marketingUserCodeService.list(new LambdaQueryWrapper<MarketingUserCode>()
+            .select(MarketingUserCode::getUserId)
+            .eq(MarketingUserCode::getMarketingId, marketing.getId())
+            .eq(MarketingUserCode::getUserType, UserTypeEnum.ORDINARY.getValue())
+            .groupBy(MarketingUserCode::getUserId));
+        if (CollectionUtils.isEmpty(sendMsgUserList)) {
+            return;
+        }
+        List<MarketingMsg> marketingMsgList = new ArrayList<>();
+        for (MarketingUserCode marketingUserCode : sendMsgUserList) {
+            // 封装开奖订阅消息
+            MarketingMsg marketingMsg = new MarketingMsg();
+            marketingMsg.setType(1);
+            marketingMsg.setMarketingId(marketing.getId());
+            marketingMsg.setContext(marketing.getTitle());
+            marketingMsg.setUserId(marketingUserCode.getUserId());
+            marketingMsgList.add(marketingMsg);
+        }
+        rtn = marketingMsgService.saveBatch(marketingMsgList);
+        Assert.isTrue(rtn, "保存活动开奖消息失败。marketingId:" + marketing.getId());
+
     }
 
     private MarketingHitPrize exchangeMarketingHitPrize(Marketing marketing, List<MarketingHitPrize> allHitPrizeList, MarketingAwards marketingAwards, List<MarketingAwardsPrize> marketingAwardsPrizeList, MarketingUserCode marketingUserCode) {