123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- package com.qs.mp.common.pulsar;
import com.qs.mp.common.enums.MqTopicType;
import com.qs.mp.common.utils.LogUtil;
import com.qs.mp.common.utils.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
- /**
- * @auther duota
- * @create 2021 2021/9/19 2:27 下午
- * @describe
- */
- @Component
- public class PulsarClientService {
- protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
- private PulsarClient client;
- private Consumer<byte[]> consumer;
- @Value("${mq.service-url}")
- private String serviceUrl;
- @Value("${mq.auth-token}")
- private String authToken;
- @Value("${mq.topic-ticket-generate}")
- private String topicTicketGenerate;
- @Value("${mq.topic-ticket-pay}")
- private String topicTicketPay;
- @Value("${mq.consumer-conn}")
- private boolean consumerConn;
- @Value("${mq.consumer-topics}")
- private String topics;
- @Autowired
- private PulsarConsumer pulsarConsumer;
- private final ConcurrentHashMap<String, Producer<byte[]>> Producers = new ConcurrentHashMap<String, Producer<byte[]>>();
- @PostConstruct
- public void init() throws PulsarClientException {
- client = PulsarClient.builder()
- .serviceUrl(serviceUrl)//【集群管理】接入地址处复制
- .authentication(AuthenticationFactory.token(authToken))
- .build();
- if (consumerConn) {
- consumer = client.newConsumer()
- .topic(topics.split(","))//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
- .subscriptionName("mp")//需要现在控制台或者通过控制台API创建好一个订阅,此处填写订阅名
- .subscriptionType(SubscriptionType.Exclusive)//声明消费模式为exclusive(独占)模式
- .subscriptionInitialPosition(
- SubscriptionInitialPosition.Earliest)//配置从最早开始消费,否则可能会消费不到历史消息
- .subscribe();
- new Thread(() -> {
- try {
- loop();
- } catch (Exception e) {
- logger.error("消费Pulsar数据异常,停止Pulsar连接:", e);
- close();
- }
- }).start();
- }
- }
- private void loop() throws Exception {
- //消费消息
- while (true) {
- Message message = consumer.receive();
- String[] keyArr = message.getKey().split("_");
- String topicName = message.getTopicName();
- String key = message.getKey();
- String jsons = new String(message.getData());
- if (!StringUtils.isBlank(jsons)) {
- try {
- pulsarConsumer.wsConsumer(key, jsons);
- logger.info("receive data>>>>>>" + jsons);
- consumer.acknowledge(message);
- } catch (Exception e) {
- LogUtil.error(logger, e, "消费Pulsar数据异常,key【{0}】,json【{1}】:", new Object[]{message.getKey(), jsons});
- }
- }
- }
- }
- /**
- * @param mqTopicType
- * @param data 内容为json格式
- * @throws PulsarClientException
- */
- public void producer(MqTopicType mqTopicType, String data) throws PulsarClientException {
- logger.info("start producer mq data:" + data);
- String topic = "";
- if (mqTopicType.getValue() == MqTopicType.ticket_generate.getValue()) {
- topic = topicTicketGenerate; // 盲票生成
- } else if (mqTopicType.getValue() == MqTopicType.ticket_pay.getValue()) {
- topic = topicTicketPay; // 盲票付款成功
- }
- Producer<byte[]> producer = null;
- if (Producers.containsKey(mqTopicType.getValue())) {
- producer = Producers.get(mqTopicType.getValue());
- } else {
- producer = client.newProducer()
- .topic(topic)//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
- .enableBatching(true)
- .create();
- Producers.put(mqTopicType.getValue(), producer);
- }
- /*
- producer.newMessage()//发送消息
- .key(mqTopicType.getValue())
- .value(data.getBytes())
- .send();
- */
- CompletableFuture<MessageId> messageIdFuture = producer.newMessage()
- .key(mqTopicType.getValue())
- .value(data.getBytes())
- .sendAsync();
- Producer<byte[]> finalProducer = producer;
- messageIdFuture.whenComplete(((messageId, throwable) -> {
- if (null != throwable) {
- logger.error("【消息投递异常,开始重试】mq data:" + data, throwable);
- //todo 失败重试策略
- try {
- finalProducer.newMessage().deliverAfter(5, TimeUnit.SECONDS
- ).key(mqTopicType.getValue()).value(data.getBytes()).send();
- } catch (PulsarClientException e) {
- logger.error("重试投递失败", e);
- //todo 保存数据库,增加失败重试,通过体系化重试框架保证消息投递完整性
- }
- } else {
- logger.info("【消息成功投递】");
- }
- }));
- }
- public void consumer() throws PulsarClientException {
- }
- public void close() {
- try {
- consumer.close();
- } catch (PulsarClientException e) {
- logger.error("关闭Pulsar消费者失败:", e);
- }
- try {
- client.close();
- } catch (PulsarClientException e) {
- logger.error("关闭Pulsar连接失败:", e);
- }
- }
- }
|