ソースを参照

增加MQ消息

chunping 3 年 前
コミット
b1646e6f5e

+ 7 - 0
mp-admin/src/main/resources/application-dev.yml

@@ -106,6 +106,13 @@ cloud:
     # 存储对象公开不可访问,需要通过应用下载
     private-bucket-name: mp-auth-test-1307117429
 
+mq:
+    consumer-conn: false
+    service-url: http://pulsar-9nbo99zrnxzb.tdmq-pulsar.ap-sh.qcloud.tencenttdmq.com:5039
+    auth-token: eyJrZXlJZCI6InB1bHNhci05bmJvOTl6cm54emIiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItOW5ibzk5enJueHpiX3lncC1zZXJ2ZXIifQ.RO0faVqgSfcc6RkS3Tctv2sAPe2Wp26lVZK34BtdhBU
+    topic-ticket-generate: pulsar-rkrxw2wx8zeo/test-share/topic-ticket-generate
+    consumer-topics: pulsar-rkrxw2wx8zeo/test-share/topic-ticket-generate
+
 # 小程序
 miniprogram:
     # 接收通知的状态

+ 8 - 0
mp-admin/src/main/resources/application-test.yml

@@ -111,6 +111,14 @@ cloud:
   # 存储对象公开不可访问,需要通过应用下载
   private-bucket-name: mp-auth-test-1307117429
 
+mq:
+  consumer-conn: true
+  service-url: http://pulsar-9nbo99zrnxzb.tdmq-pulsar.ap-sh.qcloud.tencenttdmq.com:5039
+  auth-token: eyJrZXlJZCI6InB1bHNhci05bmJvOTl6cm54emIiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItOW5ibzk5enJueHpiX3lncC1zZXJ2ZXIifQ.RO0faVqgSfcc6RkS3Tctv2sAPe2Wp26lVZK34BtdhBU
+  topic-ticket-generate: pulsar-rkrxw2wx8zeo/test-share/topic-ticket-generate
+  consumer-topics: pulsar-rkrxw2wx8zeo/test-share/topic-ticket-generate
+
+
 # 小程序
 miniprogram:
   # 接收通知的状态

+ 4 - 0
mp-common/pom.xml

@@ -59,6 +59,10 @@
             <artifactId>commons-lang3</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+        </dependency>
         <!-- JSON工具类 -->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>

+ 27 - 0
mp-common/src/main/java/com/qs/mp/common/enums/MqTopicType.java

@@ -0,0 +1,27 @@
+package com.qs.mp.common.enums;
+
+import com.baomidou.mybatisplus.annotation.IEnum;
+
+/**
+ * @auther duota
+ * @create 2021 2021/9/19 5:28 下午
+ * @describe
+ */
+public enum MqTopicType implements IEnum<String> {
+
+  ticket_generate("1", "盲票生成");
+
+
+  private final String value;
+  private final String desc;
+
+  MqTopicType(final String value, final String desc) {
+    this.value = value;
+    this.desc = desc;
+  }
+
+  @Override
+  public String getValue() {
+    return value;
+  }
+}

+ 4 - 0
mp-common/src/main/java/com/qs/mp/common/enums/TicketBoxStatusEnum.java

