初始化 V1
This commit is contained in:
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* 消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种
|
||||
*/
|
||||
package cn.iocoder.yudao.framework.mq;
|
||||
@@ -0,0 +1,28 @@
|
||||
package cn.iocoder.yudao.framework.mq.rabbitmq.config;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
||||
/**
|
||||
* RabbitMQ 消息队列配置类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@AutoConfiguration
|
||||
@Slf4j
|
||||
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
|
||||
public class YudaoRabbitMQAutoConfiguration {
|
||||
|
||||
/**
|
||||
* Jackson2JsonMessageConverter Bean:使用 jackson 序列化消息
|
||||
*/
|
||||
@Bean
|
||||
public MessageConverter createMessageConverter() {
|
||||
return new Jackson2JsonMessageConverter();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* 占位符,无特殊逻辑
|
||||
*/
|
||||
package cn.iocoder.yudao.framework.mq.rabbitmq.core;
|
||||
@@ -0,0 +1,4 @@
|
||||
/**
|
||||
* 消息队列,基于 RabbitMQ 提供
|
||||
*/
|
||||
package cn.iocoder.yudao.framework.mq.rabbitmq;
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,31 @@
|
||||
package cn.iocoder.yudao.framework.mq.redis.config;
|
||||
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis 消息队列 Producer 配置类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
|
||||
public class YudaoRedisMQProducerAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
|
||||
List<RedisMessageInterceptor> interceptors) {
|
||||
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
|
||||
// 添加拦截器
|
||||
interceptors.forEach(redisMQTemplate::addInterceptor);
|
||||
return redisMQTemplate;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
package cn.iocoder.yudao.framework.mq.redis.core;
|
||||
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis MQ 操作模板类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class RedisMQTemplate {
|
||||
|
||||
@Getter
|
||||
private final RedisTemplate<String, ?> redisTemplate;
|
||||
/**
|
||||
* 拦截器数组
|
||||
*/
|
||||
@Getter
|
||||
private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis pub/sub 实现
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public <T extends AbstractRedisChannelMessage> void send(T message) {
|
||||
try {
|
||||
sendMessageBefore(message);
|
||||
// 发送消息
|
||||
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
|
||||
} finally {
|
||||
sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis Stream 实现
|
||||
*
|
||||
* @param message 消息
|
||||
* @return 消息记录的编号对象
|
||||
*/
|
||||
public <T extends AbstractRedisStreamMessage> RecordId send(T message) {
|
||||
try {
|
||||
sendMessageBefore(message);
|
||||
// 发送消息
|
||||
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
|
||||
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
|
||||
.withStreamKey(message.getStreamKey())); // 设置 stream key
|
||||
} finally {
|
||||
sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加拦截器
|
||||
*
|
||||
* @param interceptor 拦截器
|
||||
*/
|
||||
public void addInterceptor(RedisMessageInterceptor interceptor) {
|
||||
interceptors.add(interceptor);
|
||||
}
|
||||
|
||||
private void sendMessageBefore(AbstractRedisMessage message) {
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
|
||||
}
|
||||
|
||||
private void sendMessageAfter(AbstractRedisMessage message) {
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package cn.iocoder.yudao.framework.mq.redis.core.interceptor;
|
||||
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
|
||||
/**
|
||||
* {@link AbstractRedisMessage} 消息拦截器
|
||||
* 通过拦截器,作为插件机制,实现拓展。
|
||||
* 例如说,多租户场景下的 MQ 消息处理
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public interface RedisMessageInterceptor {
|
||||
|
||||
default void sendMessageBefore(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void sendMessageAfter(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,72 @@
|
||||
package cn.iocoder.yudao.framework.mq.redis.core.job;
|
||||
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.data.redis.core.StreamOperations;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis Stream 消息清理任务
|
||||
* 用于定期清理已消费的消息,防止内存占用过大
|
||||
*
|
||||
* @see <a href="https://www.cnblogs.com/nanxiang/p/16179519.html">记一次 redis stream 数据类型内存不释放问题</a>
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
@AllArgsConstructor
|
||||
public class RedisStreamMessageCleanupJob {
|
||||
|
||||
private static final String LOCK_KEY = "redis:stream:message-cleanup:lock";
|
||||
|
||||
/**
|
||||
* 保留的消息数量,默认保留最近 10000 条消息
|
||||
*/
|
||||
private static final long MAX_COUNT = 10000;
|
||||
|
||||
private final List<AbstractRedisStreamMessageListener<?>> listeners;
|
||||
private final RedisMQTemplate redisTemplate;
|
||||
private final RedissonClient redissonClient;
|
||||
|
||||
/**
|
||||
* 每小时执行一次清理任务
|
||||
*/
|
||||
@Scheduled(cron = "0 0 * * * ?")
|
||||
public void cleanup() {
|
||||
RLock lock = redissonClient.getLock(LOCK_KEY);
|
||||
// 尝试加锁
|
||||
if (lock.tryLock()) {
|
||||
try {
|
||||
execute();
|
||||
} catch (Exception ex) {
|
||||
log.error("[cleanup][执行异常]", ex);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行清理逻辑
|
||||
*/
|
||||
private void execute() {
|
||||
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
|
||||
listeners.forEach(listener -> {
|
||||
try {
|
||||
// 使用 XTRIM 命令清理消息,只保留最近的 MAX_LEN 条消息
|
||||
Long trimCount = ops.trim(listener.getStreamKey(), MAX_COUNT, true);
|
||||
if (trimCount != null && trimCount > 0) {
|
||||
log.info("[execute][Stream({}) 清理消息数量({})]", listener.getStreamKey(), trimCount);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
log.error("[execute][Stream({}) 清理异常]", listener.getStreamKey(), ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user