PulsarConsumerImpl.java 3.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package com.qs.mp.mq.impl;
  2. import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
  3. import com.qs.mp.admin.domain.TicketAwards;
  4. import com.qs.mp.admin.domain.TicketBox;
  5. import com.qs.mp.admin.service.ITicketBoxSerialService;
  6. import com.qs.mp.admin.service.ITicketBoxService;
  7. import com.qs.mp.common.enums.CommStatusEnum;
  8. import com.qs.mp.common.enums.MqTopicType;
  9. import com.qs.mp.common.enums.TicketBoxStatusEnum;
  10. import com.qs.mp.common.enums.TicketTypeEnum;
  11. import com.qs.mp.common.enums.UserTicketOrderStatusEnum;
  12. import com.qs.mp.common.pulsar.PulsarConsumer;
  13. import com.qs.mp.common.utils.LogUtil;
  14. import com.qs.mp.user.domain.UserTicketOrder;
  15. import com.qs.mp.user.service.IUserTicketOrderService;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.scheduling.annotation.Async;
  20. import org.springframework.stereotype.Service;
  21. import org.springframework.util.Assert;
  22. /**
  23. * @auther duota
  24. * @create 2021 2021/9/20 10:23 上午
  25. * @describe
  26. */
  27. @Service
  28. public class PulsarConsumerImpl implements PulsarConsumer {
  29. protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
  30. @Autowired
  31. private ITicketBoxService ticketBoxService;
  32. @Autowired
  33. private IUserTicketOrderService userTicketOrderService;
  34. @Override
  35. @Async("threadPoolTaskExecutor")
  36. public void wsConsumer(String topicType, String mqData) {
  37. logger.info(topicType + " >>>>>>>>>>>>>>>>>>>:" + mqData);
  38. //监听盲票生成消息,后续业务处理
  39. if (MqTopicType.ticket_generate.getValue().equals(topicType)) {
  40. processTicketGenerateMsg(mqData);
  41. } else if (MqTopicType.ticket_pay.getValue().equals(topicType)) {
  42. processTicketPayMsg(mqData);
  43. }
  44. }
  45. private void processTicketPayMsg(String mqData) {
  46. String orderId = mqData;
  47. UserTicketOrder ticketOrder = userTicketOrderService.getById(orderId);
  48. TicketBox ticketBox = ticketBoxService.getById(ticketOrder.getBoxId());
  49. if (ticketBox.getType() == TicketTypeEnum.OFFLINE) {
  50. // 线下票更新销量,此处不做乐观锁控制,因为不用控制库存
  51. ticketBoxService.update(
  52. new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getSaleQty,
  53. ticketBox.getSaleQty() + 1)
  54. .eq(TicketBox::getBoxId, ticketBox.getBoxId()));
  55. }
  56. if (ticketOrder.getCommStatus() != CommStatusEnum.NO) {
  57. LogUtil.warn(logger, "收到盲票支付成功消息,订单结佣状态不是未结佣,忽略消息。orderId=" + orderId);
  58. return;
  59. }
  60. // 先更新状态,防并发。
  61. boolean rst = userTicketOrderService.update(new LambdaUpdateWrapper<UserTicketOrder>().set(UserTicketOrder::getCommStatus, CommStatusEnum.DOING)
  62. .eq(UserTicketOrder::getOrderId, orderId).eq(UserTicketOrder::getCommStatus, CommStatusEnum.NO));
  63. if (!rst) {
  64. LogUtil.error(logger, "收到盲票支付成功消息,更新订单结佣状态为结佣中失败。orderId=" + orderId);
  65. return;
  66. }
  67. userTicketOrderService.commToChannel(orderId);
  68. }
  69. private void processTicketGenerateMsg(String mqData) {
  70. String boxId = mqData;
  71. TicketBox ticketBox = ticketBoxService.getById(boxId);
  72. if (ticketBox.getStatus() != TicketBoxStatusEnum.WAIT ) {
  73. LogUtil.error(logger, "收到盲票生成消息,票组状态不是待出票,忽略消息。boxId=" + boxId);
  74. return;
  75. }
  76. // 先更新状态,防并发。如果最后生成失败了,暂时人工重新触发
  77. boolean rst = ticketBoxService.update(new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getStatus, TicketBoxStatusEnum.DOING)
  78. .eq(TicketBox::getBoxId, boxId).eq(TicketBox::getStatus, TicketBoxStatusEnum.WAIT));
  79. if (!rst) {
  80. LogUtil.error(logger, "收到盲票生成消息,更新票组状态为出票中失败。boxId=" + boxId);
  81. return;
  82. }
  83. ticketBoxService.generateTicket(boxId);
  84. }
  85. }