123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- package com.qs.mp.mq.impl;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
- import com.qs.mp.admin.domain.Ticket;
- import com.qs.mp.admin.domain.TicketAwards;
- import com.qs.mp.admin.domain.TicketBox;
- import com.qs.mp.admin.domain.TicketPackage;
- import com.qs.mp.admin.service.ITicketBoxSerialService;
- import com.qs.mp.admin.service.ITicketBoxService;
- import com.qs.mp.admin.service.ITicketPackageService;
- import com.qs.mp.admin.service.ITicketService;
- import com.qs.mp.common.enums.CommStatusEnum;
- import com.qs.mp.common.enums.MqTopicType;
- import com.qs.mp.common.enums.TicketBoxStatusEnum;
- import com.qs.mp.common.enums.TicketPkgSaleStatusEnum;
- import com.qs.mp.common.enums.TicketTypeEnum;
- import com.qs.mp.common.enums.UserTicketOrderStatusEnum;
- import com.qs.mp.common.pulsar.PulsarConsumer;
- import com.qs.mp.common.utils.LogUtil;
- import com.qs.mp.user.domain.UserTicketOrder;
- import com.qs.mp.user.domain.UserTicketOrderItem;
- import com.qs.mp.user.service.IUserTicketOrderItemService;
- import com.qs.mp.user.service.IUserTicketOrderService;
- import java.util.List;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Service;
- import org.springframework.util.Assert;
- /**
- * @auther duota
- * @create 2021 2021/9/20 10:23 上午
- * @describe
- */
- @Service
- public class PulsarConsumerImpl implements PulsarConsumer {
- protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
- @Autowired
- private ITicketBoxService ticketBoxService;
- @Autowired
- private IUserTicketOrderService userTicketOrderService;
- @Autowired
- private IUserTicketOrderItemService userTicketOrderItemService;
- @Autowired
- private ITicketPackageService ticketPackageService;
- @Autowired
- private ITicketService ticketService;
- @Override
- @Async("threadPoolTaskExecutor")
- public void wsConsumer(String topicType, String mqData) {
- logger.info(topicType + " >>>>>>>>>>>>>>>>>>>:" + mqData);
- //监听盲票生成消息,后续业务处理
- if (MqTopicType.ticket_generate.getValue().equals(topicType)) {
- processTicketGenerateMsg(mqData);
- } else if (MqTopicType.ticket_pay.getValue().equals(topicType)) {
- processTicketPayMsg(mqData);
- }
- }
- private void processTicketPayMsg(String mqData) {
- String orderId = mqData;
- UserTicketOrder ticketOrder = userTicketOrderService.getById(orderId);
- TicketBox ticketBox = ticketBoxService.getById(ticketOrder.getBoxId());
- if (ticketBox.getType() == TicketTypeEnum.OFFLINE) {
- // 线下票更新销量,此处不做乐观锁控制,因为不用控制库存
- ticketBoxService.update(
- new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getSaleQty,
- ticketBox.getSaleQty() + 1)
- .eq(TicketBox::getBoxId, ticketBox.getBoxId()));
- }
- // 更新票组销售数量,此处只做累计,允许并发容错
- List<UserTicketOrderItem> ticketOrderItemList = userTicketOrderItemService.list(new LambdaQueryWrapper<UserTicketOrderItem>()
- .eq(UserTicketOrderItem::getOrderId, orderId));
- for (UserTicketOrderItem orderItem : ticketOrderItemList) {
- Ticket ticket = ticketService.getById(orderItem.getTicketId());
- TicketPackage ticketPackage = ticketPackageService.getById(ticket.getPkgId());
- ticketPackageService.update(new LambdaUpdateWrapper<TicketPackage>()
- .set(TicketPkgSaleStatusEnum.WAIT_SALE == ticketPackage.getSaleStatus(),
- TicketPackage::getSaleStatus, TicketPkgSaleStatusEnum.ON_SALE)
- .set(TicketPkgSaleStatusEnum.ON_SALE == ticketPackage.getSaleStatus()
- && ticketPackage.getSaleQty() + 1 >= ticketPackage.getPkgUnit(),
- TicketPackage::getSaleStatus, TicketPkgSaleStatusEnum.SALE_OUT)
- .set(TicketPackage::getSaleQty, ticketPackage.getSaleQty() + 1)
- .eq(TicketPackage::getPkgId, ticketPackage.getPkgId()));
- }
- if (ticketOrder.getCommStatus() != CommStatusEnum.NO) {
- LogUtil.warn(logger, "收到盲票支付成功消息,订单结佣状态不是未结佣,忽略消息。orderId=" + orderId);
- return;
- }
- // 先更新状态,防并发。
- boolean rst = userTicketOrderService.update(new LambdaUpdateWrapper<UserTicketOrder>().set(UserTicketOrder::getCommStatus, CommStatusEnum.DOING)
- .eq(UserTicketOrder::getOrderId, orderId).eq(UserTicketOrder::getCommStatus, CommStatusEnum.NO));
- if (!rst) {
- LogUtil.error(logger, "收到盲票支付成功消息,更新订单结佣状态为结佣中失败。orderId=" + orderId);
- return;
- }
- userTicketOrderService.commToChannel(orderId);
- }
- private void processTicketGenerateMsg(String mqData) {
- String boxId = mqData;
- TicketBox ticketBox = ticketBoxService.getById(boxId);
- if (ticketBox.getStatus() != TicketBoxStatusEnum.WAIT ) {
- LogUtil.error(logger, "收到盲票生成消息,票组状态不是待出票,忽略消息。boxId=" + boxId);
- return;
- }
- // 先更新状态,防并发。如果最后生成失败了,暂时人工重新触发
- boolean rst = ticketBoxService.update(new LambdaUpdateWrapper<TicketBox>().set(TicketBox::getStatus, TicketBoxStatusEnum.DOING)
- .eq(TicketBox::getBoxId, boxId).eq(TicketBox::getStatus, TicketBoxStatusEnum.WAIT));
- if (!rst) {
- LogUtil.error(logger, "收到盲票生成消息,更新票组状态为出票中失败。boxId=" + boxId);
- return;
- }
- ticketBoxService.generateTicket(boxId);
- }
- }
|