Browse Source

Merge branch 'dev' into 'mp-server-test'

Dev

See merge request quanshu/mp-server!92
zhong chunping 3 năm trước cách đây
mục cha
commit
0db604210f

+ 7 - 0
mp-admin/src/main/resources/application-dev.yml

@@ -106,6 +106,13 @@ cloud:
     # 存储对象公开不可访问,需要通过应用下载
     private-bucket-name: mp-auth-test-1307117429
 
+mq:
+    consumer-conn: false
+    service-url: http://pulsar-9nbo99zrnxzb.tdmq-pulsar.ap-sh.qcloud.tencenttdmq.com:5039
+    auth-token: eyJrZXlJZCI6InB1bHNhci05bmJvOTl6cm54emIiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItOW5ibzk5enJueHpiX3lncC1zZXJ2ZXIifQ.RO0faVqgSfcc6RkS3Tctv2sAPe2Wp26lVZK34BtdhBU
+    topic-ticket-generate: pulsar-rkrxw2wx8zeo/test-share/topic-ticket-generate
+    consumer-topics: pulsar-rkrxw2wx8zeo/test-share/topic-ticket-generate
+
 # 小程序
 miniprogram:
     # 接收通知的状态

+ 8 - 0
mp-admin/src/main/resources/application-test.yml

@@ -111,6 +111,14 @@ cloud:
   # 存储对象公开不可访问,需要通过应用下载
   private-bucket-name: mp-auth-test-1307117429
 
+mq:
+  consumer-conn: true
+  service-url: http://pulsar-9nbo99zrnxzb.tdmq-pulsar.ap-sh.qcloud.tencenttdmq.com:5039
+  auth-token: eyJrZXlJZCI6InB1bHNhci05bmJvOTl6cm54emIiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItOW5ibzk5enJueHpiX3lncC1zZXJ2ZXIifQ.RO0faVqgSfcc6RkS3Tctv2sAPe2Wp26lVZK34BtdhBU
+  topic-ticket-generate: pulsar-rkrxw2wx8zeo/test-share/topic-ticket-generate
+  consumer-topics: pulsar-rkrxw2wx8zeo/test-share/topic-ticket-generate
+
+
 # 小程序
 miniprogram:
   # 接收通知的状态

+ 13 - 2
mp-admin/src/test/java/com/qs/mp/service/ServiceImplTest.java

@@ -36,7 +36,7 @@ public class ServiceImplTest {
 
 
   public static void main(String[] args) {
-    int n = 500;
+    /*int n = 500;
     Random rand = new Random();
     boolean[] bool = new boolean[n];
     int randInt = 0;
@@ -47,5 +47,16 @@ public class ServiceImplTest {
       bool[randInt] = true;
       System.out.println(randInt);
     }
+  */
+    String s = "z";
+
+    int i = Character.codePointAt(s, 0);
+
+    System.out.println(i + "");
+
+    char[] a = Character.toChars(123);
+
+    System.out.println(new String(a));
+  }
+
   }
-}

+ 4 - 0
mp-common/pom.xml

@@ -59,6 +59,10 @@
             <artifactId>commons-lang3</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+        </dependency>
         <!-- JSON工具类 -->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>

+ 27 - 0
mp-common/src/main/java/com/qs/mp/common/enums/MqTopicType.java

@@ -0,0 +1,27 @@
+package com.qs.mp.common.enums;
+
+import com.baomidou.mybatisplus.annotation.IEnum;
+
+/**
+ * @auther duota
+ * @create 2021 2021/9/19 5:28 下午
+ * @describe
+ */
+public enum MqTopicType implements IEnum<String> {
+
+  ticket_generate("1", "盲票生成");
+
+
+  private final String value;
+  private final String desc;
+
+  MqTopicType(final String value, final String desc) {
+    this.value = value;
+    this.desc = desc;
+  }
+
+  @Override
+  public String getValue() {
+    return value;
+  }
+}

+ 4 - 0
mp-common/src/main/java/com/qs/mp/common/enums/TicketBoxStatusEnum.java

