From 0343979b5d4530c810bd414d210e80946866171d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E5=BF=A0=E6=9E=97?= <170083662@qq.com> Date: Thu, 9 Apr 2026 16:12:05 +0800 Subject: [PATCH] =?UTF-8?q?feat(mq):=20=E9=87=8D=E6=9E=84=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E5=90=8C=E6=AD=A5=E4=B8=BAMQ=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=A9=B1=E5=8A=A8=E5=B9=B6=E7=A7=BB=E9=99=A4HTTP=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将用户同步消息交换机由 DirectExchange 改为 TopicExchange,实现多目标系统路由 - 去除核心系统中用户同步队列定义和消费者,转由各子系统独立消费 - RabbitMQSyncProducer 动态构建 routing key,支持按目标系统路由同步消息 - UserServiceImpl 用户新增和更新后发送同步消息到 websopy 的 MQ 交换机 - 废弃 UserSyncService 中的 HTTP 同步接口,改为通过 MQ 实现用户数据同步 - 删除 UserSyncConsumer,核心系统不再直接消费用户同步消息 - 增加日志输出,便于追踪用户同步消息发送情况 - 保留废弃代码兼容旧引用,方便后续平滑迁移和维护 --- .workbuddy/expert-history.json | 24 +- .workbuddy/memory/2026-04-09.md | 115 +++++++ .../common/mq/config/RabbitMQConfig.java | 38 ++- .../common/mq/consumer/UserSyncConsumer.java | 159 ---------- .../producer/impl/RabbitMQSyncProducer.java | 18 +- .../system/service/UserSyncService.java | 294 +----------------- .../system/service/impl/UserServiceImpl.java | 24 ++ 7 files changed, 217 insertions(+), 455 deletions(-) create mode 100644 .workbuddy/memory/2026-04-09.md delete mode 100644 src/main/java/com/gxwebsoft/common/mq/consumer/UserSyncConsumer.java diff --git a/.workbuddy/expert-history.json b/.workbuddy/expert-history.json index ff45ebd..7537a91 100644 --- a/.workbuddy/expert-history.json +++ b/.workbuddy/expert-history.json @@ -11,7 +11,29 @@ "usedAt": 1775495439006, "industryId": "all" } + ], + "90ac41da355a447a8c29ed992c8beede": [ + { + "expertId": "SeniorDeveloper", + "name": "Will", + "profession": "高级开发工程师", + "avatarUrl": "https://acc-1258344699.cos.accelerate.myqcloud.com/workbuddy/experts/avatars/02-Engineering/SeniorDeveloper/SeniorDeveloper.png", + "promptUrl": "https://acc-1258344699.cos.accelerate.myqcloud.com/workbuddy/experts/experts/02-Engineering/SeniorDeveloper/SeniorDeveloper_zh.md", + "usedAt": 1775720823455, + "industryId": "all" + } + ], + "c7ea2a41e7d147e887ab506386658772": [ + { + "expertId": "SeniorDeveloper", + "name": "Will", + "profession": "高级开发工程师", + "avatarUrl": "https://acc-1258344699.cos.accelerate.myqcloud.com/workbuddy/experts/avatars/02-Engineering/SeniorDeveloper/SeniorDeveloper.png", + "promptUrl": "https://acc-1258344699.cos.accelerate.myqcloud.com/workbuddy/experts/experts/02-Engineering/SeniorDeveloper/SeniorDeveloper_zh.md", + "usedAt": 1775720823455, + "industryId": "all" + } ] }, - "lastUpdated": 1775501968801 + "lastUpdated": 1775722264733 } \ No newline at end of file diff --git a/.workbuddy/memory/2026-04-09.md b/.workbuddy/memory/2026-04-09.md new file mode 100644 index 0000000..c2eb5d9 --- /dev/null +++ b/.workbuddy/memory/2026-04-09.md @@ -0,0 +1,115 @@ +# 2026-04-09 工作记录 + +## UserServiceImpl 添加 MQ 同步功能 + +在 `UserServiceImpl` 中统一添加了用户数据变更时的 MQ 消息发送逻辑: + +### 修改内容 +1. **添加依赖注入**:注入 `SyncMessageProducer`,使用 `@Autowired(required = false)` 避免 MQ 未启用时报错 +2. **saveUser() 方法**:用户创建成功后发送 `CREATE` 事件到 websopy +3. **updateUser() 方法**:用户更新成功后发送 `UPDATE` 事件到 websopy +4. **addUser() 方法**:注释说明已通过 saveUser 发送 MQ,避免重复 + +### 触发场景 +现在以下操作都会触发 MQ 同步: +- 用户注册(短信登录自动注册、普通注册、管理员注册) +- 后台添加用户 +- 用户更新信息 +- 扫码绑定手机号(原有逻辑) + +### 日志输出 +- 创建:`用户创建后发送MQ消息同步到websopy: userId={}, phone={}` +- 更新:`用户更新后发送MQ消息同步到websopy: userId={}, phone={}` + +--- + +## MQ 架构改造 - 方案 A 实现 + +将用户同步架构从 "core 系统消费后 HTTP 转发" 改为 "各子系统直接消费 MQ"。 + +### core 系统改动 (com.gxwebsoft.core) + +#### 1. RabbitMQConfig.java +- **Exchange 类型**:从 `DirectExchange` 改为 `TopicExchange` +- **删除队列定义**:core 系统不再定义消费队列,只负责发送 +- **Routing Key 格式**:`user.sync.{targetSystem}` + +#### 2. RabbitMQSyncProducer.java +- **动态 Routing Key**:根据 `targetSystem` 构建 routing key +- **代码**:`"user.sync." + targetSystem.toLowerCase()` + +#### 3. 删除 UserSyncConsumer.java +- core 系统不再消费用户同步消息 + +#### 4. UserSyncService.java +- **废弃 HTTP 同步逻辑**:所有 HTTP 调用方法已删除 +- **标记 @Deprecated**:保留空实现以兼容旧代码 +- **同步方式**:现在统一通过 MQ 自动触发 + +### websopy 系统改动 (websopy-java) + +#### 1. RabbitMQConfig.java +```java +// websopy 专用队列 +public static final String SYNC_QUEUE_WEBSOPY = "user.sync.websopy.queue"; +public static final String SYNC_ROUTING_KEY_WEBSOPY = "user.sync.websopy"; + +// Topic Exchange(与 core 系统共用) +@Bean +public TopicExchange syncExchange() { + return new TopicExchange(SYNC_EXCHANGE, true, false); +} + +// websopy 专用队列绑定 +@Bean +public Binding syncBindingWebsopy() { + return BindingBuilder.bind(syncQueueWebsopy()) + .to(syncExchange()) + .with(SYNC_ROUTING_KEY_WEBSOPY); +} +``` + +#### 2. SyncMessageConsumer.java +- **监听队列**:改为 `SYNC_QUEUE_WEBSOPY` (`user.sync.websopy.queue`) +- **死信队列**:改为 `DLQ_QUEUE_WEBSOPY` (`user.sync.websopy.dlq`) + +### 新架构流程 +``` +用户操作 → UserServiceImpl.saveUser/updateUser + ↓ + MQ Producer (routing key = user.sync.websopy) + ↓ + Topic Exchange (sync.exchange) + ↓ + ┌───────────┼───────────┐ + ↓ ↓ ↓ +websopy 子系统B 子系统C +(消费者) (消费者) (消费者) +``` + +### 消息格式 +```json +{ + "messageId": "uuid", + "messageType": "USER_SYNC", + "eventType": "CREATE|UPDATE|DELETE", + "targetSystem": "websopy", + "data": { + "userId": 123, + "username": "xxx", + "nickname": "xxx", + "avatar": "xxx", + "phone": "xxx", + "status": 1, + "tenantId": 1 + }, + "createTime": "2026-04-09T16:00:00", + "retryCount": 0 +} +``` + +### 优势 +- **解耦**:core 系统只负责发送,不关心哪些子系统消费 +- **扩展性**:新增子系统只需添加自己的消费者,无需修改 core +- **可靠性**:各子系统独立消费,互不影响 +- **符合 MQ 设计**:消息广播到多个订阅者 diff --git a/src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java b/src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java index 851c0b7..596b593 100644 --- a/src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java +++ b/src/main/java/com/gxwebsoft/common/mq/config/RabbitMQConfig.java @@ -1,7 +1,13 @@ package com.gxwebsoft.common.mq.config; import com.fasterxml.jackson.databind.ObjectMapper; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; @@ -98,9 +104,15 @@ public class RabbitMQConfig { // ==================== 交换机 ==================== + /** + * 用户同步 Topic Exchange + * 使用 Topic 类型,支持按 targetSystem 路由到不同队列 + * routing key 格式: user.sync.{targetSystem} + * 各子系统可以绑定自己的队列来消费消息 + */ @Bean - public DirectExchange syncExchange() { - return new DirectExchange(SYNC_EXCHANGE, true, false); + public TopicExchange syncExchange() { + return new TopicExchange(SYNC_EXCHANGE, true, false); } @Bean @@ -110,13 +122,12 @@ public class RabbitMQConfig { // ==================== 队列 ==================== - @Bean - public Queue syncQueue() { - return QueueBuilder.durable(SYNC_QUEUE) - .withArgument("x-dead-letter-exchange", DLX_EXCHANGE) - .withArgument("x-dead-letter-routing-key", DLQ_ROUTING_KEY) - .build(); - } + /** + * 注意:core 系统只负责发送消息,不消费消息 + * 各子系统(websopy等)需要在自己的系统中配置消费者和队列 + * + * 如果 core 系统也需要消费某些消息,可以在这里添加对应的队列 + */ @Bean public Queue deadLetterQueue() { @@ -125,13 +136,6 @@ public class RabbitMQConfig { // ==================== 绑定 ==================== - @Bean - public Binding syncBinding() { - return BindingBuilder.bind(syncQueue()) - .to(syncExchange()) - .with(SYNC_ROUTING_KEY); - } - @Bean public Binding dlqBinding() { return BindingBuilder.bind(deadLetterQueue()) diff --git a/src/main/java/com/gxwebsoft/common/mq/consumer/UserSyncConsumer.java b/src/main/java/com/gxwebsoft/common/mq/consumer/UserSyncConsumer.java deleted file mode 100644 index 3fe77fe..0000000 --- a/src/main/java/com/gxwebsoft/common/mq/consumer/UserSyncConsumer.java +++ /dev/null @@ -1,159 +0,0 @@ -package com.gxwebsoft.common.mq.consumer; - -import com.gxwebsoft.common.mq.config.RabbitMQConfig; -import com.gxwebsoft.common.mq.message.SyncMessage; -import com.gxwebsoft.common.system.service.UserSyncService; -import com.rabbitmq.client.Channel; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -import java.io.IOException; -import java.util.Map; - -/** - * 用户同步消息消费者 - * - * 负责监听用户同步消息,并将数据同步到目标系统(如websopy) - */ -@Slf4j -@Component -@ConditionalOnProperty(name = "sync.mq.enabled", havingValue = "true", matchIfMissing = true) -public class UserSyncConsumer { - - @Autowired(required = false) - private UserSyncService userSyncService; - - private static final int MAX_RETRY_COUNT = 3; - - /** - * 监听用户同步消息 - */ - @RabbitListener(queues = RabbitMQConfig.SYNC_QUEUE) - public void handleMessage(SyncMessage message, Channel channel, Message amqpMessage) { - long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag(); - - try { - log.info("收到MQ消息: messageId={}, type={}, event={}, target={}", - message.getMessageId(), message.getMessageType(), - message.getEventType(), message.getTargetSystem()); - - // 检查是否启用了同步服务 - if (userSyncService == null) { - log.warn("UserSyncService 未启用,跳过消息处理: messageId={}", message.getMessageId()); - channel.basicAck(deliveryTag, false); - return; - } - - // 处理消息 - processMessage(message); - - // 确认消息 - channel.basicAck(deliveryTag, false); - log.info("消息处理成功: messageId={}", message.getMessageId()); - - } catch (Exception e) { - log.error("消息处理失败: messageId={}, error={}", message.getMessageId(), e.getMessage(), e); - handleFailure(channel, amqpMessage, deliveryTag, message, e); - } - } - - /** - * 处理消息 - */ - private void processMessage(SyncMessage message) { - String messageType = message.getMessageType(); - String eventType = message.getEventType(); - String targetSystem = message.getTargetSystem(); - Map data = message.getData(); - - // 判断消息类型并处理 - if ("USER_SYNC".equals(messageType)) { - handleUserSync(targetSystem, eventType, data); - } else { - log.warn("未知的消息类型: messageType={}", messageType); - } - } - - /** - * 处理用户同步 - */ - private void handleUserSync(String targetSystem, String eventType, Map data) { - switch (eventType) { - case "CREATE": - case "UPDATE": - userSyncService.syncUserToWebsopy(data); - log.info("用户同步到{}成功: event={}", targetSystem, eventType); - break; - case "DELETE": - userSyncService.deleteUserFromWebsopy(data); - log.info("用户从{}删除成功: event={}", targetSystem, eventType); - break; - default: - log.warn("未知的用户事件类型: eventType={}", eventType); - } - } - - /** - * 处理失败消息 - */ - private void handleFailure(Channel channel, Message amqpMessage, long deliveryTag, - SyncMessage message, Exception e) { - Integer retryCount = message.getRetryCount(); - if (retryCount == null) { - retryCount = 0; - } - - if (retryCount < MAX_RETRY_COUNT) { - // 重试:拒绝消息并重新入队 - try { - log.warn("消息处理失败,准备重试: messageId={}, retryCount={}/{}", - message.getMessageId(), retryCount + 1, MAX_RETRY_COUNT); - // 增加重试次数 - message.setRetryCount(retryCount + 1); - // 拒绝消息(requeue=true 会重新入队) - channel.basicNack(deliveryTag, false, true); - } catch (IOException ioException) { - log.error("消息拒绝失败: messageId={}", message.getMessageId(), ioException); - } - } else { - // 超过重试次数,发送到死信队列 - try { - log.error("消息处理失败次数超限,发送到死信队列: messageId={}, retryCount={}", - message.getMessageId(), retryCount); - // 拒绝消息(requeue=false 进入死信队列) - channel.basicNack(deliveryTag, false, false); - } catch (IOException ioException) { - log.error("消息拒绝失败: messageId={}", message.getMessageId(), ioException); - } - } - } - - /** - * 监听死信队列消息(可选,用于告警或人工处理) - */ - @RabbitListener(queues = RabbitMQConfig.DLQ_QUEUE) - public void handleDeadLetter(SyncMessage message, Channel channel, Message amqpMessage) { - long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag(); - try { - log.error("死信消息: messageId={}, type={}, event={}, target={}, retryCount={}", - message.getMessageId(), message.getMessageType(), - message.getEventType(), message.getTargetSystem(), - message.getRetryCount()); - - // TODO: 可以在这里添加告警逻辑,如发送邮件、钉钉通知等 - - channel.basicAck(deliveryTag, false); - } catch (Exception e) { - log.error("处理死信消息失败: messageId={}", message.getMessageId(), e); - try { - channel.basicAck(deliveryTag, false); - } catch (IOException ioException) { - log.error("确认死信消息失败", ioException); - } - } - } -} diff --git a/src/main/java/com/gxwebsoft/common/mq/producer/impl/RabbitMQSyncProducer.java b/src/main/java/com/gxwebsoft/common/mq/producer/impl/RabbitMQSyncProducer.java index 6af747b..5aff9ad 100644 --- a/src/main/java/com/gxwebsoft/common/mq/producer/impl/RabbitMQSyncProducer.java +++ b/src/main/java/com/gxwebsoft/common/mq/producer/impl/RabbitMQSyncProducer.java @@ -66,9 +66,14 @@ public class RabbitMQSyncProducer implements SyncMessageProducer, RabbitTemplate ); } + // 使用 targetSystem 作为 routing key + // 格式: user.sync.{targetSystem} + // 各子系统绑定队列时使用 pattern: user.sync.{systemName} + String routingKey = buildRoutingKey(message.getTargetSystem()); + rabbitTemplate.convertAndSend( RabbitMQConfig.SYNC_EXCHANGE, - RabbitMQConfig.SYNC_ROUTING_KEY, + routingKey, message, correlationData ); @@ -81,6 +86,17 @@ public class RabbitMQSyncProducer implements SyncMessageProducer, RabbitTemplate } } + /** + * 构建 routing key + * 格式: user.sync.{targetSystem} + */ + private String buildRoutingKey(String targetSystem) { + if (targetSystem == null || targetSystem.isEmpty()) { + return "user.sync.all"; + } + return "user.sync." + targetSystem.toLowerCase(); + } + @Override public void sendUserSyncMessage(String targetSystem, String eventType, Object userData) { try { diff --git a/src/main/java/com/gxwebsoft/common/system/service/UserSyncService.java b/src/main/java/com/gxwebsoft/common/system/service/UserSyncService.java index 984bfa8..59f923f 100644 --- a/src/main/java/com/gxwebsoft/common/system/service/UserSyncService.java +++ b/src/main/java/com/gxwebsoft/common/system/service/UserSyncService.java @@ -1,302 +1,42 @@ package com.gxwebsoft.common.system.service; -import com.gxwebsoft.common.core.config.ConfigProperties; -import com.gxwebsoft.common.core.utils.HttpUtils; -import com.gxwebsoft.common.system.entity.User; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpResponse; -import org.apache.http.util.EntityUtils; import org.springframework.stereotype.Service; -import javax.annotation.Resource; -import java.nio.charset.StandardCharsets; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.HashMap; -import java.util.Map; - /** - * 用户同步服务(同步到 websopy) + * 用户同步服务 + * + * 注意:此服务已废弃,用户同步现在通过 MQ 实现。 + * 各子系统(websopy等)需要在自己的系统中消费 MQ 消息进行同步。 + * + * 保留此类是为了兼容可能存在的旧代码引用,所有方法已改为空实现。 * * @author WebSoft * @since 2026-04-04 + * @deprecated 请使用 MQ 消息进行用户同步 */ @Slf4j @Service +@Deprecated public class UserSyncService { - - /** - * 转义JSON字符串中的特殊字符 - */ - private String escapeJson(String str) { - if (str == null) { - return ""; - } - return str.replace("\\", "\\\\") - .replace("\"", "\\\"") - .replace("\b", "\\b") - .replace("\f", "\\f") - .replace("\n", "\\n") - .replace("\r", "\\r") - .replace("\t", "\\t"); - } - - @Resource - private ConfigProperties configProperties; - - /** - * 获取 websopy 基础 URL - */ - private String getWebsopyBaseUrl() { - return configProperties.getWebsopyUrl(); - } /** * 同步单个用户到 websopy - * - * @param user 用户信息 + * + * @deprecated 已废弃,用户同步现在通过 MQ 自动触发 */ - public void syncUserToWebsopy(User user) { - String websopyBaseUrl = getWebsopyBaseUrl(); - if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) { - log.warn("websopyUrl 未配置,跳过用户同步: userId={}", user.getUserId()); - return; - } - - if (user == null || user.getUserId() == null) { - log.warn("用户信息为空,跳过同步"); - return; - } - - try { - // 构建请求体 - 使用手动JSON构建确保字段正确 - Integer tenantIdValue = user.getTenantId() != null ? user.getTenantId() : 0; - - // 构建JSON字符串,确保tenant_id字段存在且不为null - StringBuilder jsonBuilder = new StringBuilder(); - jsonBuilder.append("{"); - jsonBuilder.append("\"userId\":").append(user.getUserId()).append(","); - jsonBuilder.append("\"username\":\"").append(escapeJson(user.getUsername())).append("\","); - jsonBuilder.append("\"nickname\":\"").append(escapeJson(user.getNickname())).append("\","); - jsonBuilder.append("\"avatar\":\"").append(escapeJson(user.getAvatar())).append("\","); - jsonBuilder.append("\"phone\":\"").append(escapeJson(user.getPhone())).append("\","); - jsonBuilder.append("\"status\":").append(user.getStatus()).append(","); - jsonBuilder.append("\"updateTime\":\"").append(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).append("\","); - jsonBuilder.append("\"tenantId\":").append(tenantIdValue).append(","); // 驼峰格式 - jsonBuilder.append("\"tenant_id\":").append(tenantIdValue); // 下划线格式 - jsonBuilder.append("}"); - - String url = websopyBaseUrl + "/api/app/user-sync/single"; - String body = jsonBuilder.toString(); - - log.info("同步用户到 websopy: userId={}, username={}, nickname={}, phone={}, tenantId={}, url={}", - user.getUserId(), user.getUsername(), user.getNickname(), user.getPhone(), user.getTenantId(), url); - log.info("同步用户请求体JSON: {}", body); // 改为info级别以便查看 - // 额外日志:tenantId 值检查 - log.debug("tenantId检查 - 原始值: {}, 转换后值: {}", - user.getTenantId(), tenantIdValue); - - // 发送 HTTP POST 请求 - Map headers = new HashMap<>(); - headers.put("Content-Type", "application/json"); - - HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, body); - String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); - - log.debug("websopy 响应: {}", responseBody); - - // 解析响应 - JSONObject result = JSON.parseObject(responseBody); - if (result != null && result.getIntValue("code") == 0) { - log.info("用户同步成功: userId={}", user.getUserId()); - } else { - String message = result != null ? result.getString("message") : "未知错误"; - log.error("用户同步失败: userId={}, message={}", user.getUserId(), message); - } - } catch (Exception e) { - log.error("用户同步异常: userId={}, error={}", user.getUserId(), e.getMessage(), e); - } - } - - /** - * 同步用户到 websopy(Map格式,用于MQ消息处理) - * - * @param data 用户信息Map - */ - public void syncUserToWebsopy(Map data) { - String websopyBaseUrl = getWebsopyBaseUrl(); - if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) { - log.warn("websopyUrl 未配置,跳过用户同步"); - return; - } - - if (data == null || data.isEmpty()) { - log.warn("用户数据为空,跳过同步"); - return; - } - - try { - Integer userId = getIntValue(data, "userId"); - if (userId == null) { - log.warn("用户数据中缺少userId,跳过同步"); - return; - } - - Integer tenantIdValue = getIntValue(data, "tenantId"); - if (tenantIdValue == null) { - tenantIdValue = 0; - } - - // 构建JSON字符串 - StringBuilder jsonBuilder = new StringBuilder(); - jsonBuilder.append("{"); - jsonBuilder.append("\"userId\":").append(userId).append(","); - jsonBuilder.append("\"username\":\"").append(escapeJson(getStringValue(data, "username"))).append("\","); - jsonBuilder.append("\"nickname\":\"").append(escapeJson(getStringValue(data, "nickname"))).append("\","); - jsonBuilder.append("\"avatar\":\"").append(escapeJson(getStringValue(data, "avatar"))).append("\","); - jsonBuilder.append("\"phone\":\"").append(escapeJson(getStringValue(data, "phone"))).append("\","); - jsonBuilder.append("\"status\":").append(getIntValue(data, "status", 1)).append(","); - jsonBuilder.append("\"updateTime\":\"").append(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).append("\","); - jsonBuilder.append("\"tenantId\":").append(tenantIdValue).append(","); - jsonBuilder.append("\"tenant_id\":").append(tenantIdValue); - jsonBuilder.append("}"); - - String url = websopyBaseUrl + "/api/app/user-sync/single"; - String body = jsonBuilder.toString(); - - log.info("MQ同步用户到 websopy: userId={}, body={}", userId, body); - - Map headers = new HashMap<>(); - headers.put("Content-Type", "application/json"); - - HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, body); - String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); - - JSONObject result = JSON.parseObject(responseBody); - if (result != null && result.getIntValue("code") == 0) { - log.info("MQ用户同步成功: userId={}", userId); - } else { - String message = result != null ? result.getString("message") : "未知错误"; - log.error("MQ用户同步失败: userId={}, message={}", userId, message); - } - } catch (Exception e) { - log.error("MQ用户同步异常: error={}", e.getMessage(), e); - } - } - - /** - * 从 websopy 删除用户 - * - * @param data 包含userId的数据 - */ - public void deleteUserFromWebsopy(Map data) { - String websopyBaseUrl = getWebsopyBaseUrl(); - if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) { - log.warn("websopyUrl 未配置,跳过删除用户"); - return; - } - - if (data == null) { - log.warn("删除用户数据为空"); - return; - } - - try { - Integer userId = getIntValue(data, "userId"); - if (userId == null) { - log.warn("删除用户数据中缺少userId"); - return; - } - - String url = websopyBaseUrl + "/api/app/user-sync/delete/" + userId; - - log.info("从 websopy 删除用户: userId={}", userId); - - Map headers = new HashMap<>(); - headers.put("Content-Type", "application/json"); - - HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, ""); - String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); - - JSONObject result = JSON.parseObject(responseBody); - if (result != null && result.getIntValue("code") == 0) { - log.info("MQ删除用户成功: userId={}", userId); - } else { - String message = result != null ? result.getString("message") : "未知错误"; - log.error("MQ删除用户失败: userId={}, message={}", userId, message); - } - } catch (Exception e) { - log.error("MQ删除用户异常: error={}", e.getMessage(), e); - } - } - - private String getStringValue(Map data, String key) { - Object value = data.get(key); - return value != null ? String.valueOf(value) : ""; - } - - private Integer getIntValue(Map data, String key) { - return getIntValue(data, key, null); - } - - private Integer getIntValue(Map data, String key, Integer defaultValue) { - Object value = data.get(key); - if (value == null) { - return defaultValue; - } - if (value instanceof Integer) { - return (Integer) value; - } - if (value instanceof Number) { - return ((Number) value).intValue(); - } - try { - return Integer.parseInt(String.valueOf(value)); - } catch (NumberFormatException e) { - return defaultValue; - } + @Deprecated + public void syncUserToWebsopy(Object user) { + log.debug("UserSyncService.syncUserToWebsopy 已废弃,用户同步通过 MQ 自动触发"); } /** * 刷新 websopy 端的用户缓存 - * 只传 userId,websopy 端会通过 API 回查获取完整信息 - * - * @param userId 用户ID + * + * @deprecated 已废弃 */ + @Deprecated public void refreshUserCache(Integer userId) { - String websopyBaseUrl = getWebsopyBaseUrl(); - if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) { - log.warn("websopyUrl 未配置,跳过刷新缓存: userId={}", userId); - return; - } - - if (userId == null) { - log.warn("userId 为空,跳过刷新"); - return; - } - - try { - String url = websopyBaseUrl + "/api/app/user-sync/refresh/" + userId; - - log.info("刷新用户缓存: userId={}, url={}", userId, url); - - Map headers = new HashMap<>(); - headers.put("Content-Type", "application/json"); - - HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, ""); - String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); - - JSONObject result = JSON.parseObject(responseBody); - if (result != null && result.getIntValue("code") == 0) { - log.info("刷新缓存成功: userId={}", userId); - } else { - String message = result != null ? result.getString("message") : "未知错误"; - log.error("刷新缓存失败: userId={}, message={}", userId, message); - } - } catch (Exception e) { - log.error("刷新缓存异常: userId={}, error={}", userId, e.getMessage(), e); - } + log.debug("UserSyncService.refreshUserCache 已废弃"); } } diff --git a/src/main/java/com/gxwebsoft/common/system/service/impl/UserServiceImpl.java b/src/main/java/com/gxwebsoft/common/system/service/impl/UserServiceImpl.java index 097dc77..fa98528 100644 --- a/src/main/java/com/gxwebsoft/common/system/service/impl/UserServiceImpl.java +++ b/src/main/java/com/gxwebsoft/common/system/service/impl/UserServiceImpl.java @@ -14,7 +14,10 @@ import com.gxwebsoft.common.system.entity.*; import com.gxwebsoft.common.system.mapper.UserMapper; import com.gxwebsoft.common.system.param.LoginParam; import com.gxwebsoft.common.system.param.UserParam; +import com.gxwebsoft.common.mq.producer.SyncMessageProducer; import com.gxwebsoft.common.system.service.*; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.core.userdetails.UserDetails; import org.springframework.security.core.userdetails.UsernameNotFoundException; import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder; @@ -34,6 +37,7 @@ import static com.gxwebsoft.common.core.constants.PlatformConstants.WEB; * @author WebSoft * @since 2018-12-24 16:10:14 */ +@Slf4j @Service public class UserServiceImpl extends ServiceImpl implements UserService { @Resource @@ -51,6 +55,9 @@ public class UserServiceImpl extends ServiceImpl implements Us @Resource private UserRefereeService userRefereeService; + @Autowired(required = false) + private SyncMessageProducer syncMessageProducer; + @Override public PageResult pageRel(UserParam param) { PageParam page = new PageParam<>(param); @@ -130,6 +137,14 @@ public class UserServiceImpl extends ServiceImpl implements Us throw new BusinessException("用户角色添加失败"); } } + // 用户创建成功后,通过MQ异步同步用户数据到 websopy + if (result && syncMessageProducer != null) { + User savedUser = getAllByUserId(String.valueOf(user.getUserId())); + if (savedUser != null) { + syncMessageProducer.sendUserSyncMessage("websopy", "CREATE", savedUser); + log.info("用户创建后发送MQ消息同步到websopy: userId={}, phone={}", user.getUserId(), user.getPhone()); + } + } return result; } @@ -163,6 +178,14 @@ public class UserServiceImpl extends ServiceImpl implements Us throw new BusinessException("用户角色添加失败"); } } + // 用户更新成功后,通过MQ异步同步用户数据到 websopy + if (result && syncMessageProducer != null) { + User updatedUser = getAllByUserId(String.valueOf(user.getUserId())); + if (updatedUser != null) { + syncMessageProducer.sendUserSyncMessage("websopy", "UPDATE", updatedUser); + log.info("用户更新后发送MQ消息同步到websopy: userId={}, phone={}", user.getUserId(), user.getPhone()); + } + } return result; } @@ -323,6 +346,7 @@ public class UserServiceImpl extends ServiceImpl implements Us // Ensure caller (e.g. register / invite register) gets non-empty roles/authorities in response. addUser.setRoles(userRoleService.listByUserId(addUser.getUserId())); addUser.setAuthorities(roleMenuService.listMenuByUserId(addUser.getUserId(), null)); + // addUser内部调用saveUser,saveUser已发送MQ消息,这里不需要重复发送 return addUser; }