package com.qs.mp.mq.impl; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.qs.mp.admin.domain.TicketAwards; import com.qs.mp.admin.domain.TicketBox; import com.qs.mp.admin.service.ITicketBoxSerialService; import com.qs.mp.admin.service.ITicketBoxService; import com.qs.mp.common.enums.CommStatusEnum; import com.qs.mp.common.enums.MqTopicType; import com.qs.mp.common.enums.TicketBoxStatusEnum; import com.qs.mp.common.enums.UserTicketOrderStatusEnum; import com.qs.mp.common.pulsar.PulsarConsumer; import com.qs.mp.common.utils.LogUtil; import com.qs.mp.user.domain.UserTicketOrder; import com.qs.mp.user.service.IUserTicketOrderService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.util.Assert; /** * @auther duota * @create 2021 2021/9/20 10:23 上午 * @describe */ @Service public class PulsarConsumerImpl implements PulsarConsumer { protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName()); @Autowired private ITicketBoxService ticketBoxService; @Autowired private IUserTicketOrderService userTicketOrderService; @Override @Async("threadPoolTaskExecutor") public void wsConsumer(String topicType, String mqData) { logger.info(topicType + " >>>>>>>>>>>>>>>>>>>:" + mqData); //监听盲票生成消息,后续业务处理 if (MqTopicType.ticket_generate.getValue().equals(topicType)) { processTicketGenerateMsg(mqData); } else if (MqTopicType.ticket_pay.getValue().equals(topicType)) { processTicketPayMsg(mqData); } } private void processTicketPayMsg(String mqData) { String orderId = mqData; UserTicketOrder ticketOrder = userTicketOrderService.getById(orderId); if (ticketOrder.getCommStatus() != CommStatusEnum.NO) { LogUtil.warn(logger, "收到盲票支付成功消息,订单结佣状态不是未结佣,忽略消息。orderId=" + orderId); return; } // 先更新状态,防并发。 boolean rst = userTicketOrderService.update(new LambdaUpdateWrapper().set(UserTicketOrder::getCommStatus, CommStatusEnum.DOING) .eq(UserTicketOrder::getOrderId, orderId).eq(UserTicketOrder::getCommStatus, CommStatusEnum.NO)); if (!rst) { LogUtil.error(logger, "收到盲票支付成功消息,更新订单结佣状态为结佣中失败。orderId=" + orderId); return; } userTicketOrderService.commToChannel(orderId); } private void processTicketGenerateMsg(String mqData) { String boxId = mqData; TicketBox ticketBox = ticketBoxService.getById(boxId); if (ticketBox.getStatus() != TicketBoxStatusEnum.WAIT ) { LogUtil.error(logger, "收到盲票生成消息,票组状态不是待出票,忽略消息。boxId=" + boxId); return; } // 先更新状态,防并发。如果最后生成失败了,暂时人工重新触发 boolean rst = ticketBoxService.update(new LambdaUpdateWrapper().set(TicketBox::getStatus, TicketBoxStatusEnum.DOING) .eq(TicketBox::getBoxId, boxId).eq(TicketBox::getStatus, TicketBoxStatusEnum.WAIT)); if (!rst) { LogUtil.error(logger, "收到盲票生成消息,更新票组状态为出票中失败。boxId=" + boxId); return; } ticketBoxService.generateTicket(boxId); } }