init
This commit is contained in:
@@ -0,0 +1,21 @@
|
||||
package org.dromara.stream;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup;
|
||||
|
||||
/**
|
||||
* SpringBoot-MQ 案例项目
|
||||
* @author Lion Li
|
||||
*/
|
||||
@SpringBootApplication
|
||||
public class RuoYiTestMqApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication application = new SpringApplication(RuoYiTestMqApplication.class);
|
||||
application.setApplicationStartup(new BufferingApplicationStartup(2048));
|
||||
application.run(args);
|
||||
System.out.println("(♥◠‿◠)ノ゙ MQ案例模块启动成功 ლ(´ڡ`ლ)゙ ");
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,72 @@
|
||||
package org.dromara.stream.callback;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.stream.config.RabbitConfig;
|
||||
import org.springframework.amqp.core.ReturnedMessage;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Rabbit回调
|
||||
* @author JC
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RabbitCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
|
||||
|
||||
private static final int MAX_RETRY_COUNT = 3;
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Override
|
||||
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
|
||||
if (ack) {
|
||||
log.info("消息发送成功: {}", correlationData);
|
||||
} else {
|
||||
log.error("消息发送失败: {}, 原因: {}", correlationData, cause);
|
||||
handleFailedMessage(correlationData);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void returnedMessage(ReturnedMessage returnedMessage) {
|
||||
log.error("消息返回: ReplyCode: {}, ReplyText: {}, Exchange: {}, RoutingKey: {}, Message: {}",
|
||||
returnedMessage.getReplyCode(),
|
||||
returnedMessage.getReplyText(),
|
||||
returnedMessage.getExchange(),
|
||||
returnedMessage.getRoutingKey(),
|
||||
returnedMessage.getMessage());
|
||||
retrySendMessage(returnedMessage);
|
||||
}
|
||||
|
||||
private void handleFailedMessage(CorrelationData correlationData) {
|
||||
int retryCount = getRetryCount(correlationData);
|
||||
if (retryCount < MAX_RETRY_COUNT) {
|
||||
retryCount++;
|
||||
log.info("正在重试发送消息: {}, 当前重试次数: {}", correlationData, retryCount);
|
||||
retrySend(correlationData);
|
||||
} else {
|
||||
log.error("消息发送失败超过最大重试次数: {}", correlationData);
|
||||
}
|
||||
}
|
||||
|
||||
private int getRetryCount(CorrelationData correlationData) {
|
||||
// 这里可以实现获取重试次数的逻辑,比如从数据库或缓存中获取
|
||||
// 为了简单起见,这里返回0
|
||||
return 0;
|
||||
}
|
||||
|
||||
private void retrySend(CorrelationData correlationData) {
|
||||
String messageContent = correlationData.getId();
|
||||
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, messageContent, correlationData);
|
||||
}
|
||||
|
||||
private void retrySendMessage(ReturnedMessage returnedMessage) {
|
||||
log.info("正在重试发送返回的消息: {}", returnedMessage.getMessage());
|
||||
rabbitTemplate.convertAndSend(returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage());
|
||||
}
|
||||
}
|
@@ -0,0 +1,57 @@
|
||||
package org.dromara.stream.config;
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.TopicExchange;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitConfig {
|
||||
|
||||
public static final String EXCHANGE_NAME = "demo-exchange";
|
||||
public static final String QUEUE_NAME = "demo-queue";
|
||||
public static final String ROUTING_KEY = "demo.routing.key";
|
||||
|
||||
/**
|
||||
* 创建交换机
|
||||
* ExchangeBuilder有四种交换机模式
|
||||
* Direct Exchange:直连交换机,根据Routing Key(路由键)进行投递到不同队列。
|
||||
* Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。
|
||||
* Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,*表示一个词。
|
||||
* Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。
|
||||
* durable 交换器是否持久化(false 不持久化,true 持久化)
|
||||
**/
|
||||
@Bean
|
||||
public TopicExchange exchange() {
|
||||
return new TopicExchange(EXCHANGE_NAME);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建队列
|
||||
* durable 队列是否持久化 队列调用此方法就是持久化 可查看方法的源码
|
||||
* deliveryMode 消息是否持久化(1 不持久化,2 持久化)
|
||||
**/
|
||||
@Bean
|
||||
public Queue queue() {
|
||||
return new Queue(QUEUE_NAME, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* 绑定交换机和队列
|
||||
* bing 方法参数可以是队列和交换机
|
||||
* to 方法参数必须是交换机
|
||||
* with 方法参数是路由Key 这里是以rabbit.开头
|
||||
* noargs 就是不要参数的意思
|
||||
* 这个方法的意思是把rabbit开头的消息 和 上面的队列 和 上面的交换机绑定
|
||||
**/
|
||||
@Bean
|
||||
public Binding binding(Queue queue, TopicExchange exchange) {
|
||||
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,85 @@
|
||||
package org.dromara.stream.config;
|
||||
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* RabbitTTL队列
|
||||
*
|
||||
* @author xbhog
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitTtlQueueConfig {
|
||||
|
||||
// 延迟队列名称
|
||||
public static final String DELAY_QUEUE_NAME = "delay-queue";
|
||||
// 延迟交换机名称
|
||||
public static final String DELAY_EXCHANGE_NAME = "delay-exchange";
|
||||
// 延迟路由键名称
|
||||
public static final String DELAY_ROUTING_KEY = "delay.routing.key";
|
||||
|
||||
// 死信交换机名称
|
||||
public static final String DEAD_LETTER_EXCHANGE = "dlx-exchange";
|
||||
// 死信队列名称
|
||||
public static final String DEAD_LETTER_QUEUE = "dlx-queue";
|
||||
// 死信路由键名称
|
||||
public static final String DEAD_LETTER_ROUTING_KEY = "dlx.routing.key";
|
||||
|
||||
/**
|
||||
* 声明延迟队列
|
||||
*/
|
||||
@Bean
|
||||
public Queue delayQueue() {
|
||||
return QueueBuilder.durable(DELAY_QUEUE_NAME)
|
||||
.deadLetterExchange(DEAD_LETTER_EXCHANGE)
|
||||
.deadLetterRoutingKey(DEAD_LETTER_ROUTING_KEY)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 声明延迟交换机
|
||||
*/
|
||||
@Bean
|
||||
public CustomExchange delayExchange() {
|
||||
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message",
|
||||
true, false, Map.of("x-delayed-type", "direct"));
|
||||
}
|
||||
|
||||
/**
|
||||
* 将延迟队列绑定到延迟交换机
|
||||
*/
|
||||
@Bean
|
||||
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
|
||||
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
|
||||
}
|
||||
|
||||
/**
|
||||
* 声明死信队列
|
||||
*/
|
||||
@Bean
|
||||
public Queue deadLetterQueue() {
|
||||
return new Queue(DEAD_LETTER_QUEUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 声明死信交换机
|
||||
*/
|
||||
@Bean
|
||||
public DirectExchange deadLetterExchange() {
|
||||
return new DirectExchange(DEAD_LETTER_EXCHANGE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 将死信队列绑定到死信交换机
|
||||
*/
|
||||
@Bean
|
||||
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
|
||||
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY);
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -0,0 +1,24 @@
|
||||
package org.dromara.stream.consumer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
* @date 2024/05/19 18:04
|
||||
**/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class KafkaNormalConsumer {
|
||||
|
||||
//默认获取最后一条消息
|
||||
@KafkaListener(topics = "test-topic", groupId = "test-group-id")
|
||||
public void timiKafka(ConsumerRecord<String, String> record) {
|
||||
Object key = record.key();
|
||||
Object value = record.value();
|
||||
log.info("【消费者】received the message key {},value:{}", key, value);
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,23 @@
|
||||
package org.dromara.stream.consumer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
* @date 2024/06/01 16:53
|
||||
**/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-consumer-group")
|
||||
public class NormalRocketConsumer implements RocketMQListener<MessageExt> {
|
||||
|
||||
@Override
|
||||
public void onMessage(MessageExt ext) {
|
||||
log.info("【消费者】接收消息:消息体 => {}, tag => {}", new String(ext.getBody()), ext.getTags());
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,43 @@
|
||||
package org.dromara.stream.consumer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.stream.config.RabbitConfig;
|
||||
import org.dromara.stream.config.RabbitTtlQueueConfig;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
* @date 2024年5月18日
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RabbitConsumer {
|
||||
|
||||
/**
|
||||
* 普通消息
|
||||
*/
|
||||
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
|
||||
public void listenQueue(Message message) {
|
||||
log.info("【消费者】Start consuming data:{}",new String(message.getBody()));
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理延迟队列消息
|
||||
*/
|
||||
@RabbitListener(queues = RabbitTtlQueueConfig.DELAY_QUEUE_NAME)
|
||||
public void receiveDelayMessage(String message){
|
||||
log.info("【消费者】Received delayed message:{}",message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理死信队列消息
|
||||
* 当消息在延迟队列中未能被正确处理(例如因消费者逻辑错误、超时未ACK等原因)
|
||||
* 它会被自动转发到死信队列中,以便后续的特殊处理或重新尝试。
|
||||
*/
|
||||
@RabbitListener(queues = RabbitTtlQueueConfig.DEAD_LETTER_QUEUE)
|
||||
public void receiveDeadMessage(String message){
|
||||
log.info("【消费者】Received dead message:{}",message);
|
||||
}
|
||||
}
|
@@ -0,0 +1,22 @@
|
||||
package org.dromara.stream.consumer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
* @date 2024/06/01 16:54
|
||||
**/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-group")
|
||||
public class TransactionRocketConsumer implements RocketMQListener<String> {
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
log.info("【消费者】===>接收事务消息:{}",message);
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,69 @@
|
||||
package org.dromara.stream.controller;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.stream.producer.*;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
*/
|
||||
@Slf4j
|
||||
@RestController
|
||||
@RequestMapping
|
||||
public class PushMessageController {
|
||||
|
||||
@Autowired
|
||||
private NormalRabbitProducer normalRabbitProducer;
|
||||
@Autowired
|
||||
private DelayRabbitProducer delayRabbitProducer;
|
||||
@Autowired
|
||||
private NormalRocketProducer normalRocketProducer;
|
||||
@Autowired
|
||||
private TransactionRocketProducer transactionRocketProducer;
|
||||
@Autowired
|
||||
private KafkaNormalProducer normalKafkaProducer;
|
||||
|
||||
/**
|
||||
* rabbitmq 普通消息
|
||||
*/
|
||||
@GetMapping("/rabbit/send")
|
||||
public void rabbitSend() {
|
||||
normalRabbitProducer.send("hello normal RabbitMsg");
|
||||
}
|
||||
|
||||
/**
|
||||
* rabbitmq 延迟队列消息
|
||||
*/
|
||||
@GetMapping("/rabbit/sendDelay")
|
||||
public void rabbitSendDelay(long delay) {
|
||||
delayRabbitProducer.sendDelayMessage("Hello ttl RabbitMsg", delay);
|
||||
}
|
||||
|
||||
/**
|
||||
* rocketmq 发送消息
|
||||
* 需要手动创建相关的Topic和group
|
||||
*/
|
||||
@GetMapping("/rocket/send")
|
||||
public void rocketSend(){
|
||||
normalRocketProducer.sendMessage();
|
||||
}
|
||||
|
||||
/**
|
||||
* rocketmq 事务消息
|
||||
*/
|
||||
@GetMapping("/rocket/transaction")
|
||||
public void rocketTransaction(){
|
||||
transactionRocketProducer.sendTransactionMessage();
|
||||
}
|
||||
|
||||
/**
|
||||
* kafka 发送消息
|
||||
*/
|
||||
@GetMapping("/kafka/send")
|
||||
public void kafkaSend(){
|
||||
normalKafkaProducer.sendKafkaMsg();
|
||||
}
|
||||
}
|
@@ -0,0 +1,44 @@
|
||||
package org.dromara.stream.listener;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
* @date 2024/06/01 17:05
|
||||
**/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQTransactionListener
|
||||
public class TranscationRocketListener implements RocketMQLocalTransactionListener {
|
||||
|
||||
@Override
|
||||
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
|
||||
log.info("执行本地事务");
|
||||
String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));
|
||||
if ("TAG-1".equals(tag)) {
|
||||
//这里只讲TAGA消息提交,状态为可执行
|
||||
log.info("【监听器】这里是校验TAG-1;提交状态:COMMIT");
|
||||
return RocketMQLocalTransactionState.COMMIT;
|
||||
} else if ("TAG-2".equals(tag)) {
|
||||
log.info("【监听器】这里是校验TAG-2;提交状态:ROLLBACK");
|
||||
return RocketMQLocalTransactionState.ROLLBACK;
|
||||
} else if ("TAG-3".equals(tag)) {
|
||||
log.info("【监听器】这里是校验TAG-3;提交状态:UNKNOWN");
|
||||
return RocketMQLocalTransactionState.UNKNOWN;
|
||||
}
|
||||
log.info("=========【监听器】提交状态:UNKNOWN");
|
||||
return RocketMQLocalTransactionState.UNKNOWN;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
|
||||
log.info("【监听器】检查本地交易===>{}", message);
|
||||
return RocketMQLocalTransactionState.COMMIT;
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,30 @@
|
||||
package org.dromara.stream.producer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.stream.config.RabbitTtlQueueConfig;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
* @date 2024/05/25 17:15
|
||||
**/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class DelayRabbitProducer {
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
public void sendDelayMessage(String message, long delay) {
|
||||
rabbitTemplate.convertAndSend(
|
||||
RabbitTtlQueueConfig.DELAY_EXCHANGE_NAME,
|
||||
RabbitTtlQueueConfig.DELAY_ROUTING_KEY, message, message1 -> {
|
||||
message1.getMessageProperties().setDelayLong(delay);
|
||||
return message1;
|
||||
});
|
||||
log.info("【生产者】Delayed message send: " + message);
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,22 @@
|
||||
package org.dromara.stream.producer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
* @date 2024/05/19 18:02
|
||||
**/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class KafkaNormalProducer {
|
||||
|
||||
@Autowired
|
||||
private KafkaTemplate<String, String> kafkaTemplate;
|
||||
|
||||
public void sendKafkaMsg() {
|
||||
kafkaTemplate.send("test-topic", "hello", "kafkaTest");
|
||||
}
|
||||
}
|
@@ -0,0 +1,23 @@
|
||||
package org.dromara.stream.producer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.stream.config.RabbitConfig;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class NormalRabbitProducer {
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
public void send(String message) {
|
||||
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message);
|
||||
log.info("【生产者】Message send: " + message);
|
||||
}
|
||||
}
|
@@ -0,0 +1,39 @@
|
||||
package org.dromara.stream.producer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
* @date 2024/06/01 16:49
|
||||
**/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class NormalRocketProducer {
|
||||
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
public void sendMessage() {
|
||||
// 发送普通消息
|
||||
// rocketMQTemplate.convertAndSend("test-topic", "test");
|
||||
|
||||
// 发送带tag的消息
|
||||
Message<String> message = MessageBuilder.withPayload("test").build();
|
||||
rocketMQTemplate.send("test-topic:test-tag", message);
|
||||
|
||||
// 延迟消息
|
||||
// RocketMQ预定义了一些延迟等级,每个等级对应不同的延迟时间范围。这些等级从1到18,分别对应1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h的延迟时间。
|
||||
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message();
|
||||
msg.setDelayTimeLevel(3);
|
||||
try {
|
||||
rocketMQTemplate.getProducer().send(msg);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
@@ -0,0 +1,41 @@
|
||||
package org.dromara.stream.producer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.producer.LocalTransactionState;
|
||||
import org.apache.rocketmq.client.producer.SendStatus;
|
||||
import org.apache.rocketmq.client.producer.TransactionSendResult;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author xbhog
|
||||
* @date 2024/06/01 16:54
|
||||
**/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class TransactionRocketProducer {
|
||||
|
||||
@Autowired
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
public void sendTransactionMessage() {
|
||||
List<String> tags = Arrays.asList("TAG-1", "TAG-2", "TAG-3");
|
||||
for (int i = 0; i < 3; i++) {
|
||||
Message<String> message = MessageBuilder.withPayload("===>事务消息-" + i).build();
|
||||
// destination formats: `topicName:tags` message – message Message arg – ext arg
|
||||
TransactionSendResult res = rocketMQTemplate.sendMessageInTransaction("transaction-topic:" + tags.get(i), message, i + 1);
|
||||
if (res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && res.getSendStatus().equals(SendStatus.SEND_OK)) {
|
||||
log.info("【生产者】事物消息发送成功;成功结果:{}", res);
|
||||
} else {
|
||||
log.info("【生产者】事务发送失败:失败原因:{}", res);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,65 @@
|
||||
server:
|
||||
port: 9402
|
||||
|
||||
# Spring
|
||||
spring:
|
||||
application:
|
||||
# 应用名称
|
||||
name: ruoyi-test-mq
|
||||
profiles:
|
||||
# 环境配置
|
||||
active: @profiles.active@
|
||||
|
||||
--- # rabbitmq 配置
|
||||
spring:
|
||||
rabbitmq:
|
||||
host: localhost
|
||||
port: 5672
|
||||
username: guest
|
||||
password: guest
|
||||
publisher-returns: true
|
||||
publisher-confirm-type: correlated
|
||||
|
||||
--- # kafka 配置
|
||||
spring:
|
||||
kafka:
|
||||
bootstrap-servers: localhost:9092
|
||||
consumer:
|
||||
group-id: test-group-id # 消费者组ID
|
||||
auto-offset-reset: earliest # 当没有偏移量或偏移量无效时,从何处开始消费
|
||||
producer:
|
||||
key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
value-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
# 以下配置均应该在 kafka 配置文件内编写 写到此处是为了方便调试
|
||||
properties:
|
||||
# 开启自动创建话题功能,默认是false,根据需要设置
|
||||
auto.create.topics.enable: true
|
||||
# 默认副本数 1 避免单机使用无法创建 topic
|
||||
default.replication.factor: 1
|
||||
|
||||
--- # rocketmq 配置
|
||||
rocketmq:
|
||||
name-server: localhost:9876
|
||||
producer:
|
||||
# 生产者组
|
||||
group: dist-test
|
||||
|
||||
--- # nacos 配置
|
||||
spring:
|
||||
cloud:
|
||||
nacos:
|
||||
# nacos 服务地址
|
||||
server-addr: @nacos.server@
|
||||
username: @nacos.username@
|
||||
password: @nacos.password@
|
||||
discovery:
|
||||
# 注册组
|
||||
group: @nacos.discovery.group@
|
||||
namespace: ${spring.profiles.active}
|
||||
config:
|
||||
# 配置组
|
||||
group: @nacos.config.group@
|
||||
namespace: ${spring.profiles.active}
|
||||
config:
|
||||
import:
|
||||
- optional:nacos:application-common.yml
|
@@ -0,0 +1,27 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration scan="true" scanPeriod="60 seconds" debug="false">
|
||||
<!-- 日志存放路径 -->
|
||||
<property name="log.path" value="logs/${project.artifactId}" />
|
||||
<!-- 日志输出格式 -->
|
||||
<property name="console.log.pattern"
|
||||
value="%cyan(%d{yyyy-MM-dd HH:mm:ss}) %green([%thread]) %highlight(%-5level) %boldMagenta(%logger{36}%n) - %msg%n"/>
|
||||
|
||||
<!-- 控制台输出 -->
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>${console.log.pattern}</pattern>
|
||||
<charset>utf-8</charset>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<include resource="logback-common.xml" />
|
||||
|
||||
<!-- 开启 skywalking 日志收集 -->
|
||||
<include resource="logback-skylog.xml" />
|
||||
|
||||
|
||||
<!--系统操作日志-->
|
||||
<root level="info">
|
||||
<appender-ref ref="console" />
|
||||
</root>
|
||||
</configuration>
|
Reference in New Issue
Block a user