@@ -10,6 +10,10 @@ import com.baomidou.mybatisplus.annotation.IEnum;
  */
 public enum TicketBoxStatusEnum implements IEnum<String> {
 
+
+  WAIT("wait", "待出票"),
+  DOING("doing", "出票中"),
+  DONE("done", "待上架"),
   PUT_ON("on", "上架"),
   PUT_OFF("off", "下架");
 

+ 171 - 0
mp-common/src/main/java/com/qs/mp/common/pulsar/PulsarClientService.java

@@ -0,0 +1,171 @@
+package com.qs.mp.common.pulsar;
+
+import com.qs.mp.common.enums.MqTopicType;
+import com.qs.mp.common.utils.StringUtils;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PostConstruct;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * @auther duota
+ * @create 2021 2021/9/19 2:27 下午
+ * @describe
+ */
+@Component
+public class PulsarClientService {
+
+  protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
+  private PulsarClient client;
+  private Consumer<byte[]> consumer;
+
+  @Value("${mq.service-url}")
+  private String serviceUrl;
+
+  @Value("${mq.auth-token}")
+  private String authToken;
+
+  @Value("${mq.topic-ticket-generate}")
+  private String topicTicketGenerate;
+
+  @Value("${mq.consumer-conn}")
+  private boolean consumerConn;
+
+  @Value("${mq.consumer-topics}")
+  private String topics;
+
+  @Autowired
+  private PulsarConsumer pulsarConsumer;
+
+  private final ConcurrentHashMap<String, Producer<byte[]>> Producers = new ConcurrentHashMap<String, Producer<byte[]>>();
+
+  @PostConstruct
+  public void init() throws PulsarClientException {
+    client = PulsarClient.builder()
+        .serviceUrl(serviceUrl)//【集群管理】接入地址处复制
+        .authentication(AuthenticationFactory.token(authToken))
+        .build();
+
+    if (consumerConn) {
+      consumer = client.newConsumer()
+          .topic(topics.split(","))//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
+          .subscriptionName("ygp")//需要现在控制台或者通过控制台API创建好一个订阅,此处填写订阅名
+          .subscriptionType(SubscriptionType.Exclusive)//声明消费模式为exclusive(独占)模式
+          .subscriptionInitialPosition(
+              SubscriptionInitialPosition.Earliest)//配置从最早开始消费,否则可能会消费不到历史消息
+          .subscribe();
+      new Thread(() -> {
+        try {
+          loop();
+        } catch (Exception e) {
+          logger.error("消费Pulsar数据异常,停止Pulsar连接:", e);
+          close();
+        }
+      }).start();
+    }
+  }
+
+  private void loop() throws Exception {
+    //消费消息
+    while (true) {
+      Message message = consumer.receive();
+      String[] keyArr = message.getKey().split("_");
+      String topicName = message.getTopicName();
+      String key = message.getKey();
+      String jsons = new String(message.getData());
+      if (!StringUtils.isBlank(jsons)) {
+        try {
+          pulsarConsumer.wsConsumer(key, jsons);
+          logger.info("receive data>>>>>>" + jsons);
+        } catch (Exception e) {
+          logger.error("消费Pulsar数据异常,key【{}】,json【{}】:", message.getKey(), jsons, e);
+        }
+      }
+      consumer.acknowledge(message);
+    }
+  }
+
+  /**
+   * @param mqTopicType
+   * @param data        内容为json格式
+   * @throws PulsarClientException
+   */
+  public void producer(MqTopicType mqTopicType, String data) throws PulsarClientException {
+    logger.info("start producer mq data:" + data);
+    String topic = "";
+    if (mqTopicType.getValue() == MqTopicType.ticket_generate.getValue()) {
+      topic = topicTicketGenerate; //
+    }
+    Producer<byte[]> producer = null;
+    if (Producers.containsKey(mqTopicType.getValue())) {
+      producer = Producers.get(mqTopicType.getValue());
+    } else {
+      producer = client.newProducer()
+          .topic(topic)//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
+          .enableBatching(true)
+          .create();
+      Producers.put(mqTopicType.getValue(), producer);
+    }
+
+    /*
+    producer.newMessage()//发送消息
+        .key(mqTopicType.getValue())
+        .value(data.getBytes())
+            .send();
+
+
+     */
+    CompletableFuture<MessageId> messageIdFuture = producer.newMessage()
+        .key(mqTopicType.getValue())
+        .value(data.getBytes())
+        .sendAsync();
+    Producer<byte[]> finalProducer = producer;
+    messageIdFuture.whenComplete(((messageId, throwable) -> {
+      if (null != throwable) {
+        logger.error("【消息投递异常,开始重试】", throwable);
+        //todo 失败重试策略
+        try {
+          finalProducer.newMessage().deliverAfter(5, TimeUnit.SECONDS
+          ).key(mqTopicType.getValue()).value(data.getBytes()).send();
+        } catch (PulsarClientException e) {
+          logger.error("重试投递失败", e);
+          //todo 保存数据库,增加失败重试,通过体系化重试框架保证消息投递完整性
+        }
+      } else {
+        logger.info("【消息成功投递】");
+      }
+    }));
+
+  }
+
+  public void consumer() throws PulsarClientException {
+
+  }
+
+  public void close() {
+    try {
+      consumer.close();
+    } catch (PulsarClientException e) {
+      logger.error("关闭Pulsar消费者失败:", e);
+    }
+    try {
+      client.close();
+    } catch (PulsarClientException e) {
+      logger.error("关闭Pulsar连接失败:", e);
+    }
+  }
+}

+ 12 - 0
mp-common/src/main/java/com/qs/mp/common/pulsar/PulsarConsumer.java

@@ -0,0 +1,12 @@
+package com.qs.mp.common.pulsar;
+
+/**
+ * @auther duota
+ * @create 2021 2021/9/20 10:19 上午
+ * @describe
+ */
+public interface PulsarConsumer {
+
+  public void wsConsumer(String topicType,String mqData);
+
+}

+ 6 - 0
mp-service/src/main/java/com/qs/mp/admin/service/ITicketBoxService.java

@@ -20,4 +20,10 @@ public interface ITicketBoxService extends IService<TicketBox> {
    * @return
    */
   boolean createTicketBox(TicketBoxCreateParam param);
+
+  /**
+   * 生成盲票
+   * @param boxId
+   */
+  void generateTicket(String boxId);
 }

+ 40 - 12
mp-service/src/main/java/com/qs/mp/admin/service/impl/TicketBoxServiceImpl.java

@@ -1,6 +1,8 @@
 package com.qs.mp.admin.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.qs.mp.admin.domain.Coupon;
 import com.qs.mp.admin.domain.Goods;
@@ -22,11 +24,13 @@ import com.qs.mp.admin.service.ITicketBoxSerialService;
 import com.qs.mp.admin.service.ITicketBoxService;
 import com.qs.mp.admin.service.ITicketPackageService;
 import com.qs.mp.admin.service.ITicketService;
+import com.qs.mp.common.enums.MqTopicType;
 import com.qs.mp.common.enums.TicketBoxStatusEnum;
 import com.qs.mp.common.enums.TicketPkgStatusEnum;
 import com.qs.mp.common.enums.TicketPrizeTypeEnum;
 import com.qs.mp.common.enums.TicketStatusEnum;
 import com.qs.mp.common.enums.TicketTypeEnum;
+import com.qs.mp.common.pulsar.PulsarClientService;
 import com.qs.mp.common.utils.LogUtil;
 import com.qs.mp.system.service.id.BizIdGenerator;
 import java.math.BigDecimal;
@@ -40,6 +44,7 @@ import java.util.Map;
 import java.util.Random;
 import lombok.Data;
 import ma.glasnost.orika.MapperFacade;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -88,6 +93,9 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
   @Autowired
   private ITicketPackageService ticketPackageService;
 
+  @Autowired
+  private PulsarClientService pulsarClientService;
+
   @Override
   @Transactional
   public boolean createTicketBox(TicketBoxCreateParam param) {
@@ -96,7 +104,7 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
     if (ticketBox.getType() == TicketTypeEnum.OFFLINE) {
       ticketBox.setPkgQty(ticketBox.getQuantity() / ticketBox.getPkgUnit());
     }
-    ticketBox.setStatus(TicketBoxStatusEnum.PUT_OFF);
+    ticketBox.setStatus(TicketBoxStatusEnum.WAIT);
     ticketBox.setBoxNo(ticketBoxSerialService.generateSerial(ticketBox.getType()));
     ticketBox.setBoxId(bizIdGenerator.newId());
     save(ticketBox);
@@ -140,11 +148,23 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
     ticketAwardsService.saveBatch(ticketAwardsList);
     ticketAwardsPrizeService.saveBatch(awardsPrizeList);
 
-    generateTicket(ticketBox, ticketAwardsList);
+    try {
+      pulsarClientService.producer(MqTopicType.ticket_generate, ticketBox.getBoxId());
+    } catch (PulsarClientException e) {
+      LogUtil.error(logger, e, "盲票组保存成功,发送异步消息失败. {0}", JSONObject.toJSONString(ticketBox));
+    }
     return true;
   }
 
-  private void generateTicket(TicketBox ticketBox, List<TicketAwards> ticketAwardsList) {
+  @Override
+  @Transactional
+  public void generateTicket(String boxId) {
+    TicketBox ticketBox = getById(boxId);
+    Assert.isTrue(ticketBox.getStatus() == TicketBoxStatusEnum.DOING,
+        "盲票生成时,票组状态不是出票中,boxId=" + boxId);
+    List<TicketAwards> ticketAwardsList = ticketAwardsService.list(
+        new LambdaQueryWrapper<TicketAwards>()
+            .eq(TicketAwards::getBoxId, ticketBox.getBoxId()));
     // 分包
     int pkgNum = ticketBox.getPkgQty();
     int pkgUnit = ticketBox.getPkgUnit();
@@ -170,7 +190,7 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
       ticketPackageList.add(ticketPackage);
 
       List<PkgAwards> pkgAwardsList = pkgAwardsMap.get(key);
-      LogUtil.debug(logger, "第{0}包盲票奖项数量为{1}", new Object[]{pkgCnt, pkgAwardsList});
+      LogUtil.debug(logger, "第{0}包盲票奖项数量为{1}", pkgCnt, pkgAwardsList);
       List<Ticket> ticketList = new ArrayList<>();
       for (int j = 1; j <= pkgUnit; j++) {
         ticketCnt += 1;
@@ -181,7 +201,7 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
             iterator.remove();
           }
         }
-        LogUtil.debug(logger, "开始生成第{0}包、第{1}盲票", new Object[]{pkgCnt, j});
+        LogUtil.debug(logger, "开始生成第{0}包、第{1}盲票", pkgCnt, j);
         int random = getPrizeIndex(pkgAwardsList);
         PkgAwards pkgAwards = pkgAwardsList.get(random);
         pkgAwards.setQuantity(pkgAwards.getQuantity() - 1);
@@ -203,7 +223,8 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
             continue;
           }
           int awardsNum = new Random().nextInt(ticketAwardsList.size());
-          drawNumDTOList.add(new TicketDrawNumDTO(ticketAwardsList.get(awardsNum).getName(), drawNum));
+          drawNumDTOList.add(
+              new TicketDrawNumDTO(ticketAwardsList.get(awardsNum).getName(), drawNum));
         }
         ticket.setDrawNum(JSONObject.toJSONString(drawNumDTOList));
         ticket.setIsPhysical(1);
