PulsarConsumerImpl.java 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package com.qs.mp.mq.impl;
  2. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  3. import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
  4. import com.qs.mp.admin.domain.Ticket;
  5. import com.qs.mp.admin.domain.TicketAwards;
  6. import com.qs.mp.admin.domain.TicketBox;
  7. import com.qs.mp.admin.domain.TicketPackage;
  8. import com.qs.mp.admin.service.ITicketBoxSerialService;
  9. import com.qs.mp.admin.service.ITicketBoxService;
  10. import com.qs.mp.admin.service.ITicketPackageService;
  11. import com.qs.mp.admin.service.ITicketService;
  12. import com.qs.mp.common.enums.CommStatusEnum;
  13. import com.qs.mp.common.enums.MqTopicType;
  14. import com.qs.mp.common.enums.TicketBoxStatusEnum;
  15. import com.qs.mp.common.enums.TicketPkgSaleStatusEnum;
  16. import com.qs.mp.common.enums.TicketTypeEnum;
  17. import com.qs.mp.common.enums.UserTicketOrderStatusEnum;
  18. import com.qs.mp.common.pulsar.PulsarConsumer;
  19. import com.qs.mp.common.utils.LogUtil;
  20. import com.qs.mp.user.domain.UserTicketOrder;
  21. import com.qs.mp.user.domain.UserTicketOrderItem;
  22. import com.qs.mp.user.service.IUserTicketOrderItemService;
  23. import com.qs.mp.user.service.IUserTicketOrderService;
  24. import java.util.List;
  25. import org.slf4j.Logger;
  26. import org.slf4j.LoggerFactory;
  27. import org.springframework.beans.factory.annotation.Autowired;
  28. import org.springframework.scheduling.annotation.Async;
  29. import org.springframework.stereotype.Service;
  30. import org.springframework.util.Assert;
  31. /**
  32. * @auther duota
  33. * @create 2021 2021/9/20 10:23 上午
  34. * @describe
  35. */
  36. @Service
  37. public class PulsarConsumerImpl implements PulsarConsumer {
  38. protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
  39. @Autowired
  40. private ITicketBoxService ticketBoxService;
  41. @Autowired
  42. private IUserTicketOrderService userTicketOrderService;
  43. @Autowired
  44. private IUserTicketOrderItemService userTicketOrderItemService;
  45. @Autowired
  46. private ITicketPackageService ticketPackageService;
  47. @Autowired
  48. private ITicketService ticketService;
  49. @Override
  50. @Async("threadPoolTaskExecutor")
  51. public void wsConsumer(String topicType, String mqData) {
  52. logger.info(topicType + " >>>>>>>>>>>>>>>>>>>:" + mqData);
  53. //监听盲票生成消息,后续业务处理
  54. if (MqTopicType.ticket_generate.getValue().equals(topicType)) {
  55. processTicketGenerateMsg(mqData);
  56. } else if (MqTopicType.ticket_pay.getValue().equals(topicType)) {
  57. processTicketPayMsg(mqData);
  58. }
  59. }
  60. private void processTicketPayMsg(String mqData) {
  61. String orderId = mqData;
  62. UserTicketOrder ticketOrder = userTicketOrderService.getById(orderId);
  63. TicketBox ticketBox = ticketBoxService.getById(ticketOrder.getBoxId());
  64. if (ticketBox.getType() == TicketTypeEnum.OFFLINE) {
  65. // 线下票更新销量,此处不做乐观锁控制,因为不用控制库存
  66. ticketBoxService.update(
  67. new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getSaleQty,
  68. ticketBox.getSaleQty() + 1)
  69. .eq(TicketBox::getBoxId, ticketBox.getBoxId()));
  70. }
  71. // 更新票组销售数量,此处只做累计,允许并发容错
  72. List<UserTicketOrderItem> ticketOrderItemList = userTicketOrderItemService.list(new LambdaQueryWrapper<UserTicketOrderItem>()
  73. .eq(UserTicketOrderItem::getOrderId, orderId));
  74. for (UserTicketOrderItem orderItem : ticketOrderItemList) {
  75. Ticket ticket = ticketService.getById(orderItem.getTicketId());
  76. TicketPackage ticketPackage = ticketPackageService.getById(ticket.getPkgId());
  77. ticketPackageService.update(new LambdaUpdateWrapper<TicketPackage>()
  78. .set(TicketPkgSaleStatusEnum.WAIT_SALE == ticketPackage.getSaleStatus(),
  79. TicketPackage::getSaleStatus, TicketPkgSaleStatusEnum.ON_SALE)
  80. .set(TicketPkgSaleStatusEnum.ON_SALE == ticketPackage.getSaleStatus()
  81. && ticketPackage.getSaleQty() + 1 >= ticketPackage.getPkgUnit(),
  82. TicketPackage::getSaleStatus, TicketPkgSaleStatusEnum.SALE_OUT)
  83. .set(TicketPackage::getSaleQty, ticketPackage.getSaleQty() + 1)
  84. .eq(TicketPackage::getPkgId, ticketPackage.getPkgId()));
  85. }
  86. if (ticketOrder.getCommStatus() != CommStatusEnum.NO) {
  87. LogUtil.warn(logger, "收到盲票支付成功消息,订单结佣状态不是未结佣,忽略消息。orderId=" + orderId);
  88. return;
  89. }
  90. // 先更新状态,防并发。
  91. boolean rst = userTicketOrderService.update(new LambdaUpdateWrapper<UserTicketOrder>().set(UserTicketOrder::getCommStatus, CommStatusEnum.DOING)
  92. .eq(UserTicketOrder::getOrderId, orderId).eq(UserTicketOrder::getCommStatus, CommStatusEnum.NO));
  93. if (!rst) {
  94. LogUtil.error(logger, "收到盲票支付成功消息,更新订单结佣状态为结佣中失败。orderId=" + orderId);
  95. return;
  96. }
  97. userTicketOrderService.commToChannel(orderId);
  98. }
  99. private void processTicketGenerateMsg(String mqData) {
  100. String boxId = mqData;
  101. TicketBox ticketBox = ticketBoxService.getById(boxId);
  102. if (ticketBox.getStatus() != TicketBoxStatusEnum.WAIT ) {
  103. LogUtil.error(logger, "收到盲票生成消息,票组状态不是待出票,忽略消息。boxId=" + boxId);
  104. return;
  105. }
  106. // 先更新状态,防并发。如果最后生成失败了,暂时人工重新触发
  107. boolean rst = ticketBoxService.update(new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getStatus, TicketBoxStatusEnum.DOING)
  108. .eq(TicketBox::getBoxId, boxId).eq(TicketBox::getStatus, TicketBoxStatusEnum.WAIT));
  109. if (!rst) {
  110. LogUtil.error(logger, "收到盲票生成消息,更新票组状态为出票中失败。boxId=" + boxId);
  111. return;
  112. }
  113. ticketBoxService.generateTicket(boxId);
  114. }
  115. }