1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- package com.qs.mp.mq.impl;
- import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
- import com.qs.mp.admin.domain.TicketAwards;
- import com.qs.mp.admin.domain.TicketBox;
- import com.qs.mp.admin.service.ITicketBoxSerialService;
- import com.qs.mp.admin.service.ITicketBoxService;
- import com.qs.mp.common.enums.CommStatusEnum;
- import com.qs.mp.common.enums.MqTopicType;
- import com.qs.mp.common.enums.TicketBoxStatusEnum;
- import com.qs.mp.common.enums.TicketTypeEnum;
- import com.qs.mp.common.enums.UserTicketOrderStatusEnum;
- import com.qs.mp.common.pulsar.PulsarConsumer;
- import com.qs.mp.common.utils.LogUtil;
- import com.qs.mp.user.domain.UserTicketOrder;
- import com.qs.mp.user.service.IUserTicketOrderService;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Service;
- import org.springframework.util.Assert;
- /**
- * @auther duota
- * @create 2021 2021/9/20 10:23 上午
- * @describe
- */
- @Service
- public class PulsarConsumerImpl implements PulsarConsumer {
- protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
- @Autowired
- private ITicketBoxService ticketBoxService;
- @Autowired
- private IUserTicketOrderService userTicketOrderService;
- @Override
- @Async("threadPoolTaskExecutor")
- public void wsConsumer(String topicType, String mqData) {
- logger.info(topicType + " >>>>>>>>>>>>>>>>>>>:" + mqData);
- //监听盲票生成消息,后续业务处理
- if (MqTopicType.ticket_generate.getValue().equals(topicType)) {
- processTicketGenerateMsg(mqData);
- } else if (MqTopicType.ticket_pay.getValue().equals(topicType)) {
- processTicketPayMsg(mqData);
- }
- }
- private void processTicketPayMsg(String mqData) {
- String orderId = mqData;
- UserTicketOrder ticketOrder = userTicketOrderService.getById(orderId);
- TicketBox ticketBox = ticketBoxService.getById(ticketOrder.getBoxId());
- if (ticketBox.getType() == TicketTypeEnum.OFFLINE) {
- // 线下票更新销量,此处不做乐观锁控制,因为不用控制库存
- ticketBoxService.update(
- new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getSaleQty,
- ticketBox.getSaleQty() + 1)
- .eq(TicketBox::getBoxId, ticketBox.getBoxId()));
- }
- if (ticketOrder.getCommStatus() != CommStatusEnum.NO) {
- LogUtil.warn(logger, "收到盲票支付成功消息,订单结佣状态不是未结佣,忽略消息。orderId=" + orderId);
- return;
- }
- // 先更新状态,防并发。
- boolean rst = userTicketOrderService.update(new LambdaUpdateWrapper<UserTicketOrder>().set(UserTicketOrder::getCommStatus, CommStatusEnum.DOING)
- .eq(UserTicketOrder::getOrderId, orderId).eq(UserTicketOrder::getCommStatus, CommStatusEnum.NO));
- if (!rst) {
- LogUtil.error(logger, "收到盲票支付成功消息,更新订单结佣状态为结佣中失败。orderId=" + orderId);
- return;
- }
- userTicketOrderService.commToChannel(orderId);
- }
- private void processTicketGenerateMsg(String mqData) {
- String boxId = mqData;
- TicketBox ticketBox = ticketBoxService.getById(boxId);
- if (ticketBox.getStatus() != TicketBoxStatusEnum.WAIT ) {
- LogUtil.error(logger, "收到盲票生成消息,票组状态不是待出票,忽略消息。boxId=" + boxId);
- return;
- }
- // 先更新状态,防并发。如果最后生成失败了,暂时人工重新触发
- boolean rst = ticketBoxService.update(new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getStatus, TicketBoxStatusEnum.DOING)
- .eq(TicketBox::getBoxId, boxId).eq(TicketBox::getStatus, TicketBoxStatusEnum.WAIT));
- if (!rst) {
- LogUtil.error(logger, "收到盲票生成消息,更新票组状态为出票中失败。boxId=" + boxId);
- return;
- }
- ticketBoxService.generateTicket(boxId);
- }
- }
|