@@ -213,6 +234,10 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
       ticketService.saveBatch(ticketList);
     }
     ticketPackageService.saveBatch(ticketPackageList);
+
+    boolean rst = update(new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getStatus, TicketBoxStatusEnum.DONE)
+        .eq(TicketBox::getBoxId, boxId).eq(TicketBox::getStatus, TicketBoxStatusEnum.DOING));
+    Assert.isTrue(rst, "盲票生成完,更新盲票组状态失败。boxId:{0}" + ticketBox.getBoxId());
   }
 
   private Map<Integer, List<PkgAwards>> generatePkgAwards(TicketBox ticketBox,
@@ -235,9 +260,10 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
             excludePkgList.remove(excludePkgList.size() - 1);
           }
         }
-        List<Integer> randomList = getRandomList(excludePkgList, ticketAwards.getQuantity(), pkgNum);
+        List<Integer> randomList = getRandomList(excludePkgList, ticketAwards.getQuantity(),
+            pkgNum);
         excludePkgList.addAll(randomList);
-        LogUtil.debug(logger, "随机分配到的包序号为:{0}", new Object[]{JSONObject.toJSONString(randomList)});
+        LogUtil.debug(logger, "随机分配到的包序号为:{0}", JSONObject.toJSONString(randomList));
         for (Integer pkgId : randomList) {
           pkgAwardsMap.get(pkgId).add(
               new PkgAwards(ticketAwards.getAwardsId(), ticketAwards.getName(),
@@ -267,7 +293,8 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
                   ticketAwards.getSort(), quantity));
         }
       }
