PulsarConsumerImpl.java 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package com.qs.mp.mq.impl;
  2. import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
  3. import com.qs.mp.admin.domain.TicketAwards;
  4. import com.qs.mp.admin.domain.TicketBox;
  5. import com.qs.mp.admin.service.ITicketBoxSerialService;
  6. import com.qs.mp.admin.service.ITicketBoxService;
  7. import com.qs.mp.common.enums.CommStatusEnum;
  8. import com.qs.mp.common.enums.MqTopicType;
  9. import com.qs.mp.common.enums.TicketBoxStatusEnum;
  10. import com.qs.mp.common.enums.UserTicketOrderStatusEnum;
  11. import com.qs.mp.common.pulsar.PulsarConsumer;
  12. import com.qs.mp.common.utils.LogUtil;
  13. import com.qs.mp.user.domain.UserTicketOrder;
  14. import com.qs.mp.user.service.IUserTicketOrderService;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.scheduling.annotation.Async;
  19. import org.springframework.stereotype.Service;
  20. import org.springframework.util.Assert;
  21. /**
  22. * @auther duota
  23. * @create 2021 2021/9/20 10:23 上午
  24. * @describe
  25. */
  26. @Service
  27. public class PulsarConsumerImpl implements PulsarConsumer {
  28. protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
  29. @Autowired
  30. private ITicketBoxService ticketBoxService;
  31. @Autowired
  32. private IUserTicketOrderService userTicketOrderService;
  33. @Override
  34. @Async("threadPoolTaskExecutor")
  35. public void wsConsumer(String topicType, String mqData) {
  36. logger.info(topicType + " >>>>>>>>>>>>>>>>>>>:" + mqData);
  37. //监听盲票生成消息,后续业务处理
  38. if (MqTopicType.ticket_generate.getValue().equals(topicType)) {
  39. processTicketGenerateMsg(mqData);
  40. } else if (MqTopicType.ticket_pay.getValue().equals(topicType)) {
  41. processTicketPayMsg(mqData);
  42. }
  43. }
  44. private void processTicketPayMsg(String mqData) {
  45. String orderId = mqData;
  46. UserTicketOrder ticketOrder = userTicketOrderService.getById(orderId);
  47. if (ticketOrder.getCommStatus() != CommStatusEnum.NO) {
  48. LogUtil.warn(logger, "收到盲票支付成功消息,订单结佣状态不是未结佣,忽略消息。orderId=" + orderId);
  49. return;
  50. }
  51. // 先更新状态,防并发。
  52. boolean rst = userTicketOrderService.update(new LambdaUpdateWrapper<UserTicketOrder>().set(UserTicketOrder::getCommStatus, CommStatusEnum.DOING)
  53. .eq(UserTicketOrder::getOrderId, orderId).eq(UserTicketOrder::getCommStatus, CommStatusEnum.NO));
  54. if (!rst) {
  55. LogUtil.error(logger, "收到盲票支付成功消息,更新订单结佣状态为结佣中失败。orderId=" + orderId);
  56. return;
  57. }
  58. userTicketOrderService.commToChannel(orderId);
  59. }
  60. private void processTicketGenerateMsg(String mqData) {
  61. String boxId = mqData;
  62. TicketBox ticketBox = ticketBoxService.getById(boxId);
  63. if (ticketBox.getStatus() != TicketBoxStatusEnum.WAIT ) {
  64. LogUtil.error(logger, "收到盲票生成消息,票组状态不是待出票,忽略消息。boxId=" + boxId);
  65. return;
  66. }
  67. // 先更新状态,防并发。如果最后生成失败了,暂时人工重新触发
  68. boolean rst = ticketBoxService.update(new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getStatus, TicketBoxStatusEnum.DOING)
  69. .eq(TicketBox::getBoxId, boxId).eq(TicketBox::getStatus, TicketBoxStatusEnum.WAIT));
  70. if (!rst) {
  71. LogUtil.error(logger, "收到盲票生成消息,更新票组状态为出票中失败。boxId=" + boxId);
  72. return;
  73. }
  74. ticketBoxService.generateTicket(boxId);
  75. }
  76. }