feat(property): 添加自动抄表功能

This commit is contained in:
2025-08-27 19:27:02 +08:00
parent 78d97e14ee
commit 4d76a4df45
14 changed files with 275 additions and 201 deletions

View File

@@ -1,75 +0,0 @@
package org.dromara.sis.config;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author lsm
* @apiNote RocketMQClusterConfig
* @since 2025/8/26
*/
@Configuration
public class RocketMQClusterConfig {
// 从配置文件中读取 cluster 的配置
@Value("${rocketmq.cluster1.name-server}")
private String nameServer1;
@Value("${rocketmq.cluster1.producer.group}")
private String producerGroup1;
// 为第一个集群创建生产者实例
@Bean({"clusterProducerOne"})
public DefaultMQProducer clusterProducerOne() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup1);
producer.setNamesrvAddr(nameServer1);
// 设置发送超时时间
producer.setSendMsgTimeout(5000);
// 设置重试次数
producer.setRetryTimesWhenSendFailed(2);
producer.setRetryTimesWhenSendAsyncFailed(2);
return producer;
}
// 使用上面的生产者实例创建 RocketMQTemplate
@Bean("rocketMQTemplateClusterOne")
public RocketMQTemplate rocketMQTemplateClusterOne(@Qualifier("clusterProducerOne") DefaultMQProducer producer) {
RocketMQTemplate template = new RocketMQTemplate();
template.setProducer(producer);
return template;
}
// 从配置文件中读取 cluster 的配置
// @Value("${rocketmq.cluster2.name-server}")
// private String nameServer2;
//
// @Value("${rocketmq.cluster2.producer.group}")
// private String producerGroup2;
//
// // 为第二个集群创建生产者实例
// @Bean({"clusterProducerTwo"})
// public DefaultMQProducer clusterProducerTwo() throws Exception {
// DefaultMQProducer producer = new DefaultMQProducer(producerGroup2);
// producer.setNamesrvAddr(nameServer2);
// // 设置发送超时时间
// producer.setSendMsgTimeout(5000);
// // 设置重试次数
// producer.setRetryTimesWhenSendFailed(2);
// producer.setRetryTimesWhenSendAsyncFailed(2);
// return producer;
// }
//
// // 使用上面的生产者实例创建 RocketMQTemplate
// @Bean("rocketMQTemplateClusterTwo")
// public RocketMQTemplate rocketMQTemplateClusterTwo(@Qualifier("clusterProducerTwo") DefaultMQProducer producer) {
// RocketMQTemplate template = new RocketMQTemplate();
// template.setProducer(producer);
// return template;
// }
}

View File

@@ -1,44 +0,0 @@
package org.dromara.sis.rocketmq.consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.dromara.sis.rocketmq.RocketMqConstants;
import org.dromara.sis.rocketmq.producer.ProducerService;
import org.springframework.stereotype.Component;
/**
* @author lsm
* @apiNote MeterRecordConsumer
* @since 2025/8/25
*/
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = RocketMqConstants.TOPIC,
consumerGroup = RocketMqConstants.METER_GROUP,
selectorExpression = RocketMqConstants.METER_RECORD,
nameServer = "${rocketmq.cluster1.name-server}"
)
public class MeterRecordConsumer implements RocketMQListener<MessageExt> {
private final ProducerService producerService;
@Override
public void onMessage(MessageExt ext) {
try {
if (ext.getBody() == null) {
log.info("仪表上报消息数据,不转发!");
} else {
producerService.defaultSend(RocketMqConstants.TOPIC, RocketMqConstants.METER_RECORD, new String(ext.getBody()));
log.info("转发仪表上报数据处理成功");
}
} catch (Exception e) {
log.error("转发仪表上报数据处理失败,", e);
}
}
}

View File

@@ -1,65 +0,0 @@
package org.dromara.sis.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
* @author lsm
* @apiNote ProducerService
* @since 2025/8/26
*/
@Slf4j
@Component
public class ProducerService {
@Autowired
@Qualifier("rocketMQTemplateClusterOne")
private RocketMQTemplate rocketMQTemplateClusterOne;
// @Autowired
// @Qualifier("rocketMQTemplateClusterTwo")
// private RocketMQTemplate rocketMQTemplateClusterTwo;
/**
* 向mq写入消息
*
* @param topic 消息topic
* @param tag 消息tag
* @param msg 消息
*/
public void defaultSend(String topic, String tag, String msg) {
try {
String destination = topic + ":" + tag;
// 使用 RocketMQTemplate 的同步发送方法
rocketMQTemplateClusterOne.syncSend(destination, msg);
log.info("发送RocketMQOne消息成功, nameServer:{}", rocketMQTemplateClusterOne.getProducer().getNamesrvAddr());
} catch (Exception e) {
log.error("发送RocketMQOne消息失败", e);
}
}
/**
* 向mq写入消息
*
* @param topic 消息topic
* @param tag 消息tag
* @param msg 消息
*/
// public void clusterSend(String topic, String tag, String msg) {
// try {
// String destination = topic + ":" + tag;
// // 使用 RocketMQTemplate 的同步发送方法
// rocketMQTemplateClusterTwo.syncSend(destination, msg);
//
// log.info("发送RocketMQTwo消息成功, nameServer:{}", rocketMQTemplateClusterTwo.getProducer().getNamesrvAddr());
// } catch (Exception e) {
// log.error("发送RocketMQTwo消息失败", e);
// }
// }
}