-      LogUtil.debug(logger, "奖级{0}分包结果:{1}", new Object[]{k, JSONObject.toJSONString(pkgAwardsMap)});
+      LogUtil.debug(logger, "奖级{0}分包结果:{1}",
+          k, JSONObject.toJSONString(pkgAwardsMap));
     }
     return pkgAwardsMap;
   }
@@ -302,13 +329,14 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
       } else {
         d1 += Double.parseDouble(String.valueOf(prizes.get(i - 1).getQuantity())) / sumWeight;
       }
-      LogUtil.debug(logger, "prize:{0},区间 d1:{1}, d2:{2}", new Object[]{JSONObject.toJSONString(prizes.get(i)), d1, d2} );
+      LogUtil.debug(logger, "prize:{0},区间 d1:{1}, d2:{2}",
+          JSONObject.toJSONString(prizes.get(i)), d1, d2);
       if (randomNumber > d1 && randomNumber <= d2) {
         random = i;
         break;
       }
     }
-    LogUtil.debug(logger, "抽中序号:{0}", new Object[]{random});
+    LogUtil.debug(logger, "抽中序号:{0}", random);
     return random;
   }
 
@@ -324,7 +352,7 @@ public class TicketBoxServiceImpl extends ServiceImpl<TicketBoxMapper, TicketBox
     Random rand = new Random();
     boolean[] bool = new boolean[total];
     for (Integer exInt : excludeList) {
-      bool[exInt-1] = true;
+      bool[exInt - 1] = true;
     }
     int randInt = 0;
     for (int i = 0; i < n; i++) {

+ 54 - 0
mp-service/src/main/java/com/qs/mp/mq/impl/PulsarConsumerImpl.java

@@ -0,0 +1,54 @@
+package com.qs.mp.mq.impl;
+
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.qs.mp.admin.domain.TicketAwards;
+import com.qs.mp.admin.domain.TicketBox;
+import com.qs.mp.admin.service.ITicketBoxSerialService;
+import com.qs.mp.admin.service.ITicketBoxService;
+import com.qs.mp.common.enums.MqTopicType;
+import com.qs.mp.common.enums.TicketBoxStatusEnum;
+import com.qs.mp.common.pulsar.PulsarConsumer;
+import com.qs.mp.common.utils.LogUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+import org.springframework.util.Assert;
+
+/**
+ * @auther duota
+ * @create 2021 2021/9/20 10:23 上午
+ * @describe
+ */
+@Service
+public class PulsarConsumerImpl implements PulsarConsumer {
+
+  protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
+
+  @Autowired
+  private ITicketBoxService ticketBoxService;
+
+  @Override
+  @Async("threadPoolTaskExecutor")
+  public void wsConsumer(String topicType, String mqData) {
+    logger.info(topicType + "  >>>>>>>>>>>>>>>>>>>:" + mqData);
+    //监听商户充值消息,后续业务处理
+    if (MqTopicType.ticket_generate.getValue().equals(topicType)) {
+      String boxId = mqData;
+      TicketBox ticketBox = ticketBoxService.getById(boxId);
+      if (ticketBox.getStatus() != TicketBoxStatusEnum.WAIT ) {
+        LogUtil.error(logger, "收到盲票生成消息,票组状态不是待出票,忽略消息。boxId=" + boxId);
+        return;
+      }
+      // 先更新状态,防并发。如果最后生成失败了,暂时人工重新触发
+      boolean rst = ticketBoxService.update(new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getStatus, TicketBoxStatusEnum.DOING)
+          .eq(TicketBox::getBoxId, boxId).eq(TicketBox::getStatus, TicketBoxStatusEnum.WAIT));
+      if (!rst) {
+        LogUtil.error(logger, "收到盲票生成消息,更新票组状态为出票中失败。boxId=" + boxId);
+        return;
+      }
+      ticketBoxService.generateTicket(boxId);
+    }
+  }
+}

+ 7 - 1
pom.xml

@@ -36,6 +36,7 @@
         <jwt.version>0.9.1</jwt.version>
         <hutool.version>5.3.1</hutool.version>
         <mybatisplus.version>3.4.3.2</mybatisplus.version>
+        <pulsar.version>2.7.1</pulsar.version>
     </properties>
 
     <!-- 依赖声明 -->
@@ -74,7 +75,12 @@
 			    <version>3.4.1</version>
 			</dependency>
 
-
+            <!-- MQ消息中间件-->
+            <dependency>
+                <groupId>org.apache.pulsar</groupId>
+                <artifactId>pulsar-client</artifactId>
+                <version>${pulsar.version}</version>
+            </dependency>
 
             <!-- 阿里数据库连接池 -->
             <dependency>