From f6b6e23866a1c2474fa64123ce4556117ae34fe9 Mon Sep 17 00:00:00 2001 From: xzcawl Date: Fri, 15 Jul 2022 17:36:28 +0800 Subject: [PATCH] add rabbitMq send --- austin-support/pom.xml | 6 +++ .../constans/MessageQueuePipeline.java | 1 + .../mq/rabbit/RabbitSendMqServiceImpl.java | 45 +++++++++++++++++++ .../src/main/resources/application.properties | 19 ++++++++ pom.xml | 8 ++++ 5 files changed, 79 insertions(+) create mode 100644 austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java diff --git a/austin-support/pom.xml b/austin-support/pom.xml index 96ae2e03a..afc9ab801 100644 --- a/austin-support/pom.xml +++ b/austin-support/pom.xml @@ -93,6 +93,12 @@ com.aliyun alibaba-dingtalk-service-sdk + + + org.springframework.amqp + spring-rabbit + + \ No newline at end of file diff --git a/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java b/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java index 3cf93fd0b..e31b81eb4 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java +++ b/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java @@ -10,5 +10,6 @@ public interface MessageQueuePipeline { String EVENT_BUS = "eventBus"; String KAFKA = "kafka"; String ROCKET_MQ = "rocketMq"; + String RABBIT_MQ = "rabbitMq"; } diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java new file mode 100644 index 000000000..661f4b726 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/rabbit/RabbitSendMqServiceImpl.java @@ -0,0 +1,45 @@ +package com.java3y.austin.support.mq.rabbit; + +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.mq.SendMqService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + + +/** + * @Autor xzcawl + * @Date 2022/7/15 17:29 + */ +@Slf4j +@Service +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.RABBIT_MQ) +public class RabbitSendMqServiceImpl implements SendMqService { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Value("${austin.rabbitmq.topic.name}") + private String confTopic; + + @Value("${austin.rabbitmq.exchange.name}") + private String exchangeName; + + + @Override + public void send(String topic, String jsonValue, String tagId) { + if (topic.equals(confTopic)) { + rabbitTemplate.convertAndSend(exchangeName, confTopic, jsonValue); + } else { + log.error("RabbitSendMqServiceImpl send topic error! topic:{},confTopic:{}", topic, confTopic); + } + } + + @Override + public void send(String topic, String jsonValue) { + send(topic, jsonValue, null); + } +} diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index e795ca0fc..5c3245532 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -53,6 +53,25 @@ spring.kafka.consumer.auto.offset.reset=earliest spring.kafka.consumer.auto-commit-interval=1000 spring.kafka.consumer.enable-auto-commit=true +##################### Rabbit properties ##################### +server.port=8080 +spring.application.name=cl +#RabbitMq所在服务器IP +spring.rabbitmq.host=127.0.0.1 +#连接端口号 +spring.rabbitmq.port=5672 +#用户名 +spring.rabbitmq.username=root +#用户密码 +spring.rabbitmq.password=123456 +# 开启发送确认 +spring.rabbitmq.publisher-confirm-type=correlated +# 开启发送失败退回 +spring.rabbitmq.publisher-returns=true +spring.rabbitmq.virtual-host=/ +austin.rabbitmq.topic.name=austinRabbit +austin.rabbitmq.exchange.name=austin.point + ##################### redis properties ##################### spring.redis.host=${austin-redis-ip} spring.redis.port=${austin-redis-port} diff --git a/pom.xml b/pom.xml index 49b11522d..ea836cb8b 100644 --- a/pom.xml +++ b/pom.xml @@ -153,6 +153,13 @@ ${flink.version} + + + org.apache.flink + flink-connector-rabbitmq + 1.15.1 + + com.github.binarywang @@ -187,6 +194,7 @@ alibaba-dingtalk-service-sdk 2.0.0 +