1. 统一包名修改

This commit is contained in:
chenbowen
2025-09-22 11:55:27 +08:00
parent a001fc8f16
commit 0d46897482
2739 changed files with 512 additions and 512 deletions

View File

@@ -0,0 +1,4 @@
/**
* 消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种
*/
package com.zt.plat.framework.mq;

View File

@@ -0,0 +1,28 @@
package com.zt.plat.framework.mq.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
/**
* RabbitMQ 消息队列配置类
*
* @author ZT
*/
@AutoConfiguration
@Slf4j
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class CloudRabbitMQAutoConfiguration {
/**
* Jackson2JsonMessageConverter Bean使用 jackson 序列化消息
*/
@Bean
public MessageConverter createMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}

View File

@@ -0,0 +1,4 @@
/**
* 占位符,无特殊逻辑
*/
package com.zt.plat.framework.mq.rabbitmq.core;

View File

@@ -0,0 +1,4 @@
/**
* 消息队列,基于 RabbitMQ 提供
*/
package com.zt.plat.framework.mq.rabbitmq;

View File

@@ -0,0 +1,31 @@
package com.zt.plat.framework.mq.redis.config;
import com.zt.plat.framework.mq.redis.core.RedisMQTemplate;
import com.zt.plat.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import com.zt.plat.framework.redis.config.CloudRedisAutoConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.List;
/**
* Redis 消息队列 Producer 配置类
*
* @author ZT
*/
@Slf4j
@AutoConfiguration(after = CloudRedisAutoConfiguration.class)
public class CloudRedisMQProducerAutoConfiguration {
@Bean
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
List<RedisMessageInterceptor> interceptors) {
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
// 添加拦截器
interceptors.forEach(redisMQTemplate::addInterceptor);
return redisMQTemplate;
}
}

View File

@@ -0,0 +1,87 @@
package com.zt.plat.framework.mq.redis.core;
import com.zt.plat.framework.common.util.json.JsonUtils;
import com.zt.plat.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import com.zt.plat.framework.mq.redis.core.message.AbstractRedisMessage;
import com.zt.plat.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import com.zt.plat.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.ArrayList;
import java.util.List;
/**
* Redis MQ 操作模板类
*
* @author ZT
*/
@AllArgsConstructor
public class RedisMQTemplate {
@Getter
private final RedisTemplate<String, ?> redisTemplate;
/**
* 拦截器数组
*/
@Getter
private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
/**
* 发送 Redis 消息,基于 Redis pub/sub 实现
*
* @param message 消息
*/
public <T extends AbstractRedisChannelMessage> void send(T message) {
try {
sendMessageBefore(message);
// 发送消息
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
} finally {
sendMessageAfter(message);
}
}
/**
* 发送 Redis 消息,基于 Redis Stream 实现
*
* @param message 消息
* @return 消息记录的编号对象
*/
public <T extends AbstractRedisStreamMessage> RecordId send(T message) {
try {
sendMessageBefore(message);
// 发送消息
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
.withStreamKey(message.getStreamKey())); // 设置 stream key
} finally {
sendMessageAfter(message);
}
}
/**
* 添加拦截器
*
* @param interceptor 拦截器
*/
public void addInterceptor(RedisMessageInterceptor interceptor) {
interceptors.add(interceptor);
}
private void sendMessageBefore(AbstractRedisMessage message) {
// 正序
interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
}
private void sendMessageAfter(AbstractRedisMessage message) {
// 倒序
for (int i = interceptors.size() - 1; i >= 0; i--) {
interceptors.get(i).sendMessageAfter(message);
}
}
}

View File

@@ -0,0 +1,26 @@
package com.zt.plat.framework.mq.redis.core.interceptor;
import com.zt.plat.framework.mq.redis.core.message.AbstractRedisMessage;
/**
* {@link AbstractRedisMessage} 消息拦截器
* 通过拦截器,作为插件机制,实现拓展。
* 例如说,多租户场景下的 MQ 消息处理
*
* @author ZT
*/
public interface RedisMessageInterceptor {
default void sendMessageBefore(AbstractRedisMessage message) {
}
default void sendMessageAfter(AbstractRedisMessage message) {
}
default void consumeMessageBefore(AbstractRedisMessage message) {
}
default void consumeMessageAfter(AbstractRedisMessage message) {
}
}

View File

@@ -0,0 +1,72 @@
package com.zt.plat.framework.mq.redis.core.job;
import com.zt.plat.framework.mq.redis.core.RedisMQTemplate;
import com.zt.plat.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.List;
/**
* Redis Stream 消息清理任务
* 用于定期清理已消费的消息,防止内存占用过大
*
* @see <a href="https://www.cnblogs.com/nanxiang/p/16179519.html">记一次 redis stream 数据类型内存不释放问题</a>
*
* @author ZT
*/
@Slf4j
@AllArgsConstructor
public class RedisStreamMessageCleanupJob {
private static final String LOCK_KEY = "redis:stream:message-cleanup:lock";
/**
* 保留的消息数量,默认保留最近 10000 条消息
*/
private static final long MAX_COUNT = 10000;
private final List<AbstractRedisStreamMessageListener<?>> listeners;
private final RedisMQTemplate redisTemplate;
private final RedissonClient redissonClient;
/**
* 每小时执行一次清理任务
*/
@Scheduled(cron = "0 0 * * * ?")
public void cleanup() {
RLock lock = redissonClient.getLock(LOCK_KEY);
// 尝试加锁
if (lock.tryLock()) {
try {
execute();
} catch (Exception ex) {
log.error("[cleanup][执行异常]", ex);
} finally {
lock.unlock();
}
}
}
/**
* 执行清理逻辑
*/
private void execute() {
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
listeners.forEach(listener -> {
try {
// 使用 XTRIM 命令清理消息,只保留最近的 MAX_LEN 条消息
Long trimCount = ops.trim(listener.getStreamKey(), MAX_COUNT, true);
if (trimCount != null && trimCount > 0) {
log.info("[execute][Stream({}) 清理消息数量({})]", listener.getStreamKey(), trimCount);
}
} catch (Exception ex) {
log.error("[execute][Stream({}) 清理异常]", listener.getStreamKey(), ex);
}
});
}
}

Some files were not shown because too many files have changed in this diff Show More