feat(sync): 实现MQ消息队列用户同步功能
- 添加RabbitMQ相关配置,支持消息确认与回退机制 - 新增统一消息实体SyncMessage,支持多种同步事件类型 - 实现RabbitMQ消息生产者RabbitMQSyncProducer,包含回调处理和用户同步消息发送 - 实现用户同步消息消费者UserSyncConsumer,支持消息重试和死信队列处理 - UserSyncService新增Map参数方法,适配MQ消息同步调用 - QrLoginServiceImpl改用MQ消息异步同步用户数据,替代原直接调用方式 - 应用配置新增RabbitMQ连接配置及MQ开关开关控制 - 前端接口和组件调整,支持扫码登录绑定手机号及跳转逻辑完善
This commit is contained in:
@@ -17,6 +17,31 @@
|
|||||||
3. **页面跳转逻辑**:登录成功/绑定成功后跳转到/console
|
3. **页面跳转逻辑**:登录成功/绑定成功后跳转到/console
|
||||||
4. **API调用更新**:适应新的响应字段格式
|
4. **API调用更新**:适应新的响应字段格式
|
||||||
|
|
||||||
|
### 前端代码已完成的修改
|
||||||
|
1. **API接口修改**:
|
||||||
|
- 更新QRLoginStatusResult接口,新增nextAction、redirectUrl、successMessage、needBindPhone等字段
|
||||||
|
- 修改checkQRLoginStatus函数,使用真实后端API调用
|
||||||
|
- 新增bindPhoneForQrLogin函数,处理绑定手机号API调用
|
||||||
|
- 修改generateQRLoginToken函数,使用真实后端API调用
|
||||||
|
|
||||||
|
2. **状态管理修改**:
|
||||||
|
- 更新auth store接口,添加bindPhoneForQRLogin方法
|
||||||
|
- 修改checkQRStatus方法,支持新的返回字段处理
|
||||||
|
- 添加绑定手机号处理逻辑
|
||||||
|
|
||||||
|
3. **组件修改**:
|
||||||
|
- 创建BindPhone组件:用于处理需要绑定手机号的场景
|
||||||
|
- 修改QRLogin组件:支持根据nextAction显示不同UI,自动处理绑定手机号流程
|
||||||
|
- 组件现在可以正确处理:
|
||||||
|
- nextAction: 'bind_phone' → 显示绑定手机号表单
|
||||||
|
- nextAction: 'redirect' → 自动跳转到/console
|
||||||
|
- needBindPhone: true → 显示绑定提示
|
||||||
|
|
||||||
|
4. **完整流程支持**:
|
||||||
|
- 用户扫码登录 → 后端返回nextAction: 'bind_phone' → 前端显示绑定手机号页面
|
||||||
|
- 用户输入手机号和验证码 → 调用绑定手机号API → 绑定成功后自动登录并跳转/console
|
||||||
|
- 用户已有手机号 → 后端返回nextAction: 'redirect' → 前端自动跳转到/console
|
||||||
|
|
||||||
### 后端修改完成
|
### 后端修改完成
|
||||||
1. **QrLoginStatusResponse新增字段**:
|
1. **QrLoginStatusResponse新增字段**:
|
||||||
- nextAction: 下一步操作指示
|
- nextAction: 下一步操作指示
|
||||||
@@ -39,3 +64,39 @@
|
|||||||
### 目的
|
### 目的
|
||||||
确保用户数据同步到 websopy 时,手机号字段已有值,避免无效的缓存数据。
|
确保用户数据同步到 websopy 时,手机号字段已有值,避免无效的缓存数据。
|
||||||
|
|
||||||
|
|
||||||
|
## MQ消息队列实现 (02:40)
|
||||||
|
|
||||||
|
### 创建的文件
|
||||||
|
1. **SyncMessage.java** - 统一消息实体
|
||||||
|
- 位置: `com.gxwebsoft.common.mq.message`
|
||||||
|
- 支持 USER_SYNC, TENANT_SYNC 等消息类型
|
||||||
|
- 支持 CREATE, UPDATE, DELETE 事件类型
|
||||||
|
|
||||||
|
2. **SyncMessageProducer.java** - 消息生产者接口
|
||||||
|
- 位置: `com.gxwebsoft.common.mq.producer`
|
||||||
|
- 预留抽象层,便于将来切换到RocketMQ
|
||||||
|
|
||||||
|
3. **RabbitMQConfig.java** - RabbitMQ配置类
|
||||||
|
- 位置: `com.gxwebsoft.common.mq.config`
|
||||||
|
- 定义交换机、队列、死信队列
|
||||||
|
- 配置JSON序列化
|
||||||
|
|
||||||
|
4. **RabbitMQSyncProducer.java** - RabbitMQ生产者实现
|
||||||
|
- 位置: `com.gxwebsoft.common.mq.producer.impl`
|
||||||
|
- 实现确认回调和Return回调
|
||||||
|
|
||||||
|
5. **UserSyncConsumer.java** - 用户同步消费者
|
||||||
|
- 位置: `com.gxwebsoft.common.mq.consumer`
|
||||||
|
- 监听用户同步消息,调用UserSyncService同步到websopy
|
||||||
|
- 支持重试机制和死信队列
|
||||||
|
|
||||||
|
### 修改的文件
|
||||||
|
1. **pom.xml** - 添加 spring-boot-starter-amqp 依赖
|
||||||
|
2. **UserSyncService.java** - 添加Map参数的重载方法
|
||||||
|
3. **QrLoginServiceImpl.java** - 改用MQ发送消息
|
||||||
|
4. **application.yml** - 添加RabbitMQ配置
|
||||||
|
|
||||||
|
### 使用方式
|
||||||
|
配置 `sync.mq.enabled: false` 可临时禁用MQ,回退到原有直接同步方式。
|
||||||
|
|
||||||
|
|||||||
6
pom.xml
6
pom.xml
@@ -316,6 +316,12 @@
|
|||||||
<version>1.0.0.14</version>
|
<version>1.0.0.14</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- spring-boot-starter-amqp for RabbitMQ -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-amqp</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- knife4j - 升级到兼容版本 -->
|
<!-- knife4j - 升级到兼容版本 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.github.xiaoymin</groupId>
|
<groupId>com.github.xiaoymin</groupId>
|
||||||
|
|||||||
@@ -16,11 +16,11 @@ import com.gxwebsoft.common.core.security.JwtSubject;
|
|||||||
import com.gxwebsoft.common.core.security.JwtUtil;
|
import com.gxwebsoft.common.core.security.JwtUtil;
|
||||||
import com.gxwebsoft.common.core.utils.CommonUtil;
|
import com.gxwebsoft.common.core.utils.CommonUtil;
|
||||||
import com.gxwebsoft.common.core.utils.RedisUtil;
|
import com.gxwebsoft.common.core.utils.RedisUtil;
|
||||||
|
import com.gxwebsoft.common.mq.producer.SyncMessageProducer;
|
||||||
import com.gxwebsoft.common.system.entity.User;
|
import com.gxwebsoft.common.system.entity.User;
|
||||||
import com.gxwebsoft.common.system.entity.UserOauth;
|
import com.gxwebsoft.common.system.entity.UserOauth;
|
||||||
import com.gxwebsoft.common.system.service.UserOauthService;
|
import com.gxwebsoft.common.system.service.UserOauthService;
|
||||||
import com.gxwebsoft.common.system.service.UserService;
|
import com.gxwebsoft.common.system.service.UserService;
|
||||||
import com.gxwebsoft.common.system.service.UserSyncService;
|
|
||||||
import com.gxwebsoft.common.system.service.WxService;
|
import com.gxwebsoft.common.system.service.WxService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -61,7 +61,7 @@ public class QrLoginServiceImpl implements QrLoginService {
|
|||||||
private UserOauthService userOauthService;
|
private UserOauthService userOauthService;
|
||||||
|
|
||||||
@Autowired(required = false)
|
@Autowired(required = false)
|
||||||
private UserSyncService userSyncService;
|
private SyncMessageProducer syncMessageProducer;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public QrLoginGenerateResponse generateQrLoginToken(Integer tenantId) {
|
public QrLoginGenerateResponse generateQrLoginToken(Integer tenantId) {
|
||||||
@@ -295,12 +295,12 @@ public class QrLoginServiceImpl implements QrLoginService {
|
|||||||
userService.updateUser(user);
|
userService.updateUser(user);
|
||||||
redisUtil.delete(codeKey);
|
redisUtil.delete(codeKey);
|
||||||
|
|
||||||
// 绑定手机号成功后,同步用户数据到 websopy
|
// 绑定手机号成功后,通过MQ异步同步用户数据到 websopy
|
||||||
if (userSyncService != null) {
|
if (syncMessageProducer != null) {
|
||||||
User updatedUser = userService.getAllByUserId(String.valueOf(user.getUserId()));
|
User updatedUser = userService.getAllByUserId(String.valueOf(user.getUserId()));
|
||||||
if (updatedUser != null) {
|
if (updatedUser != null) {
|
||||||
userSyncService.syncUserToWebsopy(updatedUser);
|
syncMessageProducer.sendUserSyncMessage("websopy", "UPDATE", updatedUser);
|
||||||
log.info("扫码绑定手机号后同步用户到websopy成功: userId={}, phone={}", user.getUserId(), user.getPhone());
|
log.info("扫码绑定手机号后发送MQ消息同步用户到websopy: userId={}, phone={}", user.getUserId(), user.getPhone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
151
src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java
Normal file
151
src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java
Normal file
@@ -0,0 +1,151 @@
|
|||||||
|
package com.gxwebsoft.common.mq.config;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||||
|
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||||
|
import org.springframework.amqp.core.*;
|
||||||
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||||
|
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||||
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||||
|
import org.springframework.amqp.support.converter.MessageConverter;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RabbitMQ 配置类
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
@ConditionalOnProperty(name = "sync.mq.enabled", havingValue = "true", matchIfMissing = true)
|
||||||
|
public class RabbitMQConfig {
|
||||||
|
|
||||||
|
// ==================== 常量定义 ====================
|
||||||
|
public static final String SYNC_EXCHANGE = "sync.exchange";
|
||||||
|
public static final String SYNC_QUEUE = "sync.queue";
|
||||||
|
public static final String SYNC_ROUTING_KEY = "sync.message";
|
||||||
|
|
||||||
|
// 死信队列
|
||||||
|
public static final String DLX_EXCHANGE = "sync.dlx.exchange";
|
||||||
|
public static final String DLQ_QUEUE = "sync.dlq";
|
||||||
|
public static final String DLQ_ROUTING_KEY = "sync.dlq";
|
||||||
|
|
||||||
|
@Value("${spring.rabbitmq.host:localhost}")
|
||||||
|
private String host;
|
||||||
|
|
||||||
|
@Value("${spring.rabbitmq.port:5672}")
|
||||||
|
private int port;
|
||||||
|
|
||||||
|
@Value("${spring.rabbitmq.username:guest}")
|
||||||
|
private String username;
|
||||||
|
|
||||||
|
@Value("${spring.rabbitmq.password:guest}")
|
||||||
|
private String password;
|
||||||
|
|
||||||
|
@Value("${spring.rabbitmq.virtual-host:/}")
|
||||||
|
private String virtualHost;
|
||||||
|
|
||||||
|
// ==================== Connection Factory ====================
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConnectionFactory connectionFactory() {
|
||||||
|
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
|
||||||
|
connectionFactory.setHost(host);
|
||||||
|
connectionFactory.setPort(port);
|
||||||
|
connectionFactory.setUsername(username);
|
||||||
|
connectionFactory.setPassword(password);
|
||||||
|
connectionFactory.setVirtualHost(virtualHost);
|
||||||
|
// 开启publisher-confirm确认模式
|
||||||
|
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
|
||||||
|
// 开启publisher-return确认模式
|
||||||
|
connectionFactory.setPublisherReturns(true);
|
||||||
|
return connectionFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== Message Converter ====================
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ObjectMapper objectMapper() {
|
||||||
|
ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
objectMapper.registerModule(new JavaTimeModule());
|
||||||
|
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
|
||||||
|
return objectMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public MessageConverter messageConverter(ObjectMapper objectMapper) {
|
||||||
|
return new Jackson2JsonMessageConverter(objectMapper);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== RabbitTemplate ====================
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
|
||||||
|
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
|
||||||
|
rabbitTemplate.setMessageConverter(messageConverter);
|
||||||
|
// 设置Mandatory为true,才能触发ReturnCallback
|
||||||
|
rabbitTemplate.setMandatory(true);
|
||||||
|
return rabbitTemplate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
|
||||||
|
ConnectionFactory connectionFactory, MessageConverter messageConverter) {
|
||||||
|
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||||
|
factory.setConnectionFactory(connectionFactory);
|
||||||
|
factory.setMessageConverter(messageConverter);
|
||||||
|
// 设置并发数
|
||||||
|
factory.setConcurrent(1);
|
||||||
|
factory.setMaxConcurrent(5);
|
||||||
|
// 设置手动ack
|
||||||
|
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
|
||||||
|
// 预取数量
|
||||||
|
factory.setPrefetchCount(10);
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== 交换机 ====================
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public DirectExchange syncExchange() {
|
||||||
|
return new DirectExchange(SYNC_EXCHANGE, true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public DirectExchange deadLetterExchange() {
|
||||||
|
return new DirectExchange(DLX_EXCHANGE, true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== 队列 ====================
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Queue syncQueue() {
|
||||||
|
return QueueBuilder.durable(SYNC_QUEUE)
|
||||||
|
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
|
||||||
|
.withArgument("x-dead-letter-routing-key", DLQ_ROUTING_KEY)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Queue deadLetterQueue() {
|
||||||
|
return QueueBuilder.durable(DLQ_QUEUE).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== 绑定 ====================
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Binding syncBinding() {
|
||||||
|
return BindingBuilder.bind(syncQueue())
|
||||||
|
.to(syncExchange())
|
||||||
|
.with(SYNC_ROUTING_KEY);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Binding dlqBinding() {
|
||||||
|
return BindingBuilder.bind(deadLetterQueue())
|
||||||
|
.to(deadLetterExchange())
|
||||||
|
.with(DLQ_ROUTING_KEY);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,159 @@
|
|||||||
|
package com.gxwebsoft.common.mq.consumer;
|
||||||
|
|
||||||
|
import com.gxwebsoft.common.mq.config.RabbitMQConfig;
|
||||||
|
import com.gxwebsoft.common.mq.message.SyncMessage;
|
||||||
|
import com.gxwebsoft.common.system.service.UserSyncService;
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 用户同步消息消费者
|
||||||
|
*
|
||||||
|
* 负责监听用户同步消息,并将数据同步到目标系统(如websopy)
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@ConditionalOnProperty(name = "sync.mq.enabled", havingValue = "true", matchIfMissing = true)
|
||||||
|
public class UserSyncConsumer {
|
||||||
|
|
||||||
|
@Autowired(required = false)
|
||||||
|
private UserSyncService userSyncService;
|
||||||
|
|
||||||
|
private static final int MAX_RETRY_COUNT = 3;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 监听用户同步消息
|
||||||
|
*/
|
||||||
|
@RabbitListener(queues = RabbitMQConfig.SYNC_QUEUE)
|
||||||
|
public void handleMessage(SyncMessage message, Channel channel, Message amqpMessage) {
|
||||||
|
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
|
||||||
|
|
||||||
|
try {
|
||||||
|
log.info("收到MQ消息: messageId={}, type={}, event={}, target={}",
|
||||||
|
message.getMessageId(), message.getMessageType(),
|
||||||
|
message.getEventType(), message.getTargetSystem());
|
||||||
|
|
||||||
|
// 检查是否启用了同步服务
|
||||||
|
if (userSyncService == null) {
|
||||||
|
log.warn("UserSyncService 未启用,跳过消息处理: messageId={}", message.getMessageId());
|
||||||
|
channel.basicAck(deliveryTag, false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理消息
|
||||||
|
processMessage(message);
|
||||||
|
|
||||||
|
// 确认消息
|
||||||
|
channel.basicAck(deliveryTag, false);
|
||||||
|
log.info("消息处理成功: messageId={}", message.getMessageId());
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("消息处理失败: messageId={}, error={}", message.getMessageId(), e.getMessage(), e);
|
||||||
|
handleFailure(channel, amqpMessage, deliveryTag, message, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理消息
|
||||||
|
*/
|
||||||
|
private void processMessage(SyncMessage message) {
|
||||||
|
String messageType = message.getMessageType();
|
||||||
|
String eventType = message.getEventType();
|
||||||
|
String targetSystem = message.getTargetSystem();
|
||||||
|
Map<String, Object> data = message.getData();
|
||||||
|
|
||||||
|
// 判断消息类型并处理
|
||||||
|
if ("USER_SYNC".equals(messageType)) {
|
||||||
|
handleUserSync(targetSystem, eventType, data);
|
||||||
|
} else {
|
||||||
|
log.warn("未知的消息类型: messageType={}", messageType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理用户同步
|
||||||
|
*/
|
||||||
|
private void handleUserSync(String targetSystem, String eventType, Map<String, Object> data) {
|
||||||
|
switch (eventType) {
|
||||||
|
case "CREATE":
|
||||||
|
case "UPDATE":
|
||||||
|
userSyncService.syncUserToWebsopy(data);
|
||||||
|
log.info("用户同步到{}成功: event={}", targetSystem, eventType);
|
||||||
|
break;
|
||||||
|
case "DELETE":
|
||||||
|
userSyncService.deleteUserFromWebsopy(data);
|
||||||
|
log.info("用户从{}删除成功: event={}", targetSystem, eventType);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
log.warn("未知的用户事件类型: eventType={}", eventType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理失败消息
|
||||||
|
*/
|
||||||
|
private void handleFailure(Channel channel, Message amqpMessage, long deliveryTag,
|
||||||
|
SyncMessage message, Exception e) {
|
||||||
|
Integer retryCount = message.getRetryCount();
|
||||||
|
if (retryCount == null) {
|
||||||
|
retryCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (retryCount < MAX_RETRY_COUNT) {
|
||||||
|
// 重试:拒绝消息并重新入队
|
||||||
|
try {
|
||||||
|
log.warn("消息处理失败,准备重试: messageId={}, retryCount={}/{}",
|
||||||
|
message.getMessageId(), retryCount + 1, MAX_RETRY_COUNT);
|
||||||
|
// 增加重试次数
|
||||||
|
message.setRetryCount(retryCount + 1);
|
||||||
|
// 拒绝消息(requeue=true 会重新入队)
|
||||||
|
channel.basicNack(deliveryTag, false, true);
|
||||||
|
} catch (IOException ioException) {
|
||||||
|
log.error("消息拒绝失败: messageId={}", message.getMessageId(), ioException);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 超过重试次数,发送到死信队列
|
||||||
|
try {
|
||||||
|
log.error("消息处理失败次数超限,发送到死信队列: messageId={}, retryCount={}",
|
||||||
|
message.getMessageId(), retryCount);
|
||||||
|
// 拒绝消息(requeue=false 进入死信队列)
|
||||||
|
channel.basicNack(deliveryTag, false, false);
|
||||||
|
} catch (IOException ioException) {
|
||||||
|
log.error("消息拒绝失败: messageId={}", message.getMessageId(), ioException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 监听死信队列消息(可选,用于告警或人工处理)
|
||||||
|
*/
|
||||||
|
@RabbitListener(queues = RabbitMQConfig.DLQ_QUEUE)
|
||||||
|
public void handleDeadLetter(SyncMessage message, Channel channel, Message amqpMessage) {
|
||||||
|
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
|
||||||
|
try {
|
||||||
|
log.error("死信消息: messageId={}, type={}, event={}, target={}, retryCount={}",
|
||||||
|
message.getMessageId(), message.getMessageType(),
|
||||||
|
message.getEventType(), message.getTargetSystem(),
|
||||||
|
message.getRetryCount());
|
||||||
|
|
||||||
|
// TODO: 可以在这里添加告警逻辑,如发送邮件、钉钉通知等
|
||||||
|
|
||||||
|
channel.basicAck(deliveryTag, false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("处理死信消息失败: messageId={}", message.getMessageId(), e);
|
||||||
|
try {
|
||||||
|
channel.basicAck(deliveryTag, false);
|
||||||
|
} catch (IOException ioException) {
|
||||||
|
log.error("确认死信消息失败", ioException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,80 @@
|
|||||||
|
package com.gxwebsoft.common.mq.message;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 统一消息实体 - 用于各模块间的数据同步
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class SyncMessage implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息唯一ID
|
||||||
|
*/
|
||||||
|
private String messageId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息类型:USER_SYNC, TENANT_SYNC, etc.
|
||||||
|
*/
|
||||||
|
private String messageType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 事件类型:CREATE, UPDATE, DELETE
|
||||||
|
*/
|
||||||
|
private String eventType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 目标系统标识
|
||||||
|
*/
|
||||||
|
private String targetSystem;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 业务数据(Map格式,便于序列化)
|
||||||
|
*/
|
||||||
|
private Map<String, Object> data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建时间
|
||||||
|
*/
|
||||||
|
private LocalDateTime createTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息重试次数
|
||||||
|
*/
|
||||||
|
private Integer retryCount;
|
||||||
|
|
||||||
|
public SyncMessage() {
|
||||||
|
this.createTime = LocalDateTime.now();
|
||||||
|
this.retryCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SyncMessage(String messageType, String eventType, String targetSystem, Map<String, Object> data) {
|
||||||
|
this();
|
||||||
|
this.messageId = java.util.UUID.randomUUID().toString().replace("-", "");
|
||||||
|
this.messageType = messageType;
|
||||||
|
this.eventType = eventType;
|
||||||
|
this.targetSystem = targetSystem;
|
||||||
|
this.data = data;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建用户同步消息
|
||||||
|
*/
|
||||||
|
public static SyncMessage userCreate(String targetSystem, Map<String, Object> userData) {
|
||||||
|
return new SyncMessage("USER_SYNC", "CREATE", targetSystem, userData);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static SyncMessage userUpdate(String targetSystem, Map<String, Object> userData) {
|
||||||
|
return new SyncMessage("USER_SYNC", "UPDATE", targetSystem, userData);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static SyncMessage userDelete(String targetSystem, Map<String, Object> userData) {
|
||||||
|
return new SyncMessage("USER_SYNC", "DELETE", targetSystem, userData);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,41 @@
|
|||||||
|
package com.gxwebsoft.common.mq.producer;
|
||||||
|
|
||||||
|
import com.gxwebsoft.common.mq.message.SyncMessage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息生产者接口 - 预留抽象层,便于将来切换MQ实现(如从RabbitMQ迁移到RocketMQ)
|
||||||
|
*/
|
||||||
|
public interface SyncMessageProducer {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送同步消息
|
||||||
|
*
|
||||||
|
* @param message 消息体
|
||||||
|
*/
|
||||||
|
void sendSyncMessage(SyncMessage message);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送同步消息(带回调)
|
||||||
|
*
|
||||||
|
* @param message 消息体
|
||||||
|
* @param callback 发送回调
|
||||||
|
*/
|
||||||
|
void sendSyncMessage(SyncMessage message, SendCallback callback);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送用户同步消息
|
||||||
|
*
|
||||||
|
* @param targetSystem 目标系统
|
||||||
|
* @param eventType 事件类型:CREATE, UPDATE, DELETE
|
||||||
|
* @param userData 用户数据
|
||||||
|
*/
|
||||||
|
void sendUserSyncMessage(String targetSystem, String eventType, Object userData);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送回调接口
|
||||||
|
*/
|
||||||
|
interface SendCallback {
|
||||||
|
void onSuccess(String messageId);
|
||||||
|
void onFailure(String messageId, Throwable throwable);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,126 @@
|
|||||||
|
package com.gxwebsoft.common.mq.producer.impl;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.gxwebsoft.common.mq.config.RabbitMQConfig;
|
||||||
|
import com.gxwebsoft.common.mq.message.SyncMessage;
|
||||||
|
import com.gxwebsoft.common.mq.producer.SyncMessageProducer;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.core.MessageProperties;
|
||||||
|
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.amqp.support.converter.MessageConverter;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.Autowired;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RabbitMQ 消息生产者实现
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@ConditionalOnProperty(name = "sync.mq.enabled", havingValue = "true", matchIfMissing = true)
|
||||||
|
public class RabbitMQSyncProducer implements SyncMessageProducer, RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
|
||||||
|
|
||||||
|
private final RabbitTemplate rabbitTemplate;
|
||||||
|
private final MessageConverter messageConverter;
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public RabbitMQSyncProducer(RabbitTemplate rabbitTemplate, MessageConverter messageConverter, ObjectMapper objectMapper) {
|
||||||
|
this.rabbitTemplate = rabbitTemplate;
|
||||||
|
this.messageConverter = messageConverter;
|
||||||
|
this.objectMapper = objectMapper;
|
||||||
|
// 设置确认回调
|
||||||
|
this.rabbitTemplate.setConfirmCallback(this);
|
||||||
|
this.rabbitTemplate.setReturnCallback(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sendSyncMessage(SyncMessage message) {
|
||||||
|
sendSyncMessage(message, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sendSyncMessage(SyncMessage message, SendCallback callback) {
|
||||||
|
try {
|
||||||
|
log.info("发送MQ消息: messageId={}, type={}, event={}, target={}",
|
||||||
|
message.getMessageId(), message.getMessageType(),
|
||||||
|
message.getEventType(), message.getTargetSystem());
|
||||||
|
|
||||||
|
CorrelationData correlationData = new CorrelationData(message.getMessageId());
|
||||||
|
|
||||||
|
if (callback != null) {
|
||||||
|
correlationData.getFuture().addCallback(
|
||||||
|
result -> {
|
||||||
|
if (result.isAck()) {
|
||||||
|
callback.onSuccess(message.getMessageId());
|
||||||
|
} else {
|
||||||
|
callback.onFailure(message.getMessageId(),
|
||||||
|
new RuntimeException("消息发送未被确认"));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
ex -> callback.onFailure(message.getMessageId(), ex)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
rabbitTemplate.convertAndSend(
|
||||||
|
RabbitMQConfig.SYNC_EXCHANGE,
|
||||||
|
RabbitMQConfig.SYNC_ROUTING_KEY,
|
||||||
|
message,
|
||||||
|
correlationData
|
||||||
|
);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("发送MQ消息失败: messageId={}, error={}", message.getMessageId(), e.getMessage(), e);
|
||||||
|
if (callback != null) {
|
||||||
|
callback.onFailure(message.getMessageId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sendUserSyncMessage(String targetSystem, String eventType, Object userData) {
|
||||||
|
try {
|
||||||
|
Map<String, Object> dataMap;
|
||||||
|
if (userData instanceof Map) {
|
||||||
|
dataMap = (Map<String, Object>) userData;
|
||||||
|
} else {
|
||||||
|
// 转换为Map
|
||||||
|
dataMap = objectMapper.convertValue(userData, Map.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncMessage message = new SyncMessage("USER_SYNC", eventType, targetSystem, dataMap);
|
||||||
|
sendSyncMessage(message);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("发送用户同步消息失败: targetSystem={}, eventType={}, error={}",
|
||||||
|
targetSystem, eventType, e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 确认回调 - Broker确认收到消息
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
|
||||||
|
String messageId = correlationData.getId();
|
||||||
|
if (ack) {
|
||||||
|
log.debug("消息确认成功: messageId={}", messageId);
|
||||||
|
} else {
|
||||||
|
log.warn("消息确认失败: messageId={}, cause={}", messageId, cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return回调 - 消息无法路由时回调
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void returnedMessage(Message message, int replyCode, String replyText,
|
||||||
|
String exchange, String routingKey) {
|
||||||
|
log.error("消息无法路由: exchange={}, routingKey={}, replyCode={}, replyText={}",
|
||||||
|
exchange, routingKey, replyCode, replyText);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -120,6 +120,145 @@ public class UserSyncService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 同步用户到 websopy(Map格式,用于MQ消息处理)
|
||||||
|
*
|
||||||
|
* @param data 用户信息Map
|
||||||
|
*/
|
||||||
|
public void syncUserToWebsopy(Map<String, Object> data) {
|
||||||
|
String websopyBaseUrl = getWebsopyBaseUrl();
|
||||||
|
if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) {
|
||||||
|
log.warn("websopyUrl 未配置,跳过用户同步");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data == null || data.isEmpty()) {
|
||||||
|
log.warn("用户数据为空,跳过同步");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Integer userId = getIntValue(data, "userId");
|
||||||
|
if (userId == null) {
|
||||||
|
log.warn("用户数据中缺少userId,跳过同步");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Integer tenantIdValue = getIntValue(data, "tenantId");
|
||||||
|
if (tenantIdValue == null) {
|
||||||
|
tenantIdValue = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构建JSON字符串
|
||||||
|
StringBuilder jsonBuilder = new StringBuilder();
|
||||||
|
jsonBuilder.append("{");
|
||||||
|
jsonBuilder.append("\"userId\":").append(userId).append(",");
|
||||||
|
jsonBuilder.append("\"username\":\"").append(escapeJson(getStringValue(data, "username"))).append("\",");
|
||||||
|
jsonBuilder.append("\"nickname\":\"").append(escapeJson(getStringValue(data, "nickname"))).append("\",");
|
||||||
|
jsonBuilder.append("\"avatar\":\"").append(escapeJson(getStringValue(data, "avatar"))).append("\",");
|
||||||
|
jsonBuilder.append("\"phone\":\"").append(escapeJson(getStringValue(data, "phone"))).append("\",");
|
||||||
|
jsonBuilder.append("\"status\":").append(getIntValue(data, "status", 1)).append(",");
|
||||||
|
jsonBuilder.append("\"updateTime\":\"").append(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).append("\",");
|
||||||
|
jsonBuilder.append("\"tenantId\":").append(tenantIdValue).append(",");
|
||||||
|
jsonBuilder.append("\"tenant_id\":").append(tenantIdValue);
|
||||||
|
jsonBuilder.append("}");
|
||||||
|
|
||||||
|
String url = websopyBaseUrl + "/api/app/user-sync/single";
|
||||||
|
String body = jsonBuilder.toString();
|
||||||
|
|
||||||
|
log.info("MQ同步用户到 websopy: userId={}, body={}", userId, body);
|
||||||
|
|
||||||
|
Map<String, String> headers = new HashMap<>();
|
||||||
|
headers.put("Content-Type", "application/json");
|
||||||
|
|
||||||
|
HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, body);
|
||||||
|
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
JSONObject result = JSON.parseObject(responseBody);
|
||||||
|
if (result != null && result.getIntValue("code") == 0) {
|
||||||
|
log.info("MQ用户同步成功: userId={}", userId);
|
||||||
|
} else {
|
||||||
|
String message = result != null ? result.getString("message") : "未知错误";
|
||||||
|
log.error("MQ用户同步失败: userId={}, message={}", userId, message);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("MQ用户同步异常: error={}", e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从 websopy 删除用户
|
||||||
|
*
|
||||||
|
* @param data 包含userId的数据
|
||||||
|
*/
|
||||||
|
public void deleteUserFromWebsopy(Map<String, Object> data) {
|
||||||
|
String websopyBaseUrl = getWebsopyBaseUrl();
|
||||||
|
if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) {
|
||||||
|
log.warn("websopyUrl 未配置,跳过删除用户");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data == null) {
|
||||||
|
log.warn("删除用户数据为空");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Integer userId = getIntValue(data, "userId");
|
||||||
|
if (userId == null) {
|
||||||
|
log.warn("删除用户数据中缺少userId");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String url = websopyBaseUrl + "/api/app/user-sync/delete/" + userId;
|
||||||
|
|
||||||
|
log.info("从 websopy 删除用户: userId={}", userId);
|
||||||
|
|
||||||
|
Map<String, String> headers = new HashMap<>();
|
||||||
|
headers.put("Content-Type", "application/json");
|
||||||
|
|
||||||
|
HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, "");
|
||||||
|
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
JSONObject result = JSON.parseObject(responseBody);
|
||||||
|
if (result != null && result.getIntValue("code") == 0) {
|
||||||
|
log.info("MQ删除用户成功: userId={}", userId);
|
||||||
|
} else {
|
||||||
|
String message = result != null ? result.getString("message") : "未知错误";
|
||||||
|
log.error("MQ删除用户失败: userId={}, message={}", userId, message);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("MQ删除用户异常: error={}", e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getStringValue(Map<String, Object> data, String key) {
|
||||||
|
Object value = data.get(key);
|
||||||
|
return value != null ? String.valueOf(value) : "";
|
||||||
|
}
|
||||||
|
|
||||||
|
private Integer getIntValue(Map<String, Object> data, String key) {
|
||||||
|
return getIntValue(data, key, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Integer getIntValue(Map<String, Object> data, String key, Integer defaultValue) {
|
||||||
|
Object value = data.get(key);
|
||||||
|
if (value == null) {
|
||||||
|
return defaultValue;
|
||||||
|
}
|
||||||
|
if (value instanceof Integer) {
|
||||||
|
return (Integer) value;
|
||||||
|
}
|
||||||
|
if (value instanceof Number) {
|
||||||
|
return ((Number) value).intValue();
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return Integer.parseInt(String.valueOf(value));
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
return defaultValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 刷新 websopy 端的用户缓存
|
* 刷新 websopy 端的用户缓存
|
||||||
* 只传 userId,websopy 端会通过 API 回查获取完整信息
|
* 只传 userId,websopy 端会通过 API 回查获取完整信息
|
||||||
|
|||||||
@@ -58,6 +58,18 @@ spring:
|
|||||||
port: 6379
|
port: 6379
|
||||||
password: redis_WSDb88
|
password: redis_WSDb88
|
||||||
|
|
||||||
|
# RabbitMQ 配置
|
||||||
|
rabbitmq:
|
||||||
|
host: 1Panel-rabbitmq-kvHZ
|
||||||
|
port: 5672
|
||||||
|
username: rabbitmq
|
||||||
|
password: rabbitmq
|
||||||
|
virtual-host: /
|
||||||
|
# 开启确认模式
|
||||||
|
publisher-confirm-type: correlated
|
||||||
|
# 开启Return模式
|
||||||
|
publisher-returns: true
|
||||||
|
|
||||||
# 邮件服务器配置
|
# 邮件服务器配置
|
||||||
mail:
|
mail:
|
||||||
host: smtp.qq.com
|
host: smtp.qq.com
|
||||||
@@ -111,6 +123,12 @@ config:
|
|||||||
bucketDomain: https://oss.wsdns.cn
|
bucketDomain: https://oss.wsdns.cn
|
||||||
aliyunDomain: https://oss-gxwebsoft.oss-cn-shenzhen.aliyuncs.com
|
aliyunDomain: https://oss-gxwebsoft.oss-cn-shenzhen.aliyuncs.com
|
||||||
|
|
||||||
|
# MQ同步配置
|
||||||
|
sync:
|
||||||
|
# 是否启用MQ(设为false则使用原有直接同步方式)
|
||||||
|
mq:
|
||||||
|
enabled: true
|
||||||
|
|
||||||
# JWT配置
|
# JWT配置
|
||||||
jwt:
|
jwt:
|
||||||
secret: websoft-jwt-secret-key-2025-dev-environment
|
secret: websoft-jwt-secret-key-2025-dev-environment
|
||||||
|
|||||||
Reference in New Issue
Block a user