@@ -10,6 +10,10 @@ import com.baomidou.mybatisplus.annotation.IEnum;
  */
 public enum TicketBoxStatusEnum implements IEnum<String> {
 
+
+  WAIT("wait", "待出票"),
+  DOING("doing", "出票中"),
+  DONE("done", "待上架"),
   PUT_ON("on", "上架"),
   PUT_OFF("off", "下架");
 

+ 171 - 0
mp-common/src/main/java/com/qs/mp/common/pulsar/PulsarClientService.java

@@ -0,0 +1,171 @@
+package com.qs.mp.common.pulsar;
+
+import com.qs.mp.common.enums.MqTopicType;
+import com.qs.mp.common.utils.StringUtils;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PostConstruct;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * @auther duota
+ * @create 2021 2021/9/19 2:27 下午
+ * @describe
+ */
+@Component
+public class PulsarClientService {
+
+  protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
+  private PulsarClient client;
+  private Consumer<byte[]> consumer;
+
+  @Value("${mq.service-url}")
+  private String serviceUrl;
+
+  @Value("${mq.auth-token}")
+  private String authToken;
+
+  @Value("${mq.topic-ticket-generate}")
+  private String topicTicketGenerate;
+
+  @Value("${mq.consumer-conn}")
+  private boolean consumerConn;
+
+  @Value("${mq.consumer-topics}")
+  private String topics;
+
+  @Autowired
+  private PulsarConsumer pulsarConsumer;
+
+  private final ConcurrentHashMap<String, Producer<byte[]>> Producers = new ConcurrentHashMap<String, Producer<byte[]>>();
+
+  @PostConstruct
+  public void init() throws PulsarClientException {
+    client = PulsarClient.builder()
+        .serviceUrl(serviceUrl)//【集群管理】接入地址处复制
+        .authentication(AuthenticationFactory.token(authToken))
+        .build();
+
+    if (consumerConn) {
+      consumer = client.newConsumer()
+          .topic(topics.split(","))//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
+          .subscriptionName("ygp")//需要现在控制台或者通过控制台API创建好一个订阅,此处填写订阅名
+          .subscriptionType(SubscriptionType.Exclusive)//声明消费模式为exclusive(独占)模式
+          .subscriptionInitialPosition(
+              SubscriptionInitialPosition.Earliest)//配置从最早开始消费,否则可能会消费不到历史消息
+          .subscribe();
+      new Thread(() -> {
+        try {
+          loop();
+        } catch (Exception e) {
+          logger.error("消费Pulsar数据异常,停止Pulsar连接:", e);
+          close();
+        }
+      }).start();
+    }
+  }
+
+  private void loop() throws Exception {
+    //消费消息
+    while (true) {
+      Message message = consumer.receive();
+      String[] keyArr = message.getKey().split("_");
+      String topicName = message.getTopicName();
+      String key = message.getKey();
+      String jsons = new String(message.getData());
+      if (!StringUtils.isBlank(jsons)) {
+        try {
+          pulsarConsumer.wsConsumer(key, jsons);
+          logger.info("receive data>>>>>>" + jsons);
+        } catch (Exception e) {
+          logger.error("消费Pulsar数据异常,key【{}】,json【{}】:", message.getKey(), jsons, e);
+        }
+      }
+      consumer.acknowledge(message);
+    }
+  }
+
+  /**
+   * @param mqTopicType
+   * @param data        内容为json格式
+   * @throws PulsarClientException
+   */
+  public void producer(MqTopicType mqTopicType, String data) throws PulsarClientException {
+    logger.info("start producer mq data:" + data);
+    String topic = "";
+    if (mqTopicType.getValue() == MqTopicType.ticket_generate.getValue()) {
+      topic = topicTicketGenerate; //
+    }
+    Producer<byte[]> producer = null;
+    if (Producers.containsKey(mqTopicType.getValue())) {
+      producer = Producers.get(mqTopicType.getValue());
+    } else {
+      producer = client.newProducer()
+          .topic(topic)//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
+          .enableBatching(true)
+          .create();
+      Producers.put(mqTopicType.getValue(), producer);
+    }
+
+    /*
+    producer.newMessage()//发送消息
+        .key(mqTopicType.getValue())
+        .value(data.getBytes())
+            .send();
+
+
+     */
+    CompletableFuture<MessageId> messageIdFuture = producer.newMessage()
+        .key(mqTopicType.getValue())
+        .value(data.getBytes())
+        .sendAsync();
+    Producer<byte[]> finalProducer = producer;
+    messageIdFuture.whenComplete(((messageId, throwable) -> {
+      if (null != throwable) {
+        logger.error("【消息投递异常,开始重试】", throwable);
+        //todo 失败重试策略
+        try {
+          finalProducer.newMessage().deliverAfter(5, TimeUnit.SECONDS
+          ).key(mqTopicType.getValue()).value(data.getBytes()).send();
+        } catch (PulsarClientException e) {
+          logger.error("重试投递失败", e);
+          //todo 保存数据库,增加失败重试,通过体系化重试框架保证消息投递完整性
+        }
+      } else {
+        logger.info("【消息成功投递】");
+      }
+    }));
+
+  }
+
+  public void consumer() throws PulsarClientException {
+
+  }
+
+  public void close() {
+    try {
+      consumer.close();
+    } catch (PulsarClientException e) {
+      logger.error("关闭Pulsar消费者失败:", e);
+    }
+    try {
+      client.close();
+    } catch (PulsarClientException e) {
+      logger.error("关闭Pulsar连接失败:", e);
+    }
+  }
+}

+ 12 - 0
mp-common/src/main/java/com/qs/mp/common/pulsar/PulsarConsumer.java

@@ -0,0 +1,12 @@
+package com.qs.mp.common.pulsar;
+
+/**
+ * @auther duota
+ * @create 2021 2021/9/20 10:19 上午
+ * @describe
+ */
+public interface PulsarConsumer {
+
+  public void wsConsumer(String topicType,String mqData);
+
+}

+ 4 - 1
mp-service/src/main/java/com/qs/mp/admin/service/ITicketBoxSerialService.java

@@ -11,7 +11,10 @@ import com.qs.mp.common.enums.TicketTypeEnum;
 public interface ITicketBoxSerialService {
 
   /**
-   * 格式,共7位 T(票类型)+ 22(年份) + 00001(序号)
+   * 格式,共7位,分三段,票类型(1位)-年份(1位)-序号(5位)
+   * T(票类型,T代表通用票)
+   * H(年份后两位+50后的ascii码,从2022年可以用到2072年)
+   * 00001(序号,每年从年份数字开始,比如2022年,那起始序号为2022)
    *
    * @param ticketType
    * @return

+ 6 - 0
mp-service/src/main/java/com/qs/mp/admin/service/ITicketBoxService.java

@@ -20,4 +20,10 @@ public interface ITicketBoxService extends IService<TicketBox> {
    * @return
    */
   boolean createTicketBox(TicketBoxCreateParam param);
+
+  /**
+   * 生成盲票
+   * @param boxId
+   */
+  void generateTicket(String boxId);
 }

+ 6 - 4
mp-service/src/main/java/com/qs/mp/admin/service/impl/TicketBoxSerialServiceImpl.java

@@ -36,7 +36,8 @@ public class TicketBoxSerialServiceImpl implements ITicketBoxSerialService {
   @Override
   public String generateSerial(TicketTypeEnum ticketType) {
     //年份后两位
-    String prefix = sdf.format(new Date()).substring(2);
+    String strYear = sdf.format(new Date());
+    String prefix = strYear.substring(2);
 
     //如果日期前缀未过期,则序号自增
     //否则,将日期作为Key,1作为Value重置,并设置第二年0点过期
@@ -44,11 +45,12 @@ public class TicketBoxSerialServiceImpl implements ITicketBoxSerialService {
       redisTemplate.opsForValue().increment(prefix, 1);
 
     } else {
-      int start = 1;
+      int start = Integer.valueOf(strYear);
       List<TicketBox> ticketBoxList = ticketBoxMapper.selectList(new QueryWrapper<TicketBox>().orderByDesc("created_time").last("limit 1"));
       if (!CollectionUtils.isEmpty(ticketBoxList)) {
         String boxNo = ticketBoxList.get(0).getBoxNo();
-        int year = Integer.valueOf(boxNo.substring(1, 3));
+
+        int year = Character.codePointAt(boxNo.substring(1, 2), 0) - 50;
         // 同一年份,做累加
         if (year == Integer.valueOf(prefix)) {
           start = Integer.valueOf(boxNo.substring(3)) + 1;
@@ -58,7 +60,7 @@ public class TicketBoxSerialServiceImpl implements ITicketBoxSerialService {
       redisTemplate.expireAt(prefix, getNextYearDate());
 
     }
-    return "T" + prefix + String.format("%1$05d", redisTemplate.opsForValue().get(prefix));
+    return "T" + Character.toChars(Integer.parseInt(prefix) + 50) + String.format("%1$05d", redisTemplate.opsForValue().get(prefix));
   }
 
   // 获取第二年 1月 1日 00:00的时间

+ 99 - 60
mp-service/src/main/java/com/qs/mp/admin/service/impl/TicketBoxServiceImpl.java

@@ -1,6 +1,8 @@
 package com.qs.mp.admin.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.qs.mp.admin.domain.Coupon;
 import com.qs.mp.admin.domain.Goods;
@@ -22,11 +24,13 @@ import com.qs.mp.admin.service.ITicketBoxSerialService;
 import com.qs.mp.admin.service.ITicketBoxService;
 import com.qs.mp.admin.service.ITicketPackageService;
 import com.qs.mp.admin.service.ITicketService;
+import com.qs.mp.common.enums.MqTopicType;
 import com.qs.mp.common.enums.TicketBoxStatusEnum;
 import com.qs.mp.common.enums.TicketPkgStatusEnum;
 import com.qs.mp.common.enums.TicketPrizeTypeEnum;
 import com.qs.mp.common.enums.TicketStatusEnum;
 import com.qs.mp.common.enums.TicketTypeEnum;
+import com.qs.mp.common.pulsar.PulsarClientService;
 import com.qs.mp.common.utils.LogUtil;
 import com.qs.mp.system.service.id.BizIdGenerator;
 import java.math.BigDecimal;
@@ -40,6 +44,7 @@ import java.util.Map;
 import java.util.Random;
 import lombok.Data;
 import ma.glasnost.orika.MapperFacade;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -88,6 +93,9 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
   @Autowired
   private ITicketPackageService ticketPackageService;
 
+  @Autowired
+  private PulsarClientService pulsarClientService;
+
   @Override
   @Transactional
   public boolean createTicketBox(TicketBoxCreateParam param) {
@@ -96,7 +104,7 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
     if (ticketBox.getType() == TicketTypeEnum.OFFLINE) {
       ticketBox.setPkgQty(ticketBox.getQuantity() / ticketBox.getPkgUnit());
     }
-    ticketBox.setStatus(TicketBoxStatusEnum.PUT_OFF);
+    ticketBox.setStatus(TicketBoxStatusEnum.WAIT);
     ticketBox.setBoxNo(ticketBoxSerialService.generateSerial(ticketBox.getType()));
     ticketBox.setBoxId(bizIdGenerator.newId());
     save(ticketBox);
@@ -140,62 +148,29 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
     ticketAwardsService.saveBatch(ticketAwardsList);
     ticketAwardsPrizeService.saveBatch(awardsPrizeList);
 
+    try {
+      pulsarClientService.producer(MqTopicType.ticket_generate, ticketBox.getBoxId());
+    } catch (PulsarClientException e) {
+      LogUtil.error(logger, e, "盲票组保存成功,发送异步消息失败. {0}", JSONObject.toJSONString(ticketBox));
+    }
+    return true;
+  }
+
+  @Override
+  @Transactional
+  public void generateTicket(String boxId) {
+    TicketBox ticketBox = getById(boxId);
+    Assert.isTrue(ticketBox.getStatus() == TicketBoxStatusEnum.DOING,
+        "盲票生成时,票组状态不是出票中,boxId=" + boxId);
+    List<TicketAwards> ticketAwardsList = ticketAwardsService.list(
+        new LambdaQueryWrapper<TicketAwards>()
+            .eq(TicketAwards::getBoxId, ticketBox.getBoxId()));
     // 分包
     int pkgNum = ticketBox.getPkgQty();
     int pkgUnit = ticketBox.getPkgUnit();
 
-    // 1.把各奖级的中奖数量分摊到每包上
-    Map<Integer, List<PkgAwards>> pkgAwardsMap = new HashMap<>();
-    for (int m = 1; m <= pkgNum; m++) {
-      pkgAwardsMap.put(m, new ArrayList<PkgAwards>());
-    }
-    List<Integer> excludePkgList = new ArrayList<>();
-    for (int k = 0; k < ticketAwardsList.size(); k++) {
-      TicketAwards ticketAwards = ticketAwardsList.get(k);
-      if (ticketAwards.getQuantity() < pkgNum) {
-        // 奖级数量少于包数的,随机不重复分配,随机数从1开始
-        int totalNone = pkgNum - ticketAwards.getQuantity(); // 轮空数
-        int moreExInt = excludePkgList.size() - totalNone; // 本轮要排除的数 - 轮空数
-        if (moreExInt > 0) {
-          // 多出来的数,从末尾开始删除
-          for (int l = 0; l < moreExInt; l++) {
-            excludePkgList.remove(excludePkgList.size() - 1);
-          }
-        }
-        List<Integer> randomList = getRandomList(excludePkgList, ticketAwards.getQuantity(), pkgNum);
-        excludePkgList.addAll(randomList);
-        LogUtil.debug(logger, "随机分配到的包序号为:{0}", new Object[]{JSONObject.toJSONString(randomList)});
-        for (Integer pkgId : randomList) {
-          pkgAwardsMap.get(pkgId).add(
-              new PkgAwards(ticketAwards.getAwardsId(), ticketAwards.getName(),
-                  ticketAwards.getSort(), 1));
-        }
-      } else {
-        // 奖级数量大于包数的,平均分配,四舍五入
-        int remainQty = ticketAwards.getQuantity();
-        for (int i = 1; i <= pkgNum; i++) {
-          int quantity;
-          if (k == ticketAwardsList.size() - 1) {
-            // 最后一个奖项直接分配奖级数量差额即可
-            int hasQty = pkgAwardsMap.get(i).stream().mapToInt(PkgAwards::getQuantity).sum();
-            quantity = pkgUnit - hasQty;
-          } else {
-            if (i == pkgNum) {
-              quantity = remainQty;
-            } else {
-              quantity = new BigDecimal(remainQty).divide(new BigDecimal(pkgNum - i + 1), 0,
-                  RoundingMode.HALF_UP).intValue();
-            }
-          }
-          remainQty -= quantity;
-          Assert.isTrue(remainQty >= 0, "剩余奖级数量不足分配。boxId:" + ticketBox.getBoxId());
-          pkgAwardsMap.get(i).add(
-              new PkgAwards(ticketAwards.getAwardsId(), ticketAwards.getName(),
-                  ticketAwards.getSort(), quantity));
-        }
-      }
-      LogUtil.debug(logger, "奖级{0}分包结果:{1}", new Object[]{k, JSONObject.toJSONString(pkgAwardsMap)});
-    }
+    Map<Integer, List<PkgAwards>> pkgAwardsMap = generatePkgAwards(
+        ticketBox, ticketAwardsList, pkgNum, pkgUnit);
 
     // 生成票包记录和盲票记录
     int pkgCnt = 0;
@@ -215,7 +190,7 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
       ticketPackageList.add(ticketPackage);
 
       List<PkgAwards> pkgAwardsList = pkgAwardsMap.get(key);
-      LogUtil.debug(logger, "第{0}包盲票奖项数量为{1}", new Object[]{pkgCnt, pkgAwardsList});
+      LogUtil.debug(logger, "第{0}包盲票奖项数量为{1}", pkgCnt, pkgAwardsList);
       List<Ticket> ticketList = new ArrayList<>();
       for (int j = 1; j <= pkgUnit; j++) {
         ticketCnt += 1;
@@ -226,7 +201,7 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
             iterator.remove();
           }
         }
-        LogUtil.debug(logger, "开始生成第{0}包、第{1}盲票", new Object[]{pkgCnt, j});
+        LogUtil.debug(logger, "开始生成第{0}包、第{1}盲票", pkgCnt, j);
         int random = getPrizeIndex(pkgAwardsList);
         PkgAwards pkgAwards = pkgAwardsList.get(random);
         pkgAwards.setQuantity(pkgAwards.getQuantity() - 1);
@@ -248,7 +223,8 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
             continue;
           }
           int awardsNum = new Random().nextInt(ticketAwardsList.size());
-          drawNumDTOList.add(new TicketDrawNumDTO(ticketAwardsList.get(awardsNum).getName(), drawNum));
+          drawNumDTOList.add(
+              new TicketDrawNumDTO(ticketAwardsList.get(awardsNum).getName(), drawNum));
         }
         ticket.setDrawNum(JSONObject.toJSONString(drawNumDTOList));
         ticket.setIsPhysical(1);
@@ -258,7 +234,69 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
       ticketService.saveBatch(ticketList);
     }
     ticketPackageService.saveBatch(ticketPackageList);
-    return true;
+
+    boolean rst = update(new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getStatus, TicketBoxStatusEnum.DONE)
+        .eq(TicketBox::getBoxId, boxId).eq(TicketBox::getStatus, TicketBoxStatusEnum.DOING));
+    Assert.isTrue(rst, "盲票生成完,更新盲票组状态失败。boxId:{0}" + ticketBox.getBoxId());
+  }
+
+  private Map<Integer, List<PkgAwards>> generatePkgAwards(TicketBox ticketBox,
+      List<TicketAwards> ticketAwardsList, int pkgNum, int pkgUnit) {
+    // 1.把各奖级的中奖数量分摊到每包上
+    Map<Integer, List<PkgAwards>> pkgAwardsMap = new HashMap<>();
+    for (int m = 1; m <= pkgNum; m++) {
+      pkgAwardsMap.put(m, new ArrayList<PkgAwards>());
+    }
+    List<Integer> excludePkgList = new ArrayList<>();
+    for (int k = 0; k < ticketAwardsList.size(); k++) {
+      TicketAwards ticketAwards = ticketAwardsList.get(k);
+      if (ticketAwards.getQuantity() < pkgNum) {
+        // 奖级数量少于包数的,随机不重复分配,随机数从1开始
+        int totalNone = pkgNum - ticketAwards.getQuantity(); // 轮空数
+        int moreExInt = excludePkgList.size() - totalNone; // 本轮要排除的数 - 轮空数
+        if (moreExInt > 0) {
+          // 多出来的数,从末尾开始删除
+          for (int l = 0; l < moreExInt; l++) {
+            excludePkgList.remove(excludePkgList.size() - 1);
+          }
+        }
+        List<Integer> randomList = getRandomList(excludePkgList, ticketAwards.getQuantity(),
+            pkgNum);
+        excludePkgList.addAll(randomList);
+        LogUtil.debug(logger, "随机分配到的包序号为:{0}", JSONObject.toJSONString(randomList));
+        for (Integer pkgId : randomList) {
+          pkgAwardsMap.get(pkgId).add(
+              new PkgAwards(ticketAwards.getAwardsId(), ticketAwards.getName(),
+                  ticketAwards.getSort(), 1));
+        }
+      } else {
+        // 奖级数量大于包数的,平均分配,四舍五入
+        int remainQty = ticketAwards.getQuantity();
+        for (int i = 1; i <= pkgNum; i++) {
+          int quantity;
+          if (k == ticketAwardsList.size() - 1) {
+            // 最后一个奖项直接分配奖级数量差额即可
+            int hasQty = pkgAwardsMap.get(i).stream().mapToInt(PkgAwards::getQuantity).sum();
+            quantity = pkgUnit - hasQty;
+          } else {
+            if (i == pkgNum) {
+              quantity = remainQty;
+            } else {
+              quantity = new BigDecimal(remainQty).divide(new BigDecimal(pkgNum - i + 1), 0,
+                  RoundingMode.HALF_UP).intValue();
+            }
+          }
+          remainQty -= quantity;
+          Assert.isTrue(remainQty >= 0, "剩余奖级数量不足分配。boxId:" + ticketBox.getBoxId());
+          pkgAwardsMap.get(i).add(
+              new PkgAwards(ticketAwards.getAwardsId(), ticketAwards.getName(),
+                  ticketAwards.getSort(), quantity));
+        }
+      }
+      LogUtil.debug(logger, "奖级{0}分包结果:{1}",
+          k, JSONObject.toJSONString(pkgAwardsMap));
+    }
+    return pkgAwardsMap;
   }
 
   /**
@@ -291,13 +329,14 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
       } else {
         d1 += Double.parseDouble(String.valueOf(prizes.get(i - 1).getQuantity())) / sumWeight;
       }
-      LogUtil.debug(logger, "prize:{0},区间 d1:{1}, d2:{2}", new Object[]{JSONObject.toJSONString(prizes.get(i)), d1, d2} );
+      LogUtil.debug(logger, "prize:{0},区间 d1:{1}, d2:{2}",
+          JSONObject.toJSONString(prizes.get(i)), d1, d2);
       if (randomNumber > d1 && randomNumber <= d2) {
         random = i;
         break;
       }
     }
-    LogUtil.debug(logger, "抽中序号:{0}", new Object[]{random});
+    LogUtil.debug(logger, "抽中序号:{0}", random);
     return random;
   }
 
@@ -313,7 +352,7 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
     Random rand = new Random();
     boolean[] bool = new boolean[total];
     for (Integer exInt : excludeList) {
-      bool[exInt-1] = true;
+      bool[exInt - 1] = true;
     }
     int randInt = 0;
     for (int i = 0; i < n; i++) {

+ 54 - 0
mp-service/src/main/java/com/qs/mp/mq/impl/PulsarConsumerImpl.java

@@ -0,0 +1,54 @@
+package com.qs.mp.mq.impl;
+
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.qs.mp.admin.domain.TicketAwards;
+import com.qs.mp.admin.domain.TicketBox;
+import com.qs.mp.admin.service.ITicketBoxSerialService;
+import com.qs.mp.admin.service.ITicketBoxService;
+import com.qs.mp.common.enums.MqTopicType;
+import com.qs.mp.common.enums.TicketBoxStatusEnum;
+import com.qs.mp.common.pulsar.PulsarConsumer;
+import com.qs.mp.common.utils.LogUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+import org.springframework.util.Assert;
+
+/**
+ * @auther duota
+ * @create 2021 2021/9/20 10:23 上午
+ * @describe
+ */
+@Service
+public class PulsarConsumerImpl implements PulsarConsumer {
+
+  protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
+
+  @Autowired
+  private ITicketBoxService ticketBoxService;
+
+  @Override
+  @Async("threadPoolTaskExecutor")
+  public void wsConsumer(String topicType, String mqData) {
+    logger.info(topicType + "  >>>>>>>>>>>>>>>>>>>:" + mqData);
+    //监听商户充值消息,后续业务处理
+    if (MqTopicType.ticket_generate.getValue().equals(topicType)) {
+      String boxId = mqData;
+      TicketBox ticketBox = ticketBoxService.getById(boxId);
+      if (ticketBox.getStatus() != TicketBoxStatusEnum.WAIT ) {
+        LogUtil.error(logger, "收到盲票生成消息,票组状态不是待出票,忽略消息。boxId=" + boxId);
+        return;
+      }
+      // 先更新状态,防并发。如果最后生成失败了,暂时人工重新触发
+      boolean rst = ticketBoxService.update(new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getStatus, TicketBoxStatusEnum.DOING)
+          .eq(TicketBox::getBoxId, boxId).eq(TicketBox::getStatus, TicketBoxStatusEnum.WAIT));
+      if (!rst) {
+        LogUtil.error(logger, "收到盲票生成消息,更新票组状态为出票中失败。boxId=" + boxId);
+        return;
+      }
+      ticketBoxService.generateTicket(boxId);
+    }
+  }
+}

+ 7 - 1
pom.xml

@@ -36,6 +36,7 @@
         <jwt.version>0.9.1</jwt.version>
         <hutool.version>5.3.1</hutool.version>
         <mybatisplus.version>3.4.3.2</mybatisplus.version>
+        <pulsar.version>2.7.1</pulsar.version>
     </properties>
 
     <!-- 依赖声明 -->
@@ -74,7 +75,12 @@
 			    <version>3.4.1</version>
 			</dependency>
 
-
+            <!-- MQ消息中间件-->
+            <dependency>
+                <groupId>org.apache.pulsar</groupId>
+                <artifactId>pulsar-client</artifactId>
+                <version>${pulsar.version}</version>
+            </dependency>
 
             <!-- 阿里数据库连接池 -->
             <dependency>