123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- package com.qs.mp.mq.impl;
- import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
- import com.qs.mp.admin.domain.TicketAwards;
- import com.qs.mp.admin.domain.TicketBox;
- import com.qs.mp.admin.service.ITicketBoxSerialService;
- import com.qs.mp.admin.service.ITicketBoxService;
- import com.qs.mp.common.enums.CommStatusEnum;
- import com.qs.mp.common.enums.MqTopicType;
- import com.qs.mp.common.enums.TicketBoxStatusEnum;
- import com.qs.mp.common.enums.UserTicketOrderStatusEnum;
- import com.qs.mp.common.pulsar.PulsarConsumer;
- import com.qs.mp.common.utils.LogUtil;
- import com.qs.mp.user.domain.UserTicketOrder;
- import com.qs.mp.user.service.IUserTicketOrderService;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Service;
- import org.springframework.util.Assert;
- /**
- * @auther duota
- * @create 2021 2021/9/20 10:23 上午
- * @describe
- */
- @Service
- public class PulsarConsumerImpl implements PulsarConsumer {
- protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
- @Autowired
- private ITicketBoxService ticketBoxService;
- @Autowired
- private IUserTicketOrderService userTicketOrderService;
- @Override
- @Async("threadPoolTaskExecutor")
- public void wsConsumer(String topicType, String mqData) {
- logger.info(topicType + " >>>>>>>>>>>>>>>>>>>:" + mqData);
- //监听盲票生成消息,后续业务处理
- if (MqTopicType.ticket_generate.getValue().equals(topicType)) {
- processTicketGenerateMsg(mqData);
- } else if (MqTopicType.ticket_pay.getValue().equals(topicType)) {
- processTicketPayMsg(mqData);
- }
- }
- private void processTicketPayMsg(String mqData) {
- String orderId = mqData;
- UserTicketOrder ticketOrder = userTicketOrderService.getById(orderId);
- if (ticketOrder.getCommStatus() != CommStatusEnum.NO) {
- LogUtil.error(logger, "收到盲票支付成功消息,订单结佣状态不是未结佣,忽略消息。orderId=" + orderId);
- return;
- }
- // 先更新状态,防并发。
- boolean rst = userTicketOrderService.update(new LambdaUpdateWrapper<UserTicketOrder>().set(UserTicketOrder::getCommStatus, CommStatusEnum.DOING)
- .eq(UserTicketOrder::getOrderId, orderId).eq(UserTicketOrder::getCommStatus, CommStatusEnum.NO));
- if (!rst) {
- LogUtil.error(logger, "收到盲票支付成功消息,更新订单结佣状态为结佣中失败。orderId=" + orderId);
- return;
- }
- userTicketOrderService.commToChannel(orderId);
- }
- private void processTicketGenerateMsg(String mqData) {
- String boxId = mqData;
- TicketBox ticketBox = ticketBoxService.getById(boxId);
- if (ticketBox.getStatus() != TicketBoxStatusEnum.WAIT ) {
- LogUtil.error(logger, "收到盲票生成消息,票组状态不是待出票,忽略消息。boxId=" + boxId);
- return;
- }
- // 先更新状态,防并发。如果最后生成失败了,暂时人工重新触发
- boolean rst = ticketBoxService.update(new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getStatus, TicketBoxStatusEnum.DOING)
- .eq(TicketBox::getBoxId, boxId).eq(TicketBox::getStatus, TicketBoxStatusEnum.WAIT));
- if (!rst) {
- LogUtil.error(logger, "收到盲票生成消息,更新票组状态为出票中失败。boxId=" + boxId);
- return;
- }
- ticketBoxService.generateTicket(boxId);
- }
- }
|