|
@@ -34,54 +34,54 @@ public class PulsarClientService {
|
|
|
private PulsarClient client;
|
|
|
private Consumer<byte[]> consumer;
|
|
|
|
|
|
- @Value("${mq.service-url}")
|
|
|
- private String serviceUrl;
|
|
|
+// @Value("${mq.service-url}")
|
|
|
+// private String serviceUrl;
|
|
|
|
|
|
- @Value("${mq.auth-token}")
|
|
|
- private String authToken;
|
|
|
+// @Value("${mq.auth-token}")
|
|
|
+// private String authToken;
|
|
|
|
|
|
- @Value("${mq.topic-ticket-generate}")
|
|
|
- private String topicTicketGenerate;
|
|
|
+// @Value("${mq.topic-ticket-generate}")
|
|
|
+// private String topicTicketGenerate;
|
|
|
|
|
|
- @Value("${mq.topic-ticket-pay}")
|
|
|
- private String topicTicketPay;
|
|
|
+// @Value("${mq.topic-ticket-pay}")
|
|
|
+// private String topicTicketPay;
|
|
|
|
|
|
- @Value("${mq.consumer-conn}")
|
|
|
- private boolean consumerConn;
|
|
|
+// @Value("${mq.consumer-conn}")
|
|
|
+// private boolean consumerConn;
|
|
|
|
|
|
- @Value("${mq.consumer-topics}")
|
|
|
- private String topics;
|
|
|
+// @Value("${mq.consumer-topics}")
|
|
|
+// private String topics;
|
|
|
|
|
|
@Autowired
|
|
|
private PulsarConsumer pulsarConsumer;
|
|
|
|
|
|
private final ConcurrentHashMap<String, Producer<byte[]>> Producers = new ConcurrentHashMap<String, Producer<byte[]>>();
|
|
|
|
|
|
- @PostConstruct
|
|
|
- public void init() throws PulsarClientException {
|
|
|
- client = PulsarClient.builder()
|
|
|
- .serviceUrl(serviceUrl)//【集群管理】接入地址处复制
|
|
|
- .authentication(AuthenticationFactory.token(authToken))
|
|
|
- .build();
|
|
|
-
|
|
|
- if (consumerConn) {
|
|
|
- consumer = client.newConsumer()
|
|
|
- .topic(topics.split(","))//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
|
|
|
- .subscriptionName("mp")//需要现在控制台或者通过控制台API创建好一个订阅,此处填写订阅名
|
|
|
- .subscriptionType(SubscriptionType.Exclusive)//声明消费模式为exclusive(独占)模式
|
|
|
- .subscriptionInitialPosition(
|
|
|
- SubscriptionInitialPosition.Earliest)//配置从最早开始消费,否则可能会消费不到历史消息
|
|
|
- .subscribe();
|
|
|
- new Thread(() -> {
|
|
|
- try {
|
|
|
- loop();
|
|
|
- } catch (Exception e) {
|
|
|
- logger.error("消费Pulsar数据异常,停止Pulsar连接:", e);
|
|
|
- close();
|
|
|
- }
|
|
|
- }).start();
|
|
|
- }
|
|
|
- }
|
|
|
+// @PostConstruct
|
|
|
+// public void init() throws PulsarClientException {
|
|
|
+// client = PulsarClient.builder()
|
|
|
+// .serviceUrl(serviceUrl)//【集群管理】接入地址处复制
|
|
|
+// .authentication(AuthenticationFactory.token(authToken))
|
|
|
+// .build();
|
|
|
+
|
|
|
+// if (consumerConn) {
|
|
|
+// consumer = client.newConsumer()
|
|
|
+// .topic(topics.split(","))//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
|
|
|
+// .subscriptionName("mp")//需要现在控制台或者通过控制台API创建好一个订阅,此处填写订阅名
|
|
|
+// .subscriptionType(SubscriptionType.Exclusive)//声明消费模式为exclusive(独占)模式
|
|
|
+// .subscriptionInitialPosition(
|
|
|
+// SubscriptionInitialPosition.Earliest)//配置从最早开始消费,否则可能会消费不到历史消息
|
|
|
+// .subscribe();
|
|
|
+// new Thread(() -> {
|
|
|
+// try {
|
|
|
+// loop();
|
|
|
+// } catch (Exception e) {
|
|
|
+// logger.error("消费Pulsar数据异常,停止Pulsar连接:", e);
|
|
|
+// close();
|
|
|
+// }
|
|
|
+// }).start();
|
|
|
+// }
|
|
|
+// }
|
|
|
|
|
|
private void loop() throws Exception {
|
|
|
//消费消息
|
|
@@ -104,60 +104,60 @@ public class PulsarClientService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * @param mqTopicType
|
|
|
- * @param data 内容为json格式
|
|
|
- * @throws PulsarClientException
|
|
|
- */
|
|
|
- public void producer(MqTopicType mqTopicType, String data) throws PulsarClientException {
|
|
|
- logger.info("start producer mq data:" + data);
|
|
|
- String topic = "";
|
|
|
- if (mqTopicType.getValue() == MqTopicType.ticket_generate.getValue()) {
|
|
|
- topic = topicTicketGenerate; // 盲票生成
|
|
|
- } else if (mqTopicType.getValue() == MqTopicType.ticket_pay.getValue()) {
|
|
|
- topic = topicTicketPay; // 盲票付款成功
|
|
|
- }
|
|
|
- Producer<byte[]> producer = null;
|
|
|
- if (Producers.containsKey(mqTopicType.getValue())) {
|
|
|
- producer = Producers.get(mqTopicType.getValue());
|
|
|
- } else {
|
|
|
- producer = client.newProducer()
|
|
|
- .topic(topic)//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
|
|
|
- .enableBatching(true)
|
|
|
- .create();
|
|
|
- Producers.put(mqTopicType.getValue(), producer);
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- producer.newMessage()//发送消息
|
|
|
- .key(mqTopicType.getValue())
|
|
|
- .value(data.getBytes())
|
|
|
- .send();
|
|
|
-
|
|
|
-
|
|
|
- */
|
|
|
- CompletableFuture<MessageId> messageIdFuture = producer.newMessage()
|
|
|
- .key(mqTopicType.getValue())
|
|
|
- .value(data.getBytes())
|
|
|
- .sendAsync();
|
|
|
- Producer<byte[]> finalProducer = producer;
|
|
|
- messageIdFuture.whenComplete(((messageId, throwable) -> {
|
|
|
- if (null != throwable) {
|
|
|
- logger.error("【消息投递异常,开始重试】mq data:" + data, throwable);
|
|
|
- //todo 失败重试策略
|
|
|
- try {
|
|
|
- finalProducer.newMessage().deliverAfter(5, TimeUnit.SECONDS
|
|
|
- ).key(mqTopicType.getValue()).value(data.getBytes()).send();
|
|
|
- } catch (PulsarClientException e) {
|
|
|
- logger.error("重试投递失败", e);
|
|
|
- //todo 保存数据库,增加失败重试,通过体系化重试框架保证消息投递完整性
|
|
|
- }
|
|
|
- } else {
|
|
|
- logger.info("【消息成功投递】");
|
|
|
- }
|
|
|
- }));
|
|
|
-
|
|
|
- }
|
|
|
+// /**
|
|
|
+// * @param mqTopicType
|
|
|
+// * @param data 内容为json格式
|
|
|
+// * @throws PulsarClientException
|
|
|
+// */
|
|
|
+// public void producer(MqTopicType mqTopicType, String data) throws PulsarClientException {
|
|
|
+// logger.info("start producer mq data:" + data);
|
|
|
+// String topic = "";
|
|
|
+// if (mqTopicType.getValue() == MqTopicType.ticket_generate.getValue()) {
|
|
|
+// topic = topicTicketGenerate; // 盲票生成
|
|
|
+// } else if (mqTopicType.getValue() == MqTopicType.ticket_pay.getValue()) {
|
|
|
+// topic = topicTicketPay; // 盲票付款成功
|
|
|
+// }
|
|
|
+// Producer<byte[]> producer = null;
|
|
|
+// if (Producers.containsKey(mqTopicType.getValue())) {
|
|
|
+// producer = Producers.get(mqTopicType.getValue());
|
|
|
+// } else {
|
|
|
+// producer = client.newProducer()
|
|
|
+// .topic(topic)//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
|
|
|
+// .enableBatching(true)
|
|
|
+// .create();
|
|
|
+// Producers.put(mqTopicType.getValue(), producer);
|
|
|
+// }
|
|
|
+//
|
|
|
+// /*
|
|
|
+// producer.newMessage()//发送消息
|
|
|
+// .key(mqTopicType.getValue())
|
|
|
+// .value(data.getBytes())
|
|
|
+// .send();
|
|
|
+//
|
|
|
+//
|
|
|
+// */
|
|
|
+// CompletableFuture<MessageId> messageIdFuture = producer.newMessage()
|
|
|
+// .key(mqTopicType.getValue())
|
|
|
+// .value(data.getBytes())
|
|
|
+// .sendAsync();
|
|
|
+// Producer<byte[]> finalProducer = producer;
|
|
|
+// messageIdFuture.whenComplete(((messageId, throwable) -> {
|
|
|
+// if (null != throwable) {
|
|
|
+// logger.error("【消息投递异常,开始重试】mq data:" + data, throwable);
|
|
|
+// //todo 失败重试策略
|
|
|
+// try {
|
|
|
+// finalProducer.newMessage().deliverAfter(5, TimeUnit.SECONDS
|
|
|
+// ).key(mqTopicType.getValue()).value(data.getBytes()).send();
|
|
|
+// } catch (PulsarClientException e) {
|
|
|
+// logger.error("重试投递失败", e);
|
|
|
+// //todo 保存数据库,增加失败重试,通过体系化重试框架保证消息投递完整性
|
|
|
+// }
|
|
|
+// } else {
|
|
|
+// logger.info("【消息成功投递】");
|
|
|
+// }
|
|
|
+// }));
|
|
|
+//
|
|
|
+// }
|
|
|
|
|
|
public void consumer() throws PulsarClientException {
|
|
|
|