diff --git a/.workbuddy/memory/2026-04-07.md b/.workbuddy/memory/2026-04-07.md
index 38e8a5a..8ae4245 100644
--- a/.workbuddy/memory/2026-04-07.md
+++ b/.workbuddy/memory/2026-04-07.md
@@ -17,6 +17,31 @@
3. **页面跳转逻辑**:登录成功/绑定成功后跳转到/console
4. **API调用更新**:适应新的响应字段格式
+### 前端代码已完成的修改
+1. **API接口修改**:
+ - 更新QRLoginStatusResult接口,新增nextAction、redirectUrl、successMessage、needBindPhone等字段
+ - 修改checkQRLoginStatus函数,使用真实后端API调用
+ - 新增bindPhoneForQrLogin函数,处理绑定手机号API调用
+ - 修改generateQRLoginToken函数,使用真实后端API调用
+
+2. **状态管理修改**:
+ - 更新auth store接口,添加bindPhoneForQRLogin方法
+ - 修改checkQRStatus方法,支持新的返回字段处理
+ - 添加绑定手机号处理逻辑
+
+3. **组件修改**:
+ - 创建BindPhone组件:用于处理需要绑定手机号的场景
+ - 修改QRLogin组件:支持根据nextAction显示不同UI,自动处理绑定手机号流程
+ - 组件现在可以正确处理:
+ - nextAction: 'bind_phone' → 显示绑定手机号表单
+ - nextAction: 'redirect' → 自动跳转到/console
+ - needBindPhone: true → 显示绑定提示
+
+4. **完整流程支持**:
+ - 用户扫码登录 → 后端返回nextAction: 'bind_phone' → 前端显示绑定手机号页面
+ - 用户输入手机号和验证码 → 调用绑定手机号API → 绑定成功后自动登录并跳转/console
+ - 用户已有手机号 → 后端返回nextAction: 'redirect' → 前端自动跳转到/console
+
### 后端修改完成
1. **QrLoginStatusResponse新增字段**:
- nextAction: 下一步操作指示
@@ -39,3 +64,39 @@
### 目的
确保用户数据同步到 websopy 时,手机号字段已有值,避免无效的缓存数据。
+
+## MQ消息队列实现 (02:40)
+
+### 创建的文件
+1. **SyncMessage.java** - 统一消息实体
+ - 位置: `com.gxwebsoft.common.mq.message`
+ - 支持 USER_SYNC, TENANT_SYNC 等消息类型
+ - 支持 CREATE, UPDATE, DELETE 事件类型
+
+2. **SyncMessageProducer.java** - 消息生产者接口
+ - 位置: `com.gxwebsoft.common.mq.producer`
+ - 预留抽象层,便于将来切换到RocketMQ
+
+3. **RabbitMQConfig.java** - RabbitMQ配置类
+ - 位置: `com.gxwebsoft.common.mq.config`
+ - 定义交换机、队列、死信队列
+ - 配置JSON序列化
+
+4. **RabbitMQSyncProducer.java** - RabbitMQ生产者实现
+ - 位置: `com.gxwebsoft.common.mq.producer.impl`
+ - 实现确认回调和Return回调
+
+5. **UserSyncConsumer.java** - 用户同步消费者
+ - 位置: `com.gxwebsoft.common.mq.consumer`
+ - 监听用户同步消息,调用UserSyncService同步到websopy
+ - 支持重试机制和死信队列
+
+### 修改的文件
+1. **pom.xml** - 添加 spring-boot-starter-amqp 依赖
+2. **UserSyncService.java** - 添加Map参数的重载方法
+3. **QrLoginServiceImpl.java** - 改用MQ发送消息
+4. **application.yml** - 添加RabbitMQ配置
+
+### 使用方式
+配置 `sync.mq.enabled: false` 可临时禁用MQ,回退到原有直接同步方式。
+
diff --git a/pom.xml b/pom.xml
index ab44633..7c0dcb7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -316,6 +316,12 @@
1.0.0.14
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+
+
com.github.xiaoymin
diff --git a/src/main/java/com/gxwebsoft/auto/service/impl/QrLoginServiceImpl.java b/src/main/java/com/gxwebsoft/auto/service/impl/QrLoginServiceImpl.java
index 15408d8..155689b 100644
--- a/src/main/java/com/gxwebsoft/auto/service/impl/QrLoginServiceImpl.java
+++ b/src/main/java/com/gxwebsoft/auto/service/impl/QrLoginServiceImpl.java
@@ -16,11 +16,11 @@ import com.gxwebsoft.common.core.security.JwtSubject;
import com.gxwebsoft.common.core.security.JwtUtil;
import com.gxwebsoft.common.core.utils.CommonUtil;
import com.gxwebsoft.common.core.utils.RedisUtil;
+import com.gxwebsoft.common.mq.producer.SyncMessageProducer;
import com.gxwebsoft.common.system.entity.User;
import com.gxwebsoft.common.system.entity.UserOauth;
import com.gxwebsoft.common.system.service.UserOauthService;
import com.gxwebsoft.common.system.service.UserService;
-import com.gxwebsoft.common.system.service.UserSyncService;
import com.gxwebsoft.common.system.service.WxService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -61,7 +61,7 @@ public class QrLoginServiceImpl implements QrLoginService {
private UserOauthService userOauthService;
@Autowired(required = false)
- private UserSyncService userSyncService;
+ private SyncMessageProducer syncMessageProducer;
@Override
public QrLoginGenerateResponse generateQrLoginToken(Integer tenantId) {
@@ -295,12 +295,12 @@ public class QrLoginServiceImpl implements QrLoginService {
userService.updateUser(user);
redisUtil.delete(codeKey);
- // 绑定手机号成功后,同步用户数据到 websopy
- if (userSyncService != null) {
+ // 绑定手机号成功后,通过MQ异步同步用户数据到 websopy
+ if (syncMessageProducer != null) {
User updatedUser = userService.getAllByUserId(String.valueOf(user.getUserId()));
if (updatedUser != null) {
- userSyncService.syncUserToWebsopy(updatedUser);
- log.info("扫码绑定手机号后同步用户到websopy成功: userId={}, phone={}", user.getUserId(), user.getPhone());
+ syncMessageProducer.sendUserSyncMessage("websopy", "UPDATE", updatedUser);
+ log.info("扫码绑定手机号后发送MQ消息同步用户到websopy: userId={}, phone={}", user.getUserId(), user.getPhone());
}
}
diff --git a/src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java b/src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java
new file mode 100644
index 0000000..ff2f21d
--- /dev/null
+++ b/src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java
@@ -0,0 +1,151 @@
+package com.gxwebsoft.common.mq.config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.amqp.support.converter.MessageConverter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * RabbitMQ 配置类
+ */
+@Configuration
+@ConditionalOnProperty(name = "sync.mq.enabled", havingValue = "true", matchIfMissing = true)
+public class RabbitMQConfig {
+
+ // ==================== 常量定义 ====================
+ public static final String SYNC_EXCHANGE = "sync.exchange";
+ public static final String SYNC_QUEUE = "sync.queue";
+ public static final String SYNC_ROUTING_KEY = "sync.message";
+
+ // 死信队列
+ public static final String DLX_EXCHANGE = "sync.dlx.exchange";
+ public static final String DLQ_QUEUE = "sync.dlq";
+ public static final String DLQ_ROUTING_KEY = "sync.dlq";
+
+ @Value("${spring.rabbitmq.host:localhost}")
+ private String host;
+
+ @Value("${spring.rabbitmq.port:5672}")
+ private int port;
+
+ @Value("${spring.rabbitmq.username:guest}")
+ private String username;
+
+ @Value("${spring.rabbitmq.password:guest}")
+ private String password;
+
+ @Value("${spring.rabbitmq.virtual-host:/}")
+ private String virtualHost;
+
+ // ==================== Connection Factory ====================
+
+ @Bean
+ public ConnectionFactory connectionFactory() {
+ CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
+ connectionFactory.setHost(host);
+ connectionFactory.setPort(port);
+ connectionFactory.setUsername(username);
+ connectionFactory.setPassword(password);
+ connectionFactory.setVirtualHost(virtualHost);
+ // 开启publisher-confirm确认模式
+ connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
+ // 开启publisher-return确认模式
+ connectionFactory.setPublisherReturns(true);
+ return connectionFactory;
+ }
+
+ // ==================== Message Converter ====================
+
+ @Bean
+ public ObjectMapper objectMapper() {
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(new JavaTimeModule());
+ objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ return objectMapper;
+ }
+
+ @Bean
+ public MessageConverter messageConverter(ObjectMapper objectMapper) {
+ return new Jackson2JsonMessageConverter(objectMapper);
+ }
+
+ // ==================== RabbitTemplate ====================
+
+ @Bean
+ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
+ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
+ rabbitTemplate.setMessageConverter(messageConverter);
+ // 设置Mandatory为true,才能触发ReturnCallback
+ rabbitTemplate.setMandatory(true);
+ return rabbitTemplate;
+ }
+
+ @Bean
+ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
+ ConnectionFactory connectionFactory, MessageConverter messageConverter) {
+ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
+ factory.setConnectionFactory(connectionFactory);
+ factory.setMessageConverter(messageConverter);
+ // 设置并发数
+ factory.setConcurrent(1);
+ factory.setMaxConcurrent(5);
+ // 设置手动ack
+ factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+ // 预取数量
+ factory.setPrefetchCount(10);
+ return factory;
+ }
+
+ // ==================== 交换机 ====================
+
+ @Bean
+ public DirectExchange syncExchange() {
+ return new DirectExchange(SYNC_EXCHANGE, true, false);
+ }
+
+ @Bean
+ public DirectExchange deadLetterExchange() {
+ return new DirectExchange(DLX_EXCHANGE, true, false);
+ }
+
+ // ==================== 队列 ====================
+
+ @Bean
+ public Queue syncQueue() {
+ return QueueBuilder.durable(SYNC_QUEUE)
+ .withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
+ .withArgument("x-dead-letter-routing-key", DLQ_ROUTING_KEY)
+ .build();
+ }
+
+ @Bean
+ public Queue deadLetterQueue() {
+ return QueueBuilder.durable(DLQ_QUEUE).build();
+ }
+
+ // ==================== 绑定 ====================
+
+ @Bean
+ public Binding syncBinding() {
+ return BindingBuilder.bind(syncQueue())
+ .to(syncExchange())
+ .with(SYNC_ROUTING_KEY);
+ }
+
+ @Bean
+ public Binding dlqBinding() {
+ return BindingBuilder.bind(deadLetterQueue())
+ .to(deadLetterExchange())
+ .with(DLQ_ROUTING_KEY);
+ }
+}
diff --git a/src/main/java/com/gxwebsoft/common/mq/consumer/UserSyncConsumer.java b/src/main/java/com/gxwebsoft/common/mq/consumer/UserSyncConsumer.java
new file mode 100644
index 0000000..3fe77fe
--- /dev/null
+++ b/src/main/java/com/gxwebsoft/common/mq/consumer/UserSyncConsumer.java
@@ -0,0 +1,159 @@
+package com.gxwebsoft.common.mq.consumer;
+
+import com.gxwebsoft.common.mq.config.RabbitMQConfig;
+import com.gxwebsoft.common.mq.message.SyncMessage;
+import com.gxwebsoft.common.system.service.UserSyncService;
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * 用户同步消息消费者
+ *
+ * 负责监听用户同步消息,并将数据同步到目标系统(如websopy)
+ */
+@Slf4j
+@Component
+@ConditionalOnProperty(name = "sync.mq.enabled", havingValue = "true", matchIfMissing = true)
+public class UserSyncConsumer {
+
+ @Autowired(required = false)
+ private UserSyncService userSyncService;
+
+ private static final int MAX_RETRY_COUNT = 3;
+
+ /**
+ * 监听用户同步消息
+ */
+ @RabbitListener(queues = RabbitMQConfig.SYNC_QUEUE)
+ public void handleMessage(SyncMessage message, Channel channel, Message amqpMessage) {
+ long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
+
+ try {
+ log.info("收到MQ消息: messageId={}, type={}, event={}, target={}",
+ message.getMessageId(), message.getMessageType(),
+ message.getEventType(), message.getTargetSystem());
+
+ // 检查是否启用了同步服务
+ if (userSyncService == null) {
+ log.warn("UserSyncService 未启用,跳过消息处理: messageId={}", message.getMessageId());
+ channel.basicAck(deliveryTag, false);
+ return;
+ }
+
+ // 处理消息
+ processMessage(message);
+
+ // 确认消息
+ channel.basicAck(deliveryTag, false);
+ log.info("消息处理成功: messageId={}", message.getMessageId());
+
+ } catch (Exception e) {
+ log.error("消息处理失败: messageId={}, error={}", message.getMessageId(), e.getMessage(), e);
+ handleFailure(channel, amqpMessage, deliveryTag, message, e);
+ }
+ }
+
+ /**
+ * 处理消息
+ */
+ private void processMessage(SyncMessage message) {
+ String messageType = message.getMessageType();
+ String eventType = message.getEventType();
+ String targetSystem = message.getTargetSystem();
+ Map data = message.getData();
+
+ // 判断消息类型并处理
+ if ("USER_SYNC".equals(messageType)) {
+ handleUserSync(targetSystem, eventType, data);
+ } else {
+ log.warn("未知的消息类型: messageType={}", messageType);
+ }
+ }
+
+ /**
+ * 处理用户同步
+ */
+ private void handleUserSync(String targetSystem, String eventType, Map data) {
+ switch (eventType) {
+ case "CREATE":
+ case "UPDATE":
+ userSyncService.syncUserToWebsopy(data);
+ log.info("用户同步到{}成功: event={}", targetSystem, eventType);
+ break;
+ case "DELETE":
+ userSyncService.deleteUserFromWebsopy(data);
+ log.info("用户从{}删除成功: event={}", targetSystem, eventType);
+ break;
+ default:
+ log.warn("未知的用户事件类型: eventType={}", eventType);
+ }
+ }
+
+ /**
+ * 处理失败消息
+ */
+ private void handleFailure(Channel channel, Message amqpMessage, long deliveryTag,
+ SyncMessage message, Exception e) {
+ Integer retryCount = message.getRetryCount();
+ if (retryCount == null) {
+ retryCount = 0;
+ }
+
+ if (retryCount < MAX_RETRY_COUNT) {
+ // 重试:拒绝消息并重新入队
+ try {
+ log.warn("消息处理失败,准备重试: messageId={}, retryCount={}/{}",
+ message.getMessageId(), retryCount + 1, MAX_RETRY_COUNT);
+ // 增加重试次数
+ message.setRetryCount(retryCount + 1);
+ // 拒绝消息(requeue=true 会重新入队)
+ channel.basicNack(deliveryTag, false, true);
+ } catch (IOException ioException) {
+ log.error("消息拒绝失败: messageId={}", message.getMessageId(), ioException);
+ }
+ } else {
+ // 超过重试次数,发送到死信队列
+ try {
+ log.error("消息处理失败次数超限,发送到死信队列: messageId={}, retryCount={}",
+ message.getMessageId(), retryCount);
+ // 拒绝消息(requeue=false 进入死信队列)
+ channel.basicNack(deliveryTag, false, false);
+ } catch (IOException ioException) {
+ log.error("消息拒绝失败: messageId={}", message.getMessageId(), ioException);
+ }
+ }
+ }
+
+ /**
+ * 监听死信队列消息(可选,用于告警或人工处理)
+ */
+ @RabbitListener(queues = RabbitMQConfig.DLQ_QUEUE)
+ public void handleDeadLetter(SyncMessage message, Channel channel, Message amqpMessage) {
+ long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
+ try {
+ log.error("死信消息: messageId={}, type={}, event={}, target={}, retryCount={}",
+ message.getMessageId(), message.getMessageType(),
+ message.getEventType(), message.getTargetSystem(),
+ message.getRetryCount());
+
+ // TODO: 可以在这里添加告警逻辑,如发送邮件、钉钉通知等
+
+ channel.basicAck(deliveryTag, false);
+ } catch (Exception e) {
+ log.error("处理死信消息失败: messageId={}", message.getMessageId(), e);
+ try {
+ channel.basicAck(deliveryTag, false);
+ } catch (IOException ioException) {
+ log.error("确认死信消息失败", ioException);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/gxwebsoft/common/mq/message/SyncMessage.java b/src/main/java/com/gxwebsoft/common/mq/message/SyncMessage.java
new file mode 100644
index 0000000..e141025
--- /dev/null
+++ b/src/main/java/com/gxwebsoft/common/mq/message/SyncMessage.java
@@ -0,0 +1,80 @@
+package com.gxwebsoft.common.mq.message;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.util.Map;
+
+/**
+ * 统一消息实体 - 用于各模块间的数据同步
+ */
+@Data
+public class SyncMessage implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * 消息唯一ID
+ */
+ private String messageId;
+
+ /**
+ * 消息类型:USER_SYNC, TENANT_SYNC, etc.
+ */
+ private String messageType;
+
+ /**
+ * 事件类型:CREATE, UPDATE, DELETE
+ */
+ private String eventType;
+
+ /**
+ * 目标系统标识
+ */
+ private String targetSystem;
+
+ /**
+ * 业务数据(Map格式,便于序列化)
+ */
+ private Map data;
+
+ /**
+ * 创建时间
+ */
+ private LocalDateTime createTime;
+
+ /**
+ * 消息重试次数
+ */
+ private Integer retryCount;
+
+ public SyncMessage() {
+ this.createTime = LocalDateTime.now();
+ this.retryCount = 0;
+ }
+
+ public SyncMessage(String messageType, String eventType, String targetSystem, Map data) {
+ this();
+ this.messageId = java.util.UUID.randomUUID().toString().replace("-", "");
+ this.messageType = messageType;
+ this.eventType = eventType;
+ this.targetSystem = targetSystem;
+ this.data = data;
+ }
+
+ /**
+ * 创建用户同步消息
+ */
+ public static SyncMessage userCreate(String targetSystem, Map userData) {
+ return new SyncMessage("USER_SYNC", "CREATE", targetSystem, userData);
+ }
+
+ public static SyncMessage userUpdate(String targetSystem, Map userData) {
+ return new SyncMessage("USER_SYNC", "UPDATE", targetSystem, userData);
+ }
+
+ public static SyncMessage userDelete(String targetSystem, Map userData) {
+ return new SyncMessage("USER_SYNC", "DELETE", targetSystem, userData);
+ }
+}
diff --git a/src/main/java/com/gxwebsoft/common/mq/producer/SyncMessageProducer.java b/src/main/java/com/gxwebsoft/common/mq/producer/SyncMessageProducer.java
new file mode 100644
index 0000000..49db5b2
--- /dev/null
+++ b/src/main/java/com/gxwebsoft/common/mq/producer/SyncMessageProducer.java
@@ -0,0 +1,41 @@
+package com.gxwebsoft.common.mq.producer;
+
+import com.gxwebsoft.common.mq.message.SyncMessage;
+
+/**
+ * 消息生产者接口 - 预留抽象层,便于将来切换MQ实现(如从RabbitMQ迁移到RocketMQ)
+ */
+public interface SyncMessageProducer {
+
+ /**
+ * 发送同步消息
+ *
+ * @param message 消息体
+ */
+ void sendSyncMessage(SyncMessage message);
+
+ /**
+ * 发送同步消息(带回调)
+ *
+ * @param message 消息体
+ * @param callback 发送回调
+ */
+ void sendSyncMessage(SyncMessage message, SendCallback callback);
+
+ /**
+ * 发送用户同步消息
+ *
+ * @param targetSystem 目标系统
+ * @param eventType 事件类型:CREATE, UPDATE, DELETE
+ * @param userData 用户数据
+ */
+ void sendUserSyncMessage(String targetSystem, String eventType, Object userData);
+
+ /**
+ * 发送回调接口
+ */
+ interface SendCallback {
+ void onSuccess(String messageId);
+ void onFailure(String messageId, Throwable throwable);
+ }
+}
diff --git a/src/main/java/com/gxwebsoft/common/mq/producer/impl/RabbitMQSyncProducer.java b/src/main/java/com/gxwebsoft/common/mq/producer/impl/RabbitMQSyncProducer.java
new file mode 100644
index 0000000..0bf2288
--- /dev/null
+++ b/src/main/java/com/gxwebsoft/common/mq/producer/impl/RabbitMQSyncProducer.java
@@ -0,0 +1,126 @@
+package com.gxwebsoft.common.mq.producer.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.gxwebsoft.common.mq.config.RabbitMQConfig;
+import com.gxwebsoft.common.mq.message.SyncMessage;
+import com.gxwebsoft.common.mq.producer.SyncMessageProducer;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageProperties;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.support.converter.MessageConverter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Autowired;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * RabbitMQ 消息生产者实现
+ */
+@Slf4j
+@Component
+@ConditionalOnProperty(name = "sync.mq.enabled", havingValue = "true", matchIfMissing = true)
+public class RabbitMQSyncProducer implements SyncMessageProducer, RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
+
+ private final RabbitTemplate rabbitTemplate;
+ private final MessageConverter messageConverter;
+ private final ObjectMapper objectMapper;
+
+ @Autowired
+ public RabbitMQSyncProducer(RabbitTemplate rabbitTemplate, MessageConverter messageConverter, ObjectMapper objectMapper) {
+ this.rabbitTemplate = rabbitTemplate;
+ this.messageConverter = messageConverter;
+ this.objectMapper = objectMapper;
+ // 设置确认回调
+ this.rabbitTemplate.setConfirmCallback(this);
+ this.rabbitTemplate.setReturnCallback(this);
+ }
+
+ @Override
+ public void sendSyncMessage(SyncMessage message) {
+ sendSyncMessage(message, null);
+ }
+
+ @Override
+ public void sendSyncMessage(SyncMessage message, SendCallback callback) {
+ try {
+ log.info("发送MQ消息: messageId={}, type={}, event={}, target={}",
+ message.getMessageId(), message.getMessageType(),
+ message.getEventType(), message.getTargetSystem());
+
+ CorrelationData correlationData = new CorrelationData(message.getMessageId());
+
+ if (callback != null) {
+ correlationData.getFuture().addCallback(
+ result -> {
+ if (result.isAck()) {
+ callback.onSuccess(message.getMessageId());
+ } else {
+ callback.onFailure(message.getMessageId(),
+ new RuntimeException("消息发送未被确认"));
+ }
+ },
+ ex -> callback.onFailure(message.getMessageId(), ex)
+ );
+ }
+
+ rabbitTemplate.convertAndSend(
+ RabbitMQConfig.SYNC_EXCHANGE,
+ RabbitMQConfig.SYNC_ROUTING_KEY,
+ message,
+ correlationData
+ );
+
+ } catch (Exception e) {
+ log.error("发送MQ消息失败: messageId={}, error={}", message.getMessageId(), e.getMessage(), e);
+ if (callback != null) {
+ callback.onFailure(message.getMessageId(), e);
+ }
+ }
+ }
+
+ @Override
+ public void sendUserSyncMessage(String targetSystem, String eventType, Object userData) {
+ try {
+ Map dataMap;
+ if (userData instanceof Map) {
+ dataMap = (Map) userData;
+ } else {
+ // 转换为Map
+ dataMap = objectMapper.convertValue(userData, Map.class);
+ }
+
+ SyncMessage message = new SyncMessage("USER_SYNC", eventType, targetSystem, dataMap);
+ sendSyncMessage(message);
+ } catch (Exception e) {
+ log.error("发送用户同步消息失败: targetSystem={}, eventType={}, error={}",
+ targetSystem, eventType, e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 确认回调 - Broker确认收到消息
+ */
+ @Override
+ public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+ String messageId = correlationData.getId();
+ if (ack) {
+ log.debug("消息确认成功: messageId={}", messageId);
+ } else {
+ log.warn("消息确认失败: messageId={}, cause={}", messageId, cause);
+ }
+ }
+
+ /**
+ * Return回调 - 消息无法路由时回调
+ */
+ @Override
+ public void returnedMessage(Message message, int replyCode, String replyText,
+ String exchange, String routingKey) {
+ log.error("消息无法路由: exchange={}, routingKey={}, replyCode={}, replyText={}",
+ exchange, routingKey, replyCode, replyText);
+ }
+}
diff --git a/src/main/java/com/gxwebsoft/common/system/service/UserSyncService.java b/src/main/java/com/gxwebsoft/common/system/service/UserSyncService.java
index d6cdbca..984bfa8 100644
--- a/src/main/java/com/gxwebsoft/common/system/service/UserSyncService.java
+++ b/src/main/java/com/gxwebsoft/common/system/service/UserSyncService.java
@@ -120,6 +120,145 @@ public class UserSyncService {
}
}
+ /**
+ * 同步用户到 websopy(Map格式,用于MQ消息处理)
+ *
+ * @param data 用户信息Map
+ */
+ public void syncUserToWebsopy(Map data) {
+ String websopyBaseUrl = getWebsopyBaseUrl();
+ if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) {
+ log.warn("websopyUrl 未配置,跳过用户同步");
+ return;
+ }
+
+ if (data == null || data.isEmpty()) {
+ log.warn("用户数据为空,跳过同步");
+ return;
+ }
+
+ try {
+ Integer userId = getIntValue(data, "userId");
+ if (userId == null) {
+ log.warn("用户数据中缺少userId,跳过同步");
+ return;
+ }
+
+ Integer tenantIdValue = getIntValue(data, "tenantId");
+ if (tenantIdValue == null) {
+ tenantIdValue = 0;
+ }
+
+ // 构建JSON字符串
+ StringBuilder jsonBuilder = new StringBuilder();
+ jsonBuilder.append("{");
+ jsonBuilder.append("\"userId\":").append(userId).append(",");
+ jsonBuilder.append("\"username\":\"").append(escapeJson(getStringValue(data, "username"))).append("\",");
+ jsonBuilder.append("\"nickname\":\"").append(escapeJson(getStringValue(data, "nickname"))).append("\",");
+ jsonBuilder.append("\"avatar\":\"").append(escapeJson(getStringValue(data, "avatar"))).append("\",");
+ jsonBuilder.append("\"phone\":\"").append(escapeJson(getStringValue(data, "phone"))).append("\",");
+ jsonBuilder.append("\"status\":").append(getIntValue(data, "status", 1)).append(",");
+ jsonBuilder.append("\"updateTime\":\"").append(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).append("\",");
+ jsonBuilder.append("\"tenantId\":").append(tenantIdValue).append(",");
+ jsonBuilder.append("\"tenant_id\":").append(tenantIdValue);
+ jsonBuilder.append("}");
+
+ String url = websopyBaseUrl + "/api/app/user-sync/single";
+ String body = jsonBuilder.toString();
+
+ log.info("MQ同步用户到 websopy: userId={}, body={}", userId, body);
+
+ Map headers = new HashMap<>();
+ headers.put("Content-Type", "application/json");
+
+ HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, body);
+ String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+
+ JSONObject result = JSON.parseObject(responseBody);
+ if (result != null && result.getIntValue("code") == 0) {
+ log.info("MQ用户同步成功: userId={}", userId);
+ } else {
+ String message = result != null ? result.getString("message") : "未知错误";
+ log.error("MQ用户同步失败: userId={}, message={}", userId, message);
+ }
+ } catch (Exception e) {
+ log.error("MQ用户同步异常: error={}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 从 websopy 删除用户
+ *
+ * @param data 包含userId的数据
+ */
+ public void deleteUserFromWebsopy(Map data) {
+ String websopyBaseUrl = getWebsopyBaseUrl();
+ if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) {
+ log.warn("websopyUrl 未配置,跳过删除用户");
+ return;
+ }
+
+ if (data == null) {
+ log.warn("删除用户数据为空");
+ return;
+ }
+
+ try {
+ Integer userId = getIntValue(data, "userId");
+ if (userId == null) {
+ log.warn("删除用户数据中缺少userId");
+ return;
+ }
+
+ String url = websopyBaseUrl + "/api/app/user-sync/delete/" + userId;
+
+ log.info("从 websopy 删除用户: userId={}", userId);
+
+ Map headers = new HashMap<>();
+ headers.put("Content-Type", "application/json");
+
+ HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, "");
+ String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+
+ JSONObject result = JSON.parseObject(responseBody);
+ if (result != null && result.getIntValue("code") == 0) {
+ log.info("MQ删除用户成功: userId={}", userId);
+ } else {
+ String message = result != null ? result.getString("message") : "未知错误";
+ log.error("MQ删除用户失败: userId={}, message={}", userId, message);
+ }
+ } catch (Exception e) {
+ log.error("MQ删除用户异常: error={}", e.getMessage(), e);
+ }
+ }
+
+ private String getStringValue(Map data, String key) {
+ Object value = data.get(key);
+ return value != null ? String.valueOf(value) : "";
+ }
+
+ private Integer getIntValue(Map data, String key) {
+ return getIntValue(data, key, null);
+ }
+
+ private Integer getIntValue(Map data, String key, Integer defaultValue) {
+ Object value = data.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ if (value instanceof Integer) {
+ return (Integer) value;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ }
+ try {
+ return Integer.parseInt(String.valueOf(value));
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
/**
* 刷新 websopy 端的用户缓存
* 只传 userId,websopy 端会通过 API 回查获取完整信息
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index b2a34a9..569d7ab 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -58,6 +58,18 @@ spring:
port: 6379
password: redis_WSDb88
+ # RabbitMQ 配置
+ rabbitmq:
+ host: 1Panel-rabbitmq-kvHZ
+ port: 5672
+ username: rabbitmq
+ password: rabbitmq
+ virtual-host: /
+ # 开启确认模式
+ publisher-confirm-type: correlated
+ # 开启Return模式
+ publisher-returns: true
+
# 邮件服务器配置
mail:
host: smtp.qq.com
@@ -111,6 +123,12 @@ config:
bucketDomain: https://oss.wsdns.cn
aliyunDomain: https://oss-gxwebsoft.oss-cn-shenzhen.aliyuncs.com
+# MQ同步配置
+sync:
+ # 是否启用MQ(设为false则使用原有直接同步方式)
+ mq:
+ enabled: true
+
# JWT配置
jwt:
secret: websoft-jwt-secret-key-2025-dev-environment