feat(rocketmq): 添加仪表记录消费者和生产者服务
This commit is contained in:
@@ -0,0 +1,75 @@
|
||||
package org.dromara.sis.config;
|
||||
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author lsm
|
||||
* @apiNote RocketMQClusterConfig
|
||||
* @since 2025/8/26
|
||||
*/
|
||||
@Configuration
|
||||
public class RocketMQClusterConfig {
|
||||
|
||||
// 从配置文件中读取 cluster 的配置
|
||||
@Value("${rocketmq1.cluster.name-server}")
|
||||
private String nameServer1;
|
||||
|
||||
@Value("${rocketmq1.cluster.producer.group}")
|
||||
private String producerGroup1;
|
||||
|
||||
// 为第一个集群创建生产者实例
|
||||
@Bean({"clusterProducerOne"})
|
||||
public DefaultMQProducer clusterProducerOne() throws Exception {
|
||||
DefaultMQProducer producer = new DefaultMQProducer(producerGroup1);
|
||||
producer.setNamesrvAddr(nameServer1);
|
||||
// 设置发送超时时间
|
||||
producer.setSendMsgTimeout(5000);
|
||||
// 设置重试次数
|
||||
producer.setRetryTimesWhenSendFailed(2);
|
||||
producer.setRetryTimesWhenSendAsyncFailed(2);
|
||||
return producer;
|
||||
}
|
||||
|
||||
// 使用上面的生产者实例创建 RocketMQTemplate
|
||||
@Bean("rocketMQTemplateClusterOne")
|
||||
public RocketMQTemplate rocketMQTemplateClusterOne(@Qualifier("clusterProducerOne") DefaultMQProducer producer) {
|
||||
RocketMQTemplate template = new RocketMQTemplate();
|
||||
template.setProducer(producer);
|
||||
return template;
|
||||
}
|
||||
|
||||
|
||||
// 从配置文件中读取 cluster 的配置
|
||||
@Value("${rocketmq2.cluster.name-server}")
|
||||
private String nameServer2;
|
||||
|
||||
@Value("${rocketmq2.cluster.producer.group}")
|
||||
private String producerGroup2;
|
||||
|
||||
// 为第二个集群创建生产者实例
|
||||
@Bean({"clusterProducerTwo"})
|
||||
public DefaultMQProducer clusterProducerTwo() throws Exception {
|
||||
DefaultMQProducer producer = new DefaultMQProducer(producerGroup2);
|
||||
producer.setNamesrvAddr(nameServer2);
|
||||
// 设置发送超时时间
|
||||
producer.setSendMsgTimeout(5000);
|
||||
// 设置重试次数
|
||||
producer.setRetryTimesWhenSendFailed(2);
|
||||
producer.setRetryTimesWhenSendAsyncFailed(2);
|
||||
return producer;
|
||||
}
|
||||
|
||||
// 使用上面的生产者实例创建 RocketMQTemplate
|
||||
@Bean("rocketMQTemplateClusterTwo")
|
||||
public RocketMQTemplate rocketMQTemplateClusterTwo(@Qualifier("clusterProducerTwo") DefaultMQProducer producer) {
|
||||
RocketMQTemplate template = new RocketMQTemplate();
|
||||
template.setProducer(producer);
|
||||
return template;
|
||||
}
|
||||
|
||||
}
|
@@ -10,7 +10,7 @@ public interface RocketMqConstants {
|
||||
String TOPIC = "SmartParks";
|
||||
|
||||
// mq GROUP
|
||||
String GROUP = "SmartParksEqp";
|
||||
String GROUP = "SmartParks";
|
||||
|
||||
/*-----------------------------------消息tag------------------------------------*/
|
||||
String HIKADD = "ADD_HIK_DEVICE_TAG";
|
||||
@@ -19,4 +19,6 @@ public interface RocketMqConstants {
|
||||
// 人脸比对
|
||||
String FACECOMPARE = "FACE_COMPARE_REPORT";
|
||||
|
||||
String METER_RECORD = "METER_RECORD_TAG";
|
||||
|
||||
}
|
||||
|
@@ -9,6 +9,7 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.dromara.sis.rocketmq.RocketMqConstants;
|
||||
import org.dromara.sis.rocketmq.domain.FaceCapture;
|
||||
import org.dromara.sis.service.IZeroSensationPassageService;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
@@ -23,7 +24,8 @@ import org.springframework.stereotype.Component;
|
||||
@RocketMQMessageListener(
|
||||
topic = RocketMqConstants.TOPIC,
|
||||
consumerGroup = RocketMqConstants.GROUP,
|
||||
selectorExpression = RocketMqConstants.FACECAPTURE
|
||||
selectorExpression = RocketMqConstants.FACECAPTURE,
|
||||
nameServer = "${rocketmq2.cluster.name-server}"
|
||||
)
|
||||
public class FaceCaptureConsumer implements RocketMQListener<MessageExt> {
|
||||
|
||||
@@ -34,7 +36,7 @@ public class FaceCaptureConsumer implements RocketMQListener<MessageExt> {
|
||||
log.info("消费人脸抓拍数据,数据长度={}", ext.getBody().length);
|
||||
try {
|
||||
FaceCapture capture = JSONObject.parseObject(ext.getBody(), FaceCapture.class);
|
||||
zeroSensationPassageService.pass(capture.getDeviceIp(), capture.getSmallImg(), capture.getBigImg());
|
||||
// zeroSensationPassageService.pass(capture.getDeviceIp(), capture.getSmallImg(), capture.getBigImg());
|
||||
} catch (Exception e) {
|
||||
log.error("消费人脸抓拍数据处理失败,", e);
|
||||
}
|
||||
|
@@ -22,7 +22,8 @@ import org.springframework.stereotype.Component;
|
||||
@RocketMQMessageListener(
|
||||
topic = RocketMqConstants.TOPIC,
|
||||
consumerGroup = RocketMqConstants.GROUP,
|
||||
selectorExpression = RocketMqConstants.FACECOMPARE
|
||||
selectorExpression = RocketMqConstants.FACECOMPARE,
|
||||
nameServer = "${rocketmq2.cluster.name-server}"
|
||||
)
|
||||
public class FaceCompareConsumer implements RocketMQListener<MessageExt> {
|
||||
|
||||
@@ -32,7 +33,7 @@ public class FaceCompareConsumer implements RocketMQListener<MessageExt> {
|
||||
log.info("消费人脸比对数据,数据长度={}", ext.getBody().length);
|
||||
try {
|
||||
FaceCompare compare = JSONObject.parseObject(ext.getBody(), FaceCompare.class);
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("消费比对抓拍数据处理失败,", e);
|
||||
}
|
||||
|
@@ -0,0 +1,38 @@
|
||||
package org.dromara.sis.rocketmq.consumer;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.dromara.sis.rocketmq.RocketMqConstants;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author lsm
|
||||
* @apiNote MeterRecordConsumer
|
||||
* @since 2025/8/25
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@RocketMQMessageListener(
|
||||
consumerGroup = "Meter_Group",
|
||||
topic = RocketMqConstants.TOPIC,
|
||||
selectorExpression = RocketMqConstants.METER_RECORD,
|
||||
nameServer = "${rocketmq2.cluster.name-server}"
|
||||
)
|
||||
public class MeterRecordConsumer implements RocketMQListener<MessageExt> {
|
||||
|
||||
@Override
|
||||
public void onMessage(MessageExt ext) {
|
||||
log.info("消费仪表上报数据,数据长度={}", ext.getBody().length);
|
||||
try {
|
||||
String message = new String(ext.getBody());
|
||||
log.info("消费仪表上报数据,数据={}", message);
|
||||
} catch (Exception e) {
|
||||
log.error("消费仪表上报数据处理失败,", e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@@ -0,0 +1,69 @@
|
||||
package org.dromara.sis.rocketmq.producer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author lsm
|
||||
* @apiNote ProducerService
|
||||
* @since 2025/8/26
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ProducerService {
|
||||
|
||||
@Autowired
|
||||
@Qualifier("rocketMQTemplateClusterOne")
|
||||
private RocketMQTemplate rocketMQTemplateClusterOne;
|
||||
|
||||
@Autowired
|
||||
// @Qualifier("rocketMQTemplateClusterTwo")
|
||||
private RocketMQTemplate rocketMQTemplateClusterTwo;
|
||||
|
||||
/**
|
||||
* 向mq写入消息
|
||||
*
|
||||
* @param topic 消息topic
|
||||
* @param tag 消息tag
|
||||
* @param msg 消息
|
||||
*/
|
||||
public void defaultSend(String topic, String tag, String msg) {
|
||||
try {
|
||||
String destination = topic + ":" + tag;
|
||||
log.info("准备向默认RocketMQ发送消息,目的地:{}", destination);
|
||||
|
||||
// 使用 RocketMQTemplate 的同步发送方法
|
||||
rocketMQTemplateClusterOne.syncSend(destination, msg);
|
||||
|
||||
log.info("发送RocketMQ消息成功");
|
||||
} catch (Exception e) {
|
||||
log.error("发送RocketMQ消息失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 向mq写入消息
|
||||
*
|
||||
* @param topic 消息topic
|
||||
* @param tag 消息tag
|
||||
* @param msg 消息
|
||||
*/
|
||||
public void clusterSend(String topic, String tag, String msg) {
|
||||
try {
|
||||
String destination = topic + ":" + tag;
|
||||
log.info("准备向集群2 RocketMQ发送消息,目的地:{}", destination);
|
||||
|
||||
// 使用 RocketMQTemplate 的同步发送方法
|
||||
rocketMQTemplateClusterTwo.syncSend(destination, msg);
|
||||
|
||||
log.info("发送ClusterRocketMQ消息成功");
|
||||
} catch (Exception e) {
|
||||
log.error("发送RocketMQ消息失败", e);
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user