package com.qs.mp.common.pulsar; import com.qs.mp.common.enums.MqTopicType; import com.qs.mp.common.utils.LogUtil; import com.qs.mp.common.utils.StringUtils; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * @auther duota * @create 2021 2021/9/19 2:27 下午 * @describe */ @Component public class PulsarClientService { protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName()); private PulsarClient client; private Consumer consumer; @Value("${mq.service-url}") private String serviceUrl; @Value("${mq.auth-token}") private String authToken; @Value("${mq.topic-ticket-generate}") private String topicTicketGenerate; @Value("${mq.topic-ticket-pay}") private String topicTicketPay; @Value("${mq.consumer-conn}") private boolean consumerConn; @Value("${mq.consumer-topics}") private String topics; @Autowired private PulsarConsumer pulsarConsumer; private final ConcurrentHashMap> Producers = new ConcurrentHashMap>(); @PostConstruct public void init() throws PulsarClientException { client = PulsarClient.builder() .serviceUrl(serviceUrl)//【集群管理】接入地址处复制 .authentication(AuthenticationFactory.token(authToken)) .build(); if (consumerConn) { consumer = client.newConsumer() .topic(topics.split(","))//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称 .subscriptionName("mp")//需要现在控制台或者通过控制台API创建好一个订阅,此处填写订阅名 .subscriptionType(SubscriptionType.Exclusive)//声明消费模式为exclusive(独占)模式 .subscriptionInitialPosition( SubscriptionInitialPosition.Earliest)//配置从最早开始消费,否则可能会消费不到历史消息 .subscribe(); new Thread(() -> { try { loop(); } catch (Exception e) { logger.error("消费Pulsar数据异常,停止Pulsar连接:", e); close(); } }).start(); } } private void loop() throws Exception { //消费消息 while (true) { Message message = consumer.receive(); String[] keyArr = message.getKey().split("_"); String topicName = message.getTopicName(); String key = message.getKey(); String jsons = new String(message.getData()); if (!StringUtils.isBlank(jsons)) { try { pulsarConsumer.wsConsumer(key, jsons); logger.info("receive data>>>>>>" + jsons); consumer.acknowledge(message); } catch (Exception e) { LogUtil.error(logger, e, "消费Pulsar数据异常,key【{0}】,json【{1}】:", new Object[]{message.getKey(), jsons}); } } } } /** * @param mqTopicType * @param data 内容为json格式 * @throws PulsarClientException */ public void producer(MqTopicType mqTopicType, String data) throws PulsarClientException { logger.info("start producer mq data:" + data); String topic = ""; if (mqTopicType.getValue() == MqTopicType.ticket_generate.getValue()) { topic = topicTicketGenerate; // 盲票生成 } else if (mqTopicType.getValue() == MqTopicType.ticket_pay.getValue()) { topic = topicTicketPay; // 盲票付款成功 } Producer producer = null; if (Producers.containsKey(mqTopicType.getValue())) { producer = Producers.get(mqTopicType.getValue()); } else { producer = client.newProducer() .topic(topic)//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称 .enableBatching(true) .create(); Producers.put(mqTopicType.getValue(), producer); } /* producer.newMessage()//发送消息 .key(mqTopicType.getValue()) .value(data.getBytes()) .send(); */ CompletableFuture messageIdFuture = producer.newMessage() .key(mqTopicType.getValue()) .value(data.getBytes()) .sendAsync(); Producer finalProducer = producer; messageIdFuture.whenComplete(((messageId, throwable) -> { if (null != throwable) { logger.error("【消息投递异常,开始重试】mq data:" + data, throwable); //todo 失败重试策略 try { finalProducer.newMessage().deliverAfter(5, TimeUnit.SECONDS ).key(mqTopicType.getValue()).value(data.getBytes()).send(); } catch (PulsarClientException e) { logger.error("重试投递失败", e); //todo 保存数据库,增加失败重试,通过体系化重试框架保证消息投递完整性 } } else { logger.info("【消息成功投递】"); } })); } public void consumer() throws PulsarClientException { } public void close() { try { consumer.close(); } catch (PulsarClientException e) { logger.error("关闭Pulsar消费者失败:", e); } try { client.close(); } catch (PulsarClientException e) { logger.error("关闭Pulsar连接失败:", e); } } }