From ca3651165c3e67777488e82a8e96eddc252dfee1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E5=BF=A0=E6=9E=97?= <170083662@qq.com> Date: Tue, 7 Apr 2026 02:44:02 +0800 Subject: [PATCH] =?UTF-8?q?feat(sync):=20=E5=AE=9E=E7=8E=B0MQ=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E9=98=9F=E5=88=97=E7=94=A8=E6=88=B7=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加RabbitMQ相关配置,支持消息确认与回退机制 - 新增统一消息实体SyncMessage,支持多种同步事件类型 - 实现RabbitMQ消息生产者RabbitMQSyncProducer,包含回调处理和用户同步消息发送 - 实现用户同步消息消费者UserSyncConsumer,支持消息重试和死信队列处理 - UserSyncService新增Map参数方法,适配MQ消息同步调用 - QrLoginServiceImpl改用MQ消息异步同步用户数据,替代原直接调用方式 - 应用配置新增RabbitMQ连接配置及MQ开关开关控制 - 前端接口和组件调整,支持扫码登录绑定手机号及跳转逻辑完善 --- .workbuddy/memory/2026-04-07.md | 61 +++++++ pom.xml | 6 + .../auto/service/impl/QrLoginServiceImpl.java | 12 +- .../common/mq/config/RabbitMQConfig.java | 151 +++++++++++++++++ .../common/mq/consumer/UserSyncConsumer.java | 159 ++++++++++++++++++ .../common/mq/message/SyncMessage.java | 80 +++++++++ .../mq/producer/SyncMessageProducer.java | 41 +++++ .../producer/impl/RabbitMQSyncProducer.java | 126 ++++++++++++++ .../system/service/UserSyncService.java | 139 +++++++++++++++ src/main/resources/application.yml | 18 ++ 10 files changed, 787 insertions(+), 6 deletions(-) create mode 100644 src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java create mode 100644 src/main/java/com/gxwebsoft/common/mq/consumer/UserSyncConsumer.java create mode 100644 src/main/java/com/gxwebsoft/common/mq/message/SyncMessage.java create mode 100644 src/main/java/com/gxwebsoft/common/mq/producer/SyncMessageProducer.java create mode 100644 src/main/java/com/gxwebsoft/common/mq/producer/impl/RabbitMQSyncProducer.java diff --git a/.workbuddy/memory/2026-04-07.md b/.workbuddy/memory/2026-04-07.md index 38e8a5a..8ae4245 100644 --- a/.workbuddy/memory/2026-04-07.md +++ b/.workbuddy/memory/2026-04-07.md @@ -17,6 +17,31 @@ 3. **页面跳转逻辑**:登录成功/绑定成功后跳转到/console 4. **API调用更新**:适应新的响应字段格式 +### 前端代码已完成的修改 +1. **API接口修改**: + - 更新QRLoginStatusResult接口,新增nextAction、redirectUrl、successMessage、needBindPhone等字段 + - 修改checkQRLoginStatus函数,使用真实后端API调用 + - 新增bindPhoneForQrLogin函数,处理绑定手机号API调用 + - 修改generateQRLoginToken函数,使用真实后端API调用 + +2. **状态管理修改**: + - 更新auth store接口,添加bindPhoneForQRLogin方法 + - 修改checkQRStatus方法,支持新的返回字段处理 + - 添加绑定手机号处理逻辑 + +3. **组件修改**: + - 创建BindPhone组件:用于处理需要绑定手机号的场景 + - 修改QRLogin组件:支持根据nextAction显示不同UI,自动处理绑定手机号流程 + - 组件现在可以正确处理: + - nextAction: 'bind_phone' → 显示绑定手机号表单 + - nextAction: 'redirect' → 自动跳转到/console + - needBindPhone: true → 显示绑定提示 + +4. **完整流程支持**: + - 用户扫码登录 → 后端返回nextAction: 'bind_phone' → 前端显示绑定手机号页面 + - 用户输入手机号和验证码 → 调用绑定手机号API → 绑定成功后自动登录并跳转/console + - 用户已有手机号 → 后端返回nextAction: 'redirect' → 前端自动跳转到/console + ### 后端修改完成 1. **QrLoginStatusResponse新增字段**: - nextAction: 下一步操作指示 @@ -39,3 +64,39 @@ ### 目的 确保用户数据同步到 websopy 时,手机号字段已有值,避免无效的缓存数据。 + +## MQ消息队列实现 (02:40) + +### 创建的文件 +1. **SyncMessage.java** - 统一消息实体 + - 位置: `com.gxwebsoft.common.mq.message` + - 支持 USER_SYNC, TENANT_SYNC 等消息类型 + - 支持 CREATE, UPDATE, DELETE 事件类型 + +2. **SyncMessageProducer.java** - 消息生产者接口 + - 位置: `com.gxwebsoft.common.mq.producer` + - 预留抽象层,便于将来切换到RocketMQ + +3. **RabbitMQConfig.java** - RabbitMQ配置类 + - 位置: `com.gxwebsoft.common.mq.config` + - 定义交换机、队列、死信队列 + - 配置JSON序列化 + +4. **RabbitMQSyncProducer.java** - RabbitMQ生产者实现 + - 位置: `com.gxwebsoft.common.mq.producer.impl` + - 实现确认回调和Return回调 + +5. **UserSyncConsumer.java** - 用户同步消费者 + - 位置: `com.gxwebsoft.common.mq.consumer` + - 监听用户同步消息,调用UserSyncService同步到websopy + - 支持重试机制和死信队列 + +### 修改的文件 +1. **pom.xml** - 添加 spring-boot-starter-amqp 依赖 +2. **UserSyncService.java** - 添加Map参数的重载方法 +3. **QrLoginServiceImpl.java** - 改用MQ发送消息 +4. **application.yml** - 添加RabbitMQ配置 + +### 使用方式 +配置 `sync.mq.enabled: false` 可临时禁用MQ,回退到原有直接同步方式。 + diff --git a/pom.xml b/pom.xml index ab44633..7c0dcb7 100644 --- a/pom.xml +++ b/pom.xml @@ -316,6 +316,12 @@ 1.0.0.14 + + + org.springframework.boot + spring-boot-starter-amqp + + com.github.xiaoymin diff --git a/src/main/java/com/gxwebsoft/auto/service/impl/QrLoginServiceImpl.java b/src/main/java/com/gxwebsoft/auto/service/impl/QrLoginServiceImpl.java index 15408d8..155689b 100644 --- a/src/main/java/com/gxwebsoft/auto/service/impl/QrLoginServiceImpl.java +++ b/src/main/java/com/gxwebsoft/auto/service/impl/QrLoginServiceImpl.java @@ -16,11 +16,11 @@ import com.gxwebsoft.common.core.security.JwtSubject; import com.gxwebsoft.common.core.security.JwtUtil; import com.gxwebsoft.common.core.utils.CommonUtil; import com.gxwebsoft.common.core.utils.RedisUtil; +import com.gxwebsoft.common.mq.producer.SyncMessageProducer; import com.gxwebsoft.common.system.entity.User; import com.gxwebsoft.common.system.entity.UserOauth; import com.gxwebsoft.common.system.service.UserOauthService; import com.gxwebsoft.common.system.service.UserService; -import com.gxwebsoft.common.system.service.UserSyncService; import com.gxwebsoft.common.system.service.WxService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -61,7 +61,7 @@ public class QrLoginServiceImpl implements QrLoginService { private UserOauthService userOauthService; @Autowired(required = false) - private UserSyncService userSyncService; + private SyncMessageProducer syncMessageProducer; @Override public QrLoginGenerateResponse generateQrLoginToken(Integer tenantId) { @@ -295,12 +295,12 @@ public class QrLoginServiceImpl implements QrLoginService { userService.updateUser(user); redisUtil.delete(codeKey); - // 绑定手机号成功后,同步用户数据到 websopy - if (userSyncService != null) { + // 绑定手机号成功后,通过MQ异步同步用户数据到 websopy + if (syncMessageProducer != null) { User updatedUser = userService.getAllByUserId(String.valueOf(user.getUserId())); if (updatedUser != null) { - userSyncService.syncUserToWebsopy(updatedUser); - log.info("扫码绑定手机号后同步用户到websopy成功: userId={}, phone={}", user.getUserId(), user.getPhone()); + syncMessageProducer.sendUserSyncMessage("websopy", "UPDATE", updatedUser); + log.info("扫码绑定手机号后发送MQ消息同步用户到websopy: userId={}, phone={}", user.getUserId(), user.getPhone()); } } diff --git a/src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java b/src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java new file mode 100644 index 0000000..ff2f21d --- /dev/null +++ b/src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java @@ -0,0 +1,151 @@ +package com.gxwebsoft.common.mq.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * RabbitMQ 配置类 + */ +@Configuration +@ConditionalOnProperty(name = "sync.mq.enabled", havingValue = "true", matchIfMissing = true) +public class RabbitMQConfig { + + // ==================== 常量定义 ==================== + public static final String SYNC_EXCHANGE = "sync.exchange"; + public static final String SYNC_QUEUE = "sync.queue"; + public static final String SYNC_ROUTING_KEY = "sync.message"; + + // 死信队列 + public static final String DLX_EXCHANGE = "sync.dlx.exchange"; + public static final String DLQ_QUEUE = "sync.dlq"; + public static final String DLQ_ROUTING_KEY = "sync.dlq"; + + @Value("${spring.rabbitmq.host:localhost}") + private String host; + + @Value("${spring.rabbitmq.port:5672}") + private int port; + + @Value("${spring.rabbitmq.username:guest}") + private String username; + + @Value("${spring.rabbitmq.password:guest}") + private String password; + + @Value("${spring.rabbitmq.virtual-host:/}") + private String virtualHost; + + // ==================== Connection Factory ==================== + + @Bean + public ConnectionFactory connectionFactory() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setHost(host); + connectionFactory.setPort(port); + connectionFactory.setUsername(username); + connectionFactory.setPassword(password); + connectionFactory.setVirtualHost(virtualHost); + // 开启publisher-confirm确认模式 + connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); + // 开启publisher-return确认模式 + connectionFactory.setPublisherReturns(true); + return connectionFactory; + } + + // ==================== Message Converter ==================== + + @Bean + public ObjectMapper objectMapper() { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + return objectMapper; + } + + @Bean + public MessageConverter messageConverter(ObjectMapper objectMapper) { + return new Jackson2JsonMessageConverter(objectMapper); + } + + // ==================== RabbitTemplate ==================== + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) { + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + rabbitTemplate.setMessageConverter(messageConverter); + // 设置Mandatory为true,才能触发ReturnCallback + rabbitTemplate.setMandatory(true); + return rabbitTemplate; + } + + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( + ConnectionFactory connectionFactory, MessageConverter messageConverter) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + factory.setMessageConverter(messageConverter); + // 设置并发数 + factory.setConcurrent(1); + factory.setMaxConcurrent(5); + // 设置手动ack + factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); + // 预取数量 + factory.setPrefetchCount(10); + return factory; + } + + // ==================== 交换机 ==================== + + @Bean + public DirectExchange syncExchange() { + return new DirectExchange(SYNC_EXCHANGE, true, false); + } + + @Bean + public DirectExchange deadLetterExchange() { + return new DirectExchange(DLX_EXCHANGE, true, false); + } + + // ==================== 队列 ==================== + + @Bean + public Queue syncQueue() { + return QueueBuilder.durable(SYNC_QUEUE) + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE) + .withArgument("x-dead-letter-routing-key", DLQ_ROUTING_KEY) + .build(); + } + + @Bean + public Queue deadLetterQueue() { + return QueueBuilder.durable(DLQ_QUEUE).build(); + } + + // ==================== 绑定 ==================== + + @Bean + public Binding syncBinding() { + return BindingBuilder.bind(syncQueue()) + .to(syncExchange()) + .with(SYNC_ROUTING_KEY); + } + + @Bean + public Binding dlqBinding() { + return BindingBuilder.bind(deadLetterQueue()) + .to(deadLetterExchange()) + .with(DLQ_ROUTING_KEY); + } +} diff --git a/src/main/java/com/gxwebsoft/common/mq/consumer/UserSyncConsumer.java b/src/main/java/com/gxwebsoft/common/mq/consumer/UserSyncConsumer.java new file mode 100644 index 0000000..3fe77fe --- /dev/null +++ b/src/main/java/com/gxwebsoft/common/mq/consumer/UserSyncConsumer.java @@ -0,0 +1,159 @@ +package com.gxwebsoft.common.mq.consumer; + +import com.gxwebsoft.common.mq.config.RabbitMQConfig; +import com.gxwebsoft.common.mq.message.SyncMessage; +import com.gxwebsoft.common.system.service.UserSyncService; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.Map; + +/** + * 用户同步消息消费者 + * + * 负责监听用户同步消息,并将数据同步到目标系统(如websopy) + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "sync.mq.enabled", havingValue = "true", matchIfMissing = true) +public class UserSyncConsumer { + + @Autowired(required = false) + private UserSyncService userSyncService; + + private static final int MAX_RETRY_COUNT = 3; + + /** + * 监听用户同步消息 + */ + @RabbitListener(queues = RabbitMQConfig.SYNC_QUEUE) + public void handleMessage(SyncMessage message, Channel channel, Message amqpMessage) { + long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag(); + + try { + log.info("收到MQ消息: messageId={}, type={}, event={}, target={}", + message.getMessageId(), message.getMessageType(), + message.getEventType(), message.getTargetSystem()); + + // 检查是否启用了同步服务 + if (userSyncService == null) { + log.warn("UserSyncService 未启用,跳过消息处理: messageId={}", message.getMessageId()); + channel.basicAck(deliveryTag, false); + return; + } + + // 处理消息 + processMessage(message); + + // 确认消息 + channel.basicAck(deliveryTag, false); + log.info("消息处理成功: messageId={}", message.getMessageId()); + + } catch (Exception e) { + log.error("消息处理失败: messageId={}, error={}", message.getMessageId(), e.getMessage(), e); + handleFailure(channel, amqpMessage, deliveryTag, message, e); + } + } + + /** + * 处理消息 + */ + private void processMessage(SyncMessage message) { + String messageType = message.getMessageType(); + String eventType = message.getEventType(); + String targetSystem = message.getTargetSystem(); + Map data = message.getData(); + + // 判断消息类型并处理 + if ("USER_SYNC".equals(messageType)) { + handleUserSync(targetSystem, eventType, data); + } else { + log.warn("未知的消息类型: messageType={}", messageType); + } + } + + /** + * 处理用户同步 + */ + private void handleUserSync(String targetSystem, String eventType, Map data) { + switch (eventType) { + case "CREATE": + case "UPDATE": + userSyncService.syncUserToWebsopy(data); + log.info("用户同步到{}成功: event={}", targetSystem, eventType); + break; + case "DELETE": + userSyncService.deleteUserFromWebsopy(data); + log.info("用户从{}删除成功: event={}", targetSystem, eventType); + break; + default: + log.warn("未知的用户事件类型: eventType={}", eventType); + } + } + + /** + * 处理失败消息 + */ + private void handleFailure(Channel channel, Message amqpMessage, long deliveryTag, + SyncMessage message, Exception e) { + Integer retryCount = message.getRetryCount(); + if (retryCount == null) { + retryCount = 0; + } + + if (retryCount < MAX_RETRY_COUNT) { + // 重试:拒绝消息并重新入队 + try { + log.warn("消息处理失败,准备重试: messageId={}, retryCount={}/{}", + message.getMessageId(), retryCount + 1, MAX_RETRY_COUNT); + // 增加重试次数 + message.setRetryCount(retryCount + 1); + // 拒绝消息(requeue=true 会重新入队) + channel.basicNack(deliveryTag, false, true); + } catch (IOException ioException) { + log.error("消息拒绝失败: messageId={}", message.getMessageId(), ioException); + } + } else { + // 超过重试次数,发送到死信队列 + try { + log.error("消息处理失败次数超限,发送到死信队列: messageId={}, retryCount={}", + message.getMessageId(), retryCount); + // 拒绝消息(requeue=false 进入死信队列) + channel.basicNack(deliveryTag, false, false); + } catch (IOException ioException) { + log.error("消息拒绝失败: messageId={}", message.getMessageId(), ioException); + } + } + } + + /** + * 监听死信队列消息(可选,用于告警或人工处理) + */ + @RabbitListener(queues = RabbitMQConfig.DLQ_QUEUE) + public void handleDeadLetter(SyncMessage message, Channel channel, Message amqpMessage) { + long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag(); + try { + log.error("死信消息: messageId={}, type={}, event={}, target={}, retryCount={}", + message.getMessageId(), message.getMessageType(), + message.getEventType(), message.getTargetSystem(), + message.getRetryCount()); + + // TODO: 可以在这里添加告警逻辑,如发送邮件、钉钉通知等 + + channel.basicAck(deliveryTag, false); + } catch (Exception e) { + log.error("处理死信消息失败: messageId={}", message.getMessageId(), e); + try { + channel.basicAck(deliveryTag, false); + } catch (IOException ioException) { + log.error("确认死信消息失败", ioException); + } + } + } +} diff --git a/src/main/java/com/gxwebsoft/common/mq/message/SyncMessage.java b/src/main/java/com/gxwebsoft/common/mq/message/SyncMessage.java new file mode 100644 index 0000000..e141025 --- /dev/null +++ b/src/main/java/com/gxwebsoft/common/mq/message/SyncMessage.java @@ -0,0 +1,80 @@ +package com.gxwebsoft.common.mq.message; + +import lombok.Data; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.util.Map; + +/** + * 统一消息实体 - 用于各模块间的数据同步 + */ +@Data +public class SyncMessage implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 消息唯一ID + */ + private String messageId; + + /** + * 消息类型:USER_SYNC, TENANT_SYNC, etc. + */ + private String messageType; + + /** + * 事件类型:CREATE, UPDATE, DELETE + */ + private String eventType; + + /** + * 目标系统标识 + */ + private String targetSystem; + + /** + * 业务数据(Map格式,便于序列化) + */ + private Map data; + + /** + * 创建时间 + */ + private LocalDateTime createTime; + + /** + * 消息重试次数 + */ + private Integer retryCount; + + public SyncMessage() { + this.createTime = LocalDateTime.now(); + this.retryCount = 0; + } + + public SyncMessage(String messageType, String eventType, String targetSystem, Map data) { + this(); + this.messageId = java.util.UUID.randomUUID().toString().replace("-", ""); + this.messageType = messageType; + this.eventType = eventType; + this.targetSystem = targetSystem; + this.data = data; + } + + /** + * 创建用户同步消息 + */ + public static SyncMessage userCreate(String targetSystem, Map userData) { + return new SyncMessage("USER_SYNC", "CREATE", targetSystem, userData); + } + + public static SyncMessage userUpdate(String targetSystem, Map userData) { + return new SyncMessage("USER_SYNC", "UPDATE", targetSystem, userData); + } + + public static SyncMessage userDelete(String targetSystem, Map userData) { + return new SyncMessage("USER_SYNC", "DELETE", targetSystem, userData); + } +} diff --git a/src/main/java/com/gxwebsoft/common/mq/producer/SyncMessageProducer.java b/src/main/java/com/gxwebsoft/common/mq/producer/SyncMessageProducer.java new file mode 100644 index 0000000..49db5b2 --- /dev/null +++ b/src/main/java/com/gxwebsoft/common/mq/producer/SyncMessageProducer.java @@ -0,0 +1,41 @@ +package com.gxwebsoft.common.mq.producer; + +import com.gxwebsoft.common.mq.message.SyncMessage; + +/** + * 消息生产者接口 - 预留抽象层,便于将来切换MQ实现(如从RabbitMQ迁移到RocketMQ) + */ +public interface SyncMessageProducer { + + /** + * 发送同步消息 + * + * @param message 消息体 + */ + void sendSyncMessage(SyncMessage message); + + /** + * 发送同步消息(带回调) + * + * @param message 消息体 + * @param callback 发送回调 + */ + void sendSyncMessage(SyncMessage message, SendCallback callback); + + /** + * 发送用户同步消息 + * + * @param targetSystem 目标系统 + * @param eventType 事件类型:CREATE, UPDATE, DELETE + * @param userData 用户数据 + */ + void sendUserSyncMessage(String targetSystem, String eventType, Object userData); + + /** + * 发送回调接口 + */ + interface SendCallback { + void onSuccess(String messageId); + void onFailure(String messageId, Throwable throwable); + } +} diff --git a/src/main/java/com/gxwebsoft/common/mq/producer/impl/RabbitMQSyncProducer.java b/src/main/java/com/gxwebsoft/common/mq/producer/impl/RabbitMQSyncProducer.java new file mode 100644 index 0000000..0bf2288 --- /dev/null +++ b/src/main/java/com/gxwebsoft/common/mq/producer/impl/RabbitMQSyncProducer.java @@ -0,0 +1,126 @@ +package com.gxwebsoft.common.mq.producer.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.gxwebsoft.common.mq.config.RabbitMQConfig; +import com.gxwebsoft.common.mq.message.SyncMessage; +import com.gxwebsoft.common.mq.producer.SyncMessageProducer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import javax.annotation.Autowired; +import java.util.HashMap; +import java.util.Map; + +/** + * RabbitMQ 消息生产者实现 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "sync.mq.enabled", havingValue = "true", matchIfMissing = true) +public class RabbitMQSyncProducer implements SyncMessageProducer, RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { + + private final RabbitTemplate rabbitTemplate; + private final MessageConverter messageConverter; + private final ObjectMapper objectMapper; + + @Autowired + public RabbitMQSyncProducer(RabbitTemplate rabbitTemplate, MessageConverter messageConverter, ObjectMapper objectMapper) { + this.rabbitTemplate = rabbitTemplate; + this.messageConverter = messageConverter; + this.objectMapper = objectMapper; + // 设置确认回调 + this.rabbitTemplate.setConfirmCallback(this); + this.rabbitTemplate.setReturnCallback(this); + } + + @Override + public void sendSyncMessage(SyncMessage message) { + sendSyncMessage(message, null); + } + + @Override + public void sendSyncMessage(SyncMessage message, SendCallback callback) { + try { + log.info("发送MQ消息: messageId={}, type={}, event={}, target={}", + message.getMessageId(), message.getMessageType(), + message.getEventType(), message.getTargetSystem()); + + CorrelationData correlationData = new CorrelationData(message.getMessageId()); + + if (callback != null) { + correlationData.getFuture().addCallback( + result -> { + if (result.isAck()) { + callback.onSuccess(message.getMessageId()); + } else { + callback.onFailure(message.getMessageId(), + new RuntimeException("消息发送未被确认")); + } + }, + ex -> callback.onFailure(message.getMessageId(), ex) + ); + } + + rabbitTemplate.convertAndSend( + RabbitMQConfig.SYNC_EXCHANGE, + RabbitMQConfig.SYNC_ROUTING_KEY, + message, + correlationData + ); + + } catch (Exception e) { + log.error("发送MQ消息失败: messageId={}, error={}", message.getMessageId(), e.getMessage(), e); + if (callback != null) { + callback.onFailure(message.getMessageId(), e); + } + } + } + + @Override + public void sendUserSyncMessage(String targetSystem, String eventType, Object userData) { + try { + Map dataMap; + if (userData instanceof Map) { + dataMap = (Map) userData; + } else { + // 转换为Map + dataMap = objectMapper.convertValue(userData, Map.class); + } + + SyncMessage message = new SyncMessage("USER_SYNC", eventType, targetSystem, dataMap); + sendSyncMessage(message); + } catch (Exception e) { + log.error("发送用户同步消息失败: targetSystem={}, eventType={}, error={}", + targetSystem, eventType, e.getMessage(), e); + } + } + + /** + * 确认回调 - Broker确认收到消息 + */ + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + String messageId = correlationData.getId(); + if (ack) { + log.debug("消息确认成功: messageId={}", messageId); + } else { + log.warn("消息确认失败: messageId={}, cause={}", messageId, cause); + } + } + + /** + * Return回调 - 消息无法路由时回调 + */ + @Override + public void returnedMessage(Message message, int replyCode, String replyText, + String exchange, String routingKey) { + log.error("消息无法路由: exchange={}, routingKey={}, replyCode={}, replyText={}", + exchange, routingKey, replyCode, replyText); + } +} diff --git a/src/main/java/com/gxwebsoft/common/system/service/UserSyncService.java b/src/main/java/com/gxwebsoft/common/system/service/UserSyncService.java index d6cdbca..984bfa8 100644 --- a/src/main/java/com/gxwebsoft/common/system/service/UserSyncService.java +++ b/src/main/java/com/gxwebsoft/common/system/service/UserSyncService.java @@ -120,6 +120,145 @@ public class UserSyncService { } } + /** + * 同步用户到 websopy(Map格式,用于MQ消息处理) + * + * @param data 用户信息Map + */ + public void syncUserToWebsopy(Map data) { + String websopyBaseUrl = getWebsopyBaseUrl(); + if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) { + log.warn("websopyUrl 未配置,跳过用户同步"); + return; + } + + if (data == null || data.isEmpty()) { + log.warn("用户数据为空,跳过同步"); + return; + } + + try { + Integer userId = getIntValue(data, "userId"); + if (userId == null) { + log.warn("用户数据中缺少userId,跳过同步"); + return; + } + + Integer tenantIdValue = getIntValue(data, "tenantId"); + if (tenantIdValue == null) { + tenantIdValue = 0; + } + + // 构建JSON字符串 + StringBuilder jsonBuilder = new StringBuilder(); + jsonBuilder.append("{"); + jsonBuilder.append("\"userId\":").append(userId).append(","); + jsonBuilder.append("\"username\":\"").append(escapeJson(getStringValue(data, "username"))).append("\","); + jsonBuilder.append("\"nickname\":\"").append(escapeJson(getStringValue(data, "nickname"))).append("\","); + jsonBuilder.append("\"avatar\":\"").append(escapeJson(getStringValue(data, "avatar"))).append("\","); + jsonBuilder.append("\"phone\":\"").append(escapeJson(getStringValue(data, "phone"))).append("\","); + jsonBuilder.append("\"status\":").append(getIntValue(data, "status", 1)).append(","); + jsonBuilder.append("\"updateTime\":\"").append(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).append("\","); + jsonBuilder.append("\"tenantId\":").append(tenantIdValue).append(","); + jsonBuilder.append("\"tenant_id\":").append(tenantIdValue); + jsonBuilder.append("}"); + + String url = websopyBaseUrl + "/api/app/user-sync/single"; + String body = jsonBuilder.toString(); + + log.info("MQ同步用户到 websopy: userId={}, body={}", userId, body); + + Map headers = new HashMap<>(); + headers.put("Content-Type", "application/json"); + + HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, body); + String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + + JSONObject result = JSON.parseObject(responseBody); + if (result != null && result.getIntValue("code") == 0) { + log.info("MQ用户同步成功: userId={}", userId); + } else { + String message = result != null ? result.getString("message") : "未知错误"; + log.error("MQ用户同步失败: userId={}, message={}", userId, message); + } + } catch (Exception e) { + log.error("MQ用户同步异常: error={}", e.getMessage(), e); + } + } + + /** + * 从 websopy 删除用户 + * + * @param data 包含userId的数据 + */ + public void deleteUserFromWebsopy(Map data) { + String websopyBaseUrl = getWebsopyBaseUrl(); + if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) { + log.warn("websopyUrl 未配置,跳过删除用户"); + return; + } + + if (data == null) { + log.warn("删除用户数据为空"); + return; + } + + try { + Integer userId = getIntValue(data, "userId"); + if (userId == null) { + log.warn("删除用户数据中缺少userId"); + return; + } + + String url = websopyBaseUrl + "/api/app/user-sync/delete/" + userId; + + log.info("从 websopy 删除用户: userId={}", userId); + + Map headers = new HashMap<>(); + headers.put("Content-Type", "application/json"); + + HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, ""); + String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + + JSONObject result = JSON.parseObject(responseBody); + if (result != null && result.getIntValue("code") == 0) { + log.info("MQ删除用户成功: userId={}", userId); + } else { + String message = result != null ? result.getString("message") : "未知错误"; + log.error("MQ删除用户失败: userId={}, message={}", userId, message); + } + } catch (Exception e) { + log.error("MQ删除用户异常: error={}", e.getMessage(), e); + } + } + + private String getStringValue(Map data, String key) { + Object value = data.get(key); + return value != null ? String.valueOf(value) : ""; + } + + private Integer getIntValue(Map data, String key) { + return getIntValue(data, key, null); + } + + private Integer getIntValue(Map data, String key, Integer defaultValue) { + Object value = data.get(key); + if (value == null) { + return defaultValue; + } + if (value instanceof Integer) { + return (Integer) value; + } + if (value instanceof Number) { + return ((Number) value).intValue(); + } + try { + return Integer.parseInt(String.valueOf(value)); + } catch (NumberFormatException e) { + return defaultValue; + } + } + /** * 刷新 websopy 端的用户缓存 * 只传 userId,websopy 端会通过 API 回查获取完整信息 diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index b2a34a9..569d7ab 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -58,6 +58,18 @@ spring: port: 6379 password: redis_WSDb88 + # RabbitMQ 配置 + rabbitmq: + host: 1Panel-rabbitmq-kvHZ + port: 5672 + username: rabbitmq + password: rabbitmq + virtual-host: / + # 开启确认模式 + publisher-confirm-type: correlated + # 开启Return模式 + publisher-returns: true + # 邮件服务器配置 mail: host: smtp.qq.com @@ -111,6 +123,12 @@ config: bucketDomain: https://oss.wsdns.cn aliyunDomain: https://oss-gxwebsoft.oss-cn-shenzhen.aliyuncs.com +# MQ同步配置 +sync: + # 是否启用MQ(设为false则使用原有直接同步方式) + mq: + enabled: true + # JWT配置 jwt: secret: websoft-jwt-secret-key-2025-dev-environment