PulsarClientService.java 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package com.qs.mp.common.pulsar;
  2. import com.qs.mp.common.enums.MqTopicType;
  3. import com.qs.mp.common.utils.LogUtil;
  4. import com.qs.mp.common.utils.StringUtils;
  5. import java.util.concurrent.CompletableFuture;
  6. import java.util.concurrent.ConcurrentHashMap;
  7. import java.util.concurrent.TimeUnit;
  8. import javax.annotation.PostConstruct;
  9. import org.apache.pulsar.client.api.AuthenticationFactory;
  10. import org.apache.pulsar.client.api.Consumer;
  11. import org.apache.pulsar.client.api.Message;
  12. import org.apache.pulsar.client.api.MessageId;
  13. import org.apache.pulsar.client.api.Producer;
  14. import org.apache.pulsar.client.api.PulsarClient;
  15. import org.apache.pulsar.client.api.PulsarClientException;
  16. import org.apache.pulsar.client.api.SubscriptionInitialPosition;
  17. import org.apache.pulsar.client.api.SubscriptionType;
  18. import org.slf4j.Logger;
  19. import org.slf4j.LoggerFactory;
  20. import org.springframework.beans.factory.annotation.Autowired;
  21. import org.springframework.beans.factory.annotation.Value;
  22. import org.springframework.stereotype.Component;
  23. /**
  24. * @auther duota
  25. * @create 2021 2021/9/19 2:27 下午
  26. * @describe
  27. */
  28. @Component
  29. public class PulsarClientService {
  30. protected final Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
  31. private PulsarClient client;
  32. private Consumer<byte[]> consumer;
  33. @Value("${mq.service-url}")
  34. private String serviceUrl;
  35. @Value("${mq.auth-token}")
  36. private String authToken;
  37. @Value("${mq.topic-ticket-generate}")
  38. private String topicTicketGenerate;
  39. @Value("${mq.topic-ticket-pay}")
  40. private String topicTicketPay;
  41. @Value("${mq.consumer-conn}")
  42. private boolean consumerConn;
  43. @Value("${mq.consumer-topics}")
  44. private String topics;
  45. @Autowired
  46. private PulsarConsumer pulsarConsumer;
  47. private final ConcurrentHashMap<String, Producer<byte[]>> Producers = new ConcurrentHashMap<String, Producer<byte[]>>();
  48. @PostConstruct
  49. public void init() throws PulsarClientException {
  50. client = PulsarClient.builder()
  51. .serviceUrl(serviceUrl)//【集群管理】接入地址处复制
  52. .authentication(AuthenticationFactory.token(authToken))
  53. .build();
  54. if (consumerConn) {
  55. consumer = client.newConsumer()
  56. .topic(topics.split(","))//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
  57. .subscriptionName("mp")//需要现在控制台或者通过控制台API创建好一个订阅,此处填写订阅名
  58. .subscriptionType(SubscriptionType.Exclusive)//声明消费模式为exclusive(独占)模式
  59. .subscriptionInitialPosition(
  60. SubscriptionInitialPosition.Earliest)//配置从最早开始消费,否则可能会消费不到历史消息
  61. .subscribe();
  62. new Thread(() -> {
  63. try {
  64. loop();
  65. } catch (Exception e) {
  66. logger.error("消费Pulsar数据异常,停止Pulsar连接:", e);
  67. close();
  68. }
  69. }).start();
  70. }
  71. }
  72. private void loop() throws Exception {
  73. //消费消息
  74. while (true) {
  75. Message message = consumer.receive();
  76. String[] keyArr = message.getKey().split("_");
  77. String topicName = message.getTopicName();
  78. String key = message.getKey();
  79. String jsons = new String(message.getData());
  80. if (!StringUtils.isBlank(jsons)) {
  81. try {
  82. pulsarConsumer.wsConsumer(key, jsons);
  83. logger.info("receive data>>>>>>" + jsons);
  84. consumer.acknowledge(message);
  85. } catch (Exception e) {
  86. LogUtil.error(logger, e, "消费Pulsar数据异常,key【{0}】,json【{1}】:", new Object[]{message.getKey(), jsons});
  87. }
  88. }
  89. }
  90. }
  91. /**
  92. * @param mqTopicType
  93. * @param data 内容为json格式
  94. * @throws PulsarClientException
  95. */
  96. public void producer(MqTopicType mqTopicType, String data) throws PulsarClientException {
  97. logger.info("start producer mq data:" + data);
  98. String topic = "";
  99. if (mqTopicType.getValue() == MqTopicType.ticket_generate.getValue()) {
  100. topic = topicTicketGenerate; // 盲票生成
  101. } else if (mqTopicType.getValue() == MqTopicType.ticket_pay.getValue()) {
  102. topic = topicTicketPay; // 盲票付款成功
  103. }
  104. Producer<byte[]> producer = null;
  105. if (Producers.containsKey(mqTopicType.getValue())) {
  106. producer = Producers.get(mqTopicType.getValue());
  107. } else {
  108. producer = client.newProducer()
  109. .topic(topic)//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
  110. .enableBatching(true)
  111. .create();
  112. Producers.put(mqTopicType.getValue(), producer);
  113. }
  114. /*
  115. producer.newMessage()//发送消息
  116. .key(mqTopicType.getValue())
  117. .value(data.getBytes())
  118. .send();
  119. */
  120. CompletableFuture<MessageId> messageIdFuture = producer.newMessage()
  121. .key(mqTopicType.getValue())
  122. .value(data.getBytes())
  123. .sendAsync();
  124. Producer<byte[]> finalProducer = producer;
  125. messageIdFuture.whenComplete(((messageId, throwable) -> {
  126. if (null != throwable) {
  127. logger.error("【消息投递异常,开始重试】mq data:" + data, throwable);
  128. //todo 失败重试策略
  129. try {
  130. finalProducer.newMessage().deliverAfter(5, TimeUnit.SECONDS
  131. ).key(mqTopicType.getValue()).value(data.getBytes()).send();
  132. } catch (PulsarClientException e) {
  133. logger.error("重试投递失败", e);
  134. //todo 保存数据库,增加失败重试,通过体系化重试框架保证消息投递完整性
  135. }
  136. } else {
  137. logger.info("【消息成功投递】");
  138. }
  139. }));
  140. }
  141. public void consumer() throws PulsarClientException {
  142. }
  143. public void close() {
  144. try {
  145. consumer.close();
  146. } catch (PulsarClientException e) {
  147. logger.error("关闭Pulsar消费者失败:", e);
  148. }
  149. try {
  150. client.close();
  151. } catch (PulsarClientException e) {
  152. logger.error("关闭Pulsar连接失败:", e);
  153. }
  154. }
  155. }