feat(mq): 重构用户同步为MQ消息驱动并移除HTTP同步服务
- 将用户同步消息交换机由 DirectExchange 改为 TopicExchange,实现多目标系统路由 - 去除核心系统中用户同步队列定义和消费者,转由各子系统独立消费 - RabbitMQSyncProducer 动态构建 routing key,支持按目标系统路由同步消息 - UserServiceImpl 用户新增和更新后发送同步消息到 websopy 的 MQ 交换机 - 废弃 UserSyncService 中的 HTTP 同步接口,改为通过 MQ 实现用户数据同步 - 删除 UserSyncConsumer,核心系统不再直接消费用户同步消息 - 增加日志输出,便于追踪用户同步消息发送情况 - 保留废弃代码兼容旧引用,方便后续平滑迁移和维护
This commit is contained in:
@@ -11,7 +11,29 @@
|
|||||||
"usedAt": 1775495439006,
|
"usedAt": 1775495439006,
|
||||||
"industryId": "all"
|
"industryId": "all"
|
||||||
}
|
}
|
||||||
|
],
|
||||||
|
"90ac41da355a447a8c29ed992c8beede": [
|
||||||
|
{
|
||||||
|
"expertId": "SeniorDeveloper",
|
||||||
|
"name": "Will",
|
||||||
|
"profession": "高级开发工程师",
|
||||||
|
"avatarUrl": "https://acc-1258344699.cos.accelerate.myqcloud.com/workbuddy/experts/avatars/02-Engineering/SeniorDeveloper/SeniorDeveloper.png",
|
||||||
|
"promptUrl": "https://acc-1258344699.cos.accelerate.myqcloud.com/workbuddy/experts/experts/02-Engineering/SeniorDeveloper/SeniorDeveloper_zh.md",
|
||||||
|
"usedAt": 1775720823455,
|
||||||
|
"industryId": "all"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"c7ea2a41e7d147e887ab506386658772": [
|
||||||
|
{
|
||||||
|
"expertId": "SeniorDeveloper",
|
||||||
|
"name": "Will",
|
||||||
|
"profession": "高级开发工程师",
|
||||||
|
"avatarUrl": "https://acc-1258344699.cos.accelerate.myqcloud.com/workbuddy/experts/avatars/02-Engineering/SeniorDeveloper/SeniorDeveloper.png",
|
||||||
|
"promptUrl": "https://acc-1258344699.cos.accelerate.myqcloud.com/workbuddy/experts/experts/02-Engineering/SeniorDeveloper/SeniorDeveloper_zh.md",
|
||||||
|
"usedAt": 1775720823455,
|
||||||
|
"industryId": "all"
|
||||||
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"lastUpdated": 1775501968801
|
"lastUpdated": 1775722264733
|
||||||
}
|
}
|
||||||
115
.workbuddy/memory/2026-04-09.md
Normal file
115
.workbuddy/memory/2026-04-09.md
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
# 2026-04-09 工作记录
|
||||||
|
|
||||||
|
## UserServiceImpl 添加 MQ 同步功能
|
||||||
|
|
||||||
|
在 `UserServiceImpl` 中统一添加了用户数据变更时的 MQ 消息发送逻辑:
|
||||||
|
|
||||||
|
### 修改内容
|
||||||
|
1. **添加依赖注入**:注入 `SyncMessageProducer`,使用 `@Autowired(required = false)` 避免 MQ 未启用时报错
|
||||||
|
2. **saveUser() 方法**:用户创建成功后发送 `CREATE` 事件到 websopy
|
||||||
|
3. **updateUser() 方法**:用户更新成功后发送 `UPDATE` 事件到 websopy
|
||||||
|
4. **addUser() 方法**:注释说明已通过 saveUser 发送 MQ,避免重复
|
||||||
|
|
||||||
|
### 触发场景
|
||||||
|
现在以下操作都会触发 MQ 同步:
|
||||||
|
- 用户注册(短信登录自动注册、普通注册、管理员注册)
|
||||||
|
- 后台添加用户
|
||||||
|
- 用户更新信息
|
||||||
|
- 扫码绑定手机号(原有逻辑)
|
||||||
|
|
||||||
|
### 日志输出
|
||||||
|
- 创建:`用户创建后发送MQ消息同步到websopy: userId={}, phone={}`
|
||||||
|
- 更新:`用户更新后发送MQ消息同步到websopy: userId={}, phone={}`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## MQ 架构改造 - 方案 A 实现
|
||||||
|
|
||||||
|
将用户同步架构从 "core 系统消费后 HTTP 转发" 改为 "各子系统直接消费 MQ"。
|
||||||
|
|
||||||
|
### core 系统改动 (com.gxwebsoft.core)
|
||||||
|
|
||||||
|
#### 1. RabbitMQConfig.java
|
||||||
|
- **Exchange 类型**:从 `DirectExchange` 改为 `TopicExchange`
|
||||||
|
- **删除队列定义**:core 系统不再定义消费队列,只负责发送
|
||||||
|
- **Routing Key 格式**:`user.sync.{targetSystem}`
|
||||||
|
|
||||||
|
#### 2. RabbitMQSyncProducer.java
|
||||||
|
- **动态 Routing Key**:根据 `targetSystem` 构建 routing key
|
||||||
|
- **代码**:`"user.sync." + targetSystem.toLowerCase()`
|
||||||
|
|
||||||
|
#### 3. 删除 UserSyncConsumer.java
|
||||||
|
- core 系统不再消费用户同步消息
|
||||||
|
|
||||||
|
#### 4. UserSyncService.java
|
||||||
|
- **废弃 HTTP 同步逻辑**:所有 HTTP 调用方法已删除
|
||||||
|
- **标记 @Deprecated**:保留空实现以兼容旧代码
|
||||||
|
- **同步方式**:现在统一通过 MQ 自动触发
|
||||||
|
|
||||||
|
### websopy 系统改动 (websopy-java)
|
||||||
|
|
||||||
|
#### 1. RabbitMQConfig.java
|
||||||
|
```java
|
||||||
|
// websopy 专用队列
|
||||||
|
public static final String SYNC_QUEUE_WEBSOPY = "user.sync.websopy.queue";
|
||||||
|
public static final String SYNC_ROUTING_KEY_WEBSOPY = "user.sync.websopy";
|
||||||
|
|
||||||
|
// Topic Exchange(与 core 系统共用)
|
||||||
|
@Bean
|
||||||
|
public TopicExchange syncExchange() {
|
||||||
|
return new TopicExchange(SYNC_EXCHANGE, true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// websopy 专用队列绑定
|
||||||
|
@Bean
|
||||||
|
public Binding syncBindingWebsopy() {
|
||||||
|
return BindingBuilder.bind(syncQueueWebsopy())
|
||||||
|
.to(syncExchange())
|
||||||
|
.with(SYNC_ROUTING_KEY_WEBSOPY);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### 2. SyncMessageConsumer.java
|
||||||
|
- **监听队列**:改为 `SYNC_QUEUE_WEBSOPY` (`user.sync.websopy.queue`)
|
||||||
|
- **死信队列**:改为 `DLQ_QUEUE_WEBSOPY` (`user.sync.websopy.dlq`)
|
||||||
|
|
||||||
|
### 新架构流程
|
||||||
|
```
|
||||||
|
用户操作 → UserServiceImpl.saveUser/updateUser
|
||||||
|
↓
|
||||||
|
MQ Producer (routing key = user.sync.websopy)
|
||||||
|
↓
|
||||||
|
Topic Exchange (sync.exchange)
|
||||||
|
↓
|
||||||
|
┌───────────┼───────────┐
|
||||||
|
↓ ↓ ↓
|
||||||
|
websopy 子系统B 子系统C
|
||||||
|
(消费者) (消费者) (消费者)
|
||||||
|
```
|
||||||
|
|
||||||
|
### 消息格式
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"messageId": "uuid",
|
||||||
|
"messageType": "USER_SYNC",
|
||||||
|
"eventType": "CREATE|UPDATE|DELETE",
|
||||||
|
"targetSystem": "websopy",
|
||||||
|
"data": {
|
||||||
|
"userId": 123,
|
||||||
|
"username": "xxx",
|
||||||
|
"nickname": "xxx",
|
||||||
|
"avatar": "xxx",
|
||||||
|
"phone": "xxx",
|
||||||
|
"status": 1,
|
||||||
|
"tenantId": 1
|
||||||
|
},
|
||||||
|
"createTime": "2026-04-09T16:00:00",
|
||||||
|
"retryCount": 0
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 优势
|
||||||
|
- **解耦**:core 系统只负责发送,不关心哪些子系统消费
|
||||||
|
- **扩展性**:新增子系统只需添加自己的消费者,无需修改 core
|
||||||
|
- **可靠性**:各子系统独立消费,互不影响
|
||||||
|
- **符合 MQ 设计**:消息广播到多个订阅者
|
||||||
@@ -1,7 +1,13 @@
|
|||||||
package com.gxwebsoft.common.mq.config;
|
package com.gxwebsoft.common.mq.config;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import org.springframework.amqp.core.*;
|
import org.springframework.amqp.core.DirectExchange;
|
||||||
|
import org.springframework.amqp.core.TopicExchange;
|
||||||
|
import org.springframework.amqp.core.Queue;
|
||||||
|
import org.springframework.amqp.core.QueueBuilder;
|
||||||
|
import org.springframework.amqp.core.Binding;
|
||||||
|
import org.springframework.amqp.core.BindingBuilder;
|
||||||
|
import org.springframework.amqp.core.AcknowledgeMode;
|
||||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
@@ -98,9 +104,15 @@ public class RabbitMQConfig {
|
|||||||
|
|
||||||
// ==================== 交换机 ====================
|
// ==================== 交换机 ====================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 用户同步 Topic Exchange
|
||||||
|
* 使用 Topic 类型,支持按 targetSystem 路由到不同队列
|
||||||
|
* routing key 格式: user.sync.{targetSystem}
|
||||||
|
* 各子系统可以绑定自己的队列来消费消息
|
||||||
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public DirectExchange syncExchange() {
|
public TopicExchange syncExchange() {
|
||||||
return new DirectExchange(SYNC_EXCHANGE, true, false);
|
return new TopicExchange(SYNC_EXCHANGE, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@@ -110,13 +122,12 @@ public class RabbitMQConfig {
|
|||||||
|
|
||||||
// ==================== 队列 ====================
|
// ==================== 队列 ====================
|
||||||
|
|
||||||
@Bean
|
/**
|
||||||
public Queue syncQueue() {
|
* 注意:core 系统只负责发送消息,不消费消息
|
||||||
return QueueBuilder.durable(SYNC_QUEUE)
|
* 各子系统(websopy等)需要在自己的系统中配置消费者和队列
|
||||||
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
|
*
|
||||||
.withArgument("x-dead-letter-routing-key", DLQ_ROUTING_KEY)
|
* 如果 core 系统也需要消费某些消息,可以在这里添加对应的队列
|
||||||
.build();
|
*/
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public Queue deadLetterQueue() {
|
public Queue deadLetterQueue() {
|
||||||
@@ -125,13 +136,6 @@ public class RabbitMQConfig {
|
|||||||
|
|
||||||
// ==================== 绑定 ====================
|
// ==================== 绑定 ====================
|
||||||
|
|
||||||
@Bean
|
|
||||||
public Binding syncBinding() {
|
|
||||||
return BindingBuilder.bind(syncQueue())
|
|
||||||
.to(syncExchange())
|
|
||||||
.with(SYNC_ROUTING_KEY);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public Binding dlqBinding() {
|
public Binding dlqBinding() {
|
||||||
return BindingBuilder.bind(deadLetterQueue())
|
return BindingBuilder.bind(deadLetterQueue())
|
||||||
|
|||||||
@@ -1,159 +0,0 @@
|
|||||||
package com.gxwebsoft.common.mq.consumer;
|
|
||||||
|
|
||||||
import com.gxwebsoft.common.mq.config.RabbitMQConfig;
|
|
||||||
import com.gxwebsoft.common.mq.message.SyncMessage;
|
|
||||||
import com.gxwebsoft.common.system.service.UserSyncService;
|
|
||||||
import com.rabbitmq.client.Channel;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.amqp.core.Message;
|
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 用户同步消息消费者
|
|
||||||
*
|
|
||||||
* 负责监听用户同步消息,并将数据同步到目标系统(如websopy)
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
@ConditionalOnProperty(name = "sync.mq.enabled", havingValue = "true", matchIfMissing = true)
|
|
||||||
public class UserSyncConsumer {
|
|
||||||
|
|
||||||
@Autowired(required = false)
|
|
||||||
private UserSyncService userSyncService;
|
|
||||||
|
|
||||||
private static final int MAX_RETRY_COUNT = 3;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 监听用户同步消息
|
|
||||||
*/
|
|
||||||
@RabbitListener(queues = RabbitMQConfig.SYNC_QUEUE)
|
|
||||||
public void handleMessage(SyncMessage message, Channel channel, Message amqpMessage) {
|
|
||||||
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
|
|
||||||
|
|
||||||
try {
|
|
||||||
log.info("收到MQ消息: messageId={}, type={}, event={}, target={}",
|
|
||||||
message.getMessageId(), message.getMessageType(),
|
|
||||||
message.getEventType(), message.getTargetSystem());
|
|
||||||
|
|
||||||
// 检查是否启用了同步服务
|
|
||||||
if (userSyncService == null) {
|
|
||||||
log.warn("UserSyncService 未启用,跳过消息处理: messageId={}", message.getMessageId());
|
|
||||||
channel.basicAck(deliveryTag, false);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 处理消息
|
|
||||||
processMessage(message);
|
|
||||||
|
|
||||||
// 确认消息
|
|
||||||
channel.basicAck(deliveryTag, false);
|
|
||||||
log.info("消息处理成功: messageId={}", message.getMessageId());
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("消息处理失败: messageId={}, error={}", message.getMessageId(), e.getMessage(), e);
|
|
||||||
handleFailure(channel, amqpMessage, deliveryTag, message, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理消息
|
|
||||||
*/
|
|
||||||
private void processMessage(SyncMessage message) {
|
|
||||||
String messageType = message.getMessageType();
|
|
||||||
String eventType = message.getEventType();
|
|
||||||
String targetSystem = message.getTargetSystem();
|
|
||||||
Map<String, Object> data = message.getData();
|
|
||||||
|
|
||||||
// 判断消息类型并处理
|
|
||||||
if ("USER_SYNC".equals(messageType)) {
|
|
||||||
handleUserSync(targetSystem, eventType, data);
|
|
||||||
} else {
|
|
||||||
log.warn("未知的消息类型: messageType={}", messageType);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理用户同步
|
|
||||||
*/
|
|
||||||
private void handleUserSync(String targetSystem, String eventType, Map<String, Object> data) {
|
|
||||||
switch (eventType) {
|
|
||||||
case "CREATE":
|
|
||||||
case "UPDATE":
|
|
||||||
userSyncService.syncUserToWebsopy(data);
|
|
||||||
log.info("用户同步到{}成功: event={}", targetSystem, eventType);
|
|
||||||
break;
|
|
||||||
case "DELETE":
|
|
||||||
userSyncService.deleteUserFromWebsopy(data);
|
|
||||||
log.info("用户从{}删除成功: event={}", targetSystem, eventType);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
log.warn("未知的用户事件类型: eventType={}", eventType);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 处理失败消息
|
|
||||||
*/
|
|
||||||
private void handleFailure(Channel channel, Message amqpMessage, long deliveryTag,
|
|
||||||
SyncMessage message, Exception e) {
|
|
||||||
Integer retryCount = message.getRetryCount();
|
|
||||||
if (retryCount == null) {
|
|
||||||
retryCount = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (retryCount < MAX_RETRY_COUNT) {
|
|
||||||
// 重试:拒绝消息并重新入队
|
|
||||||
try {
|
|
||||||
log.warn("消息处理失败,准备重试: messageId={}, retryCount={}/{}",
|
|
||||||
message.getMessageId(), retryCount + 1, MAX_RETRY_COUNT);
|
|
||||||
// 增加重试次数
|
|
||||||
message.setRetryCount(retryCount + 1);
|
|
||||||
// 拒绝消息(requeue=true 会重新入队)
|
|
||||||
channel.basicNack(deliveryTag, false, true);
|
|
||||||
} catch (IOException ioException) {
|
|
||||||
log.error("消息拒绝失败: messageId={}", message.getMessageId(), ioException);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// 超过重试次数,发送到死信队列
|
|
||||||
try {
|
|
||||||
log.error("消息处理失败次数超限,发送到死信队列: messageId={}, retryCount={}",
|
|
||||||
message.getMessageId(), retryCount);
|
|
||||||
// 拒绝消息(requeue=false 进入死信队列)
|
|
||||||
channel.basicNack(deliveryTag, false, false);
|
|
||||||
} catch (IOException ioException) {
|
|
||||||
log.error("消息拒绝失败: messageId={}", message.getMessageId(), ioException);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 监听死信队列消息(可选,用于告警或人工处理)
|
|
||||||
*/
|
|
||||||
@RabbitListener(queues = RabbitMQConfig.DLQ_QUEUE)
|
|
||||||
public void handleDeadLetter(SyncMessage message, Channel channel, Message amqpMessage) {
|
|
||||||
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
|
|
||||||
try {
|
|
||||||
log.error("死信消息: messageId={}, type={}, event={}, target={}, retryCount={}",
|
|
||||||
message.getMessageId(), message.getMessageType(),
|
|
||||||
message.getEventType(), message.getTargetSystem(),
|
|
||||||
message.getRetryCount());
|
|
||||||
|
|
||||||
// TODO: 可以在这里添加告警逻辑,如发送邮件、钉钉通知等
|
|
||||||
|
|
||||||
channel.basicAck(deliveryTag, false);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("处理死信消息失败: messageId={}", message.getMessageId(), e);
|
|
||||||
try {
|
|
||||||
channel.basicAck(deliveryTag, false);
|
|
||||||
} catch (IOException ioException) {
|
|
||||||
log.error("确认死信消息失败", ioException);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -66,9 +66,14 @@ public class RabbitMQSyncProducer implements SyncMessageProducer, RabbitTemplate
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 使用 targetSystem 作为 routing key
|
||||||
|
// 格式: user.sync.{targetSystem}
|
||||||
|
// 各子系统绑定队列时使用 pattern: user.sync.{systemName}
|
||||||
|
String routingKey = buildRoutingKey(message.getTargetSystem());
|
||||||
|
|
||||||
rabbitTemplate.convertAndSend(
|
rabbitTemplate.convertAndSend(
|
||||||
RabbitMQConfig.SYNC_EXCHANGE,
|
RabbitMQConfig.SYNC_EXCHANGE,
|
||||||
RabbitMQConfig.SYNC_ROUTING_KEY,
|
routingKey,
|
||||||
message,
|
message,
|
||||||
correlationData
|
correlationData
|
||||||
);
|
);
|
||||||
@@ -81,6 +86,17 @@ public class RabbitMQSyncProducer implements SyncMessageProducer, RabbitTemplate
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 构建 routing key
|
||||||
|
* 格式: user.sync.{targetSystem}
|
||||||
|
*/
|
||||||
|
private String buildRoutingKey(String targetSystem) {
|
||||||
|
if (targetSystem == null || targetSystem.isEmpty()) {
|
||||||
|
return "user.sync.all";
|
||||||
|
}
|
||||||
|
return "user.sync." + targetSystem.toLowerCase();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendUserSyncMessage(String targetSystem, String eventType, Object userData) {
|
public void sendUserSyncMessage(String targetSystem, String eventType, Object userData) {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -1,302 +1,42 @@
|
|||||||
package com.gxwebsoft.common.system.service;
|
package com.gxwebsoft.common.system.service;
|
||||||
|
|
||||||
import com.gxwebsoft.common.core.config.ConfigProperties;
|
|
||||||
import com.gxwebsoft.common.core.utils.HttpUtils;
|
|
||||||
import com.gxwebsoft.common.system.entity.User;
|
|
||||||
import com.alibaba.fastjson.JSON;
|
|
||||||
import com.alibaba.fastjson.JSONObject;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.http.HttpResponse;
|
|
||||||
import org.apache.http.util.EntityUtils;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.time.format.DateTimeFormatter;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 用户同步服务(同步到 websopy)
|
* 用户同步服务
|
||||||
|
*
|
||||||
|
* 注意:此服务已废弃,用户同步现在通过 MQ 实现。
|
||||||
|
* 各子系统(websopy等)需要在自己的系统中消费 MQ 消息进行同步。
|
||||||
|
*
|
||||||
|
* 保留此类是为了兼容可能存在的旧代码引用,所有方法已改为空实现。
|
||||||
*
|
*
|
||||||
* @author WebSoft
|
* @author WebSoft
|
||||||
* @since 2026-04-04
|
* @since 2026-04-04
|
||||||
|
* @deprecated 请使用 MQ 消息进行用户同步
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
|
@Deprecated
|
||||||
public class UserSyncService {
|
public class UserSyncService {
|
||||||
|
|
||||||
/**
|
|
||||||
* 转义JSON字符串中的特殊字符
|
|
||||||
*/
|
|
||||||
private String escapeJson(String str) {
|
|
||||||
if (str == null) {
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
return str.replace("\\", "\\\\")
|
|
||||||
.replace("\"", "\\\"")
|
|
||||||
.replace("\b", "\\b")
|
|
||||||
.replace("\f", "\\f")
|
|
||||||
.replace("\n", "\\n")
|
|
||||||
.replace("\r", "\\r")
|
|
||||||
.replace("\t", "\\t");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Resource
|
|
||||||
private ConfigProperties configProperties;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取 websopy 基础 URL
|
|
||||||
*/
|
|
||||||
private String getWebsopyBaseUrl() {
|
|
||||||
return configProperties.getWebsopyUrl();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 同步单个用户到 websopy
|
* 同步单个用户到 websopy
|
||||||
*
|
*
|
||||||
* @param user 用户信息
|
* @deprecated 已废弃,用户同步现在通过 MQ 自动触发
|
||||||
*/
|
*/
|
||||||
public void syncUserToWebsopy(User user) {
|
@Deprecated
|
||||||
String websopyBaseUrl = getWebsopyBaseUrl();
|
public void syncUserToWebsopy(Object user) {
|
||||||
if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) {
|
log.debug("UserSyncService.syncUserToWebsopy 已废弃,用户同步通过 MQ 自动触发");
|
||||||
log.warn("websopyUrl 未配置,跳过用户同步: userId={}", user.getUserId());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (user == null || user.getUserId() == null) {
|
|
||||||
log.warn("用户信息为空,跳过同步");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 构建请求体 - 使用手动JSON构建确保字段正确
|
|
||||||
Integer tenantIdValue = user.getTenantId() != null ? user.getTenantId() : 0;
|
|
||||||
|
|
||||||
// 构建JSON字符串,确保tenant_id字段存在且不为null
|
|
||||||
StringBuilder jsonBuilder = new StringBuilder();
|
|
||||||
jsonBuilder.append("{");
|
|
||||||
jsonBuilder.append("\"userId\":").append(user.getUserId()).append(",");
|
|
||||||
jsonBuilder.append("\"username\":\"").append(escapeJson(user.getUsername())).append("\",");
|
|
||||||
jsonBuilder.append("\"nickname\":\"").append(escapeJson(user.getNickname())).append("\",");
|
|
||||||
jsonBuilder.append("\"avatar\":\"").append(escapeJson(user.getAvatar())).append("\",");
|
|
||||||
jsonBuilder.append("\"phone\":\"").append(escapeJson(user.getPhone())).append("\",");
|
|
||||||
jsonBuilder.append("\"status\":").append(user.getStatus()).append(",");
|
|
||||||
jsonBuilder.append("\"updateTime\":\"").append(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).append("\",");
|
|
||||||
jsonBuilder.append("\"tenantId\":").append(tenantIdValue).append(","); // 驼峰格式
|
|
||||||
jsonBuilder.append("\"tenant_id\":").append(tenantIdValue); // 下划线格式
|
|
||||||
jsonBuilder.append("}");
|
|
||||||
|
|
||||||
String url = websopyBaseUrl + "/api/app/user-sync/single";
|
|
||||||
String body = jsonBuilder.toString();
|
|
||||||
|
|
||||||
log.info("同步用户到 websopy: userId={}, username={}, nickname={}, phone={}, tenantId={}, url={}",
|
|
||||||
user.getUserId(), user.getUsername(), user.getNickname(), user.getPhone(), user.getTenantId(), url);
|
|
||||||
log.info("同步用户请求体JSON: {}", body); // 改为info级别以便查看
|
|
||||||
// 额外日志:tenantId 值检查
|
|
||||||
log.debug("tenantId检查 - 原始值: {}, 转换后值: {}",
|
|
||||||
user.getTenantId(), tenantIdValue);
|
|
||||||
|
|
||||||
// 发送 HTTP POST 请求
|
|
||||||
Map<String, String> headers = new HashMap<>();
|
|
||||||
headers.put("Content-Type", "application/json");
|
|
||||||
|
|
||||||
HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, body);
|
|
||||||
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
|
|
||||||
|
|
||||||
log.debug("websopy 响应: {}", responseBody);
|
|
||||||
|
|
||||||
// 解析响应
|
|
||||||
JSONObject result = JSON.parseObject(responseBody);
|
|
||||||
if (result != null && result.getIntValue("code") == 0) {
|
|
||||||
log.info("用户同步成功: userId={}", user.getUserId());
|
|
||||||
} else {
|
|
||||||
String message = result != null ? result.getString("message") : "未知错误";
|
|
||||||
log.error("用户同步失败: userId={}, message={}", user.getUserId(), message);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("用户同步异常: userId={}, error={}", user.getUserId(), e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 同步用户到 websopy(Map格式,用于MQ消息处理)
|
|
||||||
*
|
|
||||||
* @param data 用户信息Map
|
|
||||||
*/
|
|
||||||
public void syncUserToWebsopy(Map<String, Object> data) {
|
|
||||||
String websopyBaseUrl = getWebsopyBaseUrl();
|
|
||||||
if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) {
|
|
||||||
log.warn("websopyUrl 未配置,跳过用户同步");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data == null || data.isEmpty()) {
|
|
||||||
log.warn("用户数据为空,跳过同步");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
Integer userId = getIntValue(data, "userId");
|
|
||||||
if (userId == null) {
|
|
||||||
log.warn("用户数据中缺少userId,跳过同步");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Integer tenantIdValue = getIntValue(data, "tenantId");
|
|
||||||
if (tenantIdValue == null) {
|
|
||||||
tenantIdValue = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 构建JSON字符串
|
|
||||||
StringBuilder jsonBuilder = new StringBuilder();
|
|
||||||
jsonBuilder.append("{");
|
|
||||||
jsonBuilder.append("\"userId\":").append(userId).append(",");
|
|
||||||
jsonBuilder.append("\"username\":\"").append(escapeJson(getStringValue(data, "username"))).append("\",");
|
|
||||||
jsonBuilder.append("\"nickname\":\"").append(escapeJson(getStringValue(data, "nickname"))).append("\",");
|
|
||||||
jsonBuilder.append("\"avatar\":\"").append(escapeJson(getStringValue(data, "avatar"))).append("\",");
|
|
||||||
jsonBuilder.append("\"phone\":\"").append(escapeJson(getStringValue(data, "phone"))).append("\",");
|
|
||||||
jsonBuilder.append("\"status\":").append(getIntValue(data, "status", 1)).append(",");
|
|
||||||
jsonBuilder.append("\"updateTime\":\"").append(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).append("\",");
|
|
||||||
jsonBuilder.append("\"tenantId\":").append(tenantIdValue).append(",");
|
|
||||||
jsonBuilder.append("\"tenant_id\":").append(tenantIdValue);
|
|
||||||
jsonBuilder.append("}");
|
|
||||||
|
|
||||||
String url = websopyBaseUrl + "/api/app/user-sync/single";
|
|
||||||
String body = jsonBuilder.toString();
|
|
||||||
|
|
||||||
log.info("MQ同步用户到 websopy: userId={}, body={}", userId, body);
|
|
||||||
|
|
||||||
Map<String, String> headers = new HashMap<>();
|
|
||||||
headers.put("Content-Type", "application/json");
|
|
||||||
|
|
||||||
HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, body);
|
|
||||||
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
|
|
||||||
|
|
||||||
JSONObject result = JSON.parseObject(responseBody);
|
|
||||||
if (result != null && result.getIntValue("code") == 0) {
|
|
||||||
log.info("MQ用户同步成功: userId={}", userId);
|
|
||||||
} else {
|
|
||||||
String message = result != null ? result.getString("message") : "未知错误";
|
|
||||||
log.error("MQ用户同步失败: userId={}, message={}", userId, message);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("MQ用户同步异常: error={}", e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 从 websopy 删除用户
|
|
||||||
*
|
|
||||||
* @param data 包含userId的数据
|
|
||||||
*/
|
|
||||||
public void deleteUserFromWebsopy(Map<String, Object> data) {
|
|
||||||
String websopyBaseUrl = getWebsopyBaseUrl();
|
|
||||||
if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) {
|
|
||||||
log.warn("websopyUrl 未配置,跳过删除用户");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data == null) {
|
|
||||||
log.warn("删除用户数据为空");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
Integer userId = getIntValue(data, "userId");
|
|
||||||
if (userId == null) {
|
|
||||||
log.warn("删除用户数据中缺少userId");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
String url = websopyBaseUrl + "/api/app/user-sync/delete/" + userId;
|
|
||||||
|
|
||||||
log.info("从 websopy 删除用户: userId={}", userId);
|
|
||||||
|
|
||||||
Map<String, String> headers = new HashMap<>();
|
|
||||||
headers.put("Content-Type", "application/json");
|
|
||||||
|
|
||||||
HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, "");
|
|
||||||
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
|
|
||||||
|
|
||||||
JSONObject result = JSON.parseObject(responseBody);
|
|
||||||
if (result != null && result.getIntValue("code") == 0) {
|
|
||||||
log.info("MQ删除用户成功: userId={}", userId);
|
|
||||||
} else {
|
|
||||||
String message = result != null ? result.getString("message") : "未知错误";
|
|
||||||
log.error("MQ删除用户失败: userId={}, message={}", userId, message);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("MQ删除用户异常: error={}", e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getStringValue(Map<String, Object> data, String key) {
|
|
||||||
Object value = data.get(key);
|
|
||||||
return value != null ? String.valueOf(value) : "";
|
|
||||||
}
|
|
||||||
|
|
||||||
private Integer getIntValue(Map<String, Object> data, String key) {
|
|
||||||
return getIntValue(data, key, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Integer getIntValue(Map<String, Object> data, String key, Integer defaultValue) {
|
|
||||||
Object value = data.get(key);
|
|
||||||
if (value == null) {
|
|
||||||
return defaultValue;
|
|
||||||
}
|
|
||||||
if (value instanceof Integer) {
|
|
||||||
return (Integer) value;
|
|
||||||
}
|
|
||||||
if (value instanceof Number) {
|
|
||||||
return ((Number) value).intValue();
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
return Integer.parseInt(String.valueOf(value));
|
|
||||||
} catch (NumberFormatException e) {
|
|
||||||
return defaultValue;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 刷新 websopy 端的用户缓存
|
* 刷新 websopy 端的用户缓存
|
||||||
* 只传 userId,websopy 端会通过 API 回查获取完整信息
|
*
|
||||||
*
|
* @deprecated 已废弃
|
||||||
* @param userId 用户ID
|
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public void refreshUserCache(Integer userId) {
|
public void refreshUserCache(Integer userId) {
|
||||||
String websopyBaseUrl = getWebsopyBaseUrl();
|
log.debug("UserSyncService.refreshUserCache 已废弃");
|
||||||
if (websopyBaseUrl == null || websopyBaseUrl.isEmpty()) {
|
|
||||||
log.warn("websopyUrl 未配置,跳过刷新缓存: userId={}", userId);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (userId == null) {
|
|
||||||
log.warn("userId 为空,跳过刷新");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
String url = websopyBaseUrl + "/api/app/user-sync/refresh/" + userId;
|
|
||||||
|
|
||||||
log.info("刷新用户缓存: userId={}, url={}", userId, url);
|
|
||||||
|
|
||||||
Map<String, String> headers = new HashMap<>();
|
|
||||||
headers.put("Content-Type", "application/json");
|
|
||||||
|
|
||||||
HttpResponse response = HttpUtils.doPost(url, "", "POST", headers, null, "");
|
|
||||||
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
|
|
||||||
|
|
||||||
JSONObject result = JSON.parseObject(responseBody);
|
|
||||||
if (result != null && result.getIntValue("code") == 0) {
|
|
||||||
log.info("刷新缓存成功: userId={}", userId);
|
|
||||||
} else {
|
|
||||||
String message = result != null ? result.getString("message") : "未知错误";
|
|
||||||
log.error("刷新缓存失败: userId={}, message={}", userId, message);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("刷新缓存异常: userId={}, error={}", userId, e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,10 @@ import com.gxwebsoft.common.system.entity.*;
|
|||||||
import com.gxwebsoft.common.system.mapper.UserMapper;
|
import com.gxwebsoft.common.system.mapper.UserMapper;
|
||||||
import com.gxwebsoft.common.system.param.LoginParam;
|
import com.gxwebsoft.common.system.param.LoginParam;
|
||||||
import com.gxwebsoft.common.system.param.UserParam;
|
import com.gxwebsoft.common.system.param.UserParam;
|
||||||
|
import com.gxwebsoft.common.mq.producer.SyncMessageProducer;
|
||||||
import com.gxwebsoft.common.system.service.*;
|
import com.gxwebsoft.common.system.service.*;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.security.core.userdetails.UserDetails;
|
import org.springframework.security.core.userdetails.UserDetails;
|
||||||
import org.springframework.security.core.userdetails.UsernameNotFoundException;
|
import org.springframework.security.core.userdetails.UsernameNotFoundException;
|
||||||
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
|
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
|
||||||
@@ -34,6 +37,7 @@ import static com.gxwebsoft.common.core.constants.PlatformConstants.WEB;
|
|||||||
* @author WebSoft
|
* @author WebSoft
|
||||||
* @since 2018-12-24 16:10:14
|
* @since 2018-12-24 16:10:14
|
||||||
*/
|
*/
|
||||||
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
|
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
|
||||||
@Resource
|
@Resource
|
||||||
@@ -51,6 +55,9 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
|
|||||||
@Resource
|
@Resource
|
||||||
private UserRefereeService userRefereeService;
|
private UserRefereeService userRefereeService;
|
||||||
|
|
||||||
|
@Autowired(required = false)
|
||||||
|
private SyncMessageProducer syncMessageProducer;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PageResult<User> pageRel(UserParam param) {
|
public PageResult<User> pageRel(UserParam param) {
|
||||||
PageParam<User, UserParam> page = new PageParam<>(param);
|
PageParam<User, UserParam> page = new PageParam<>(param);
|
||||||
@@ -130,6 +137,14 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
|
|||||||
throw new BusinessException("用户角色添加失败");
|
throw new BusinessException("用户角色添加失败");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// 用户创建成功后,通过MQ异步同步用户数据到 websopy
|
||||||
|
if (result && syncMessageProducer != null) {
|
||||||
|
User savedUser = getAllByUserId(String.valueOf(user.getUserId()));
|
||||||
|
if (savedUser != null) {
|
||||||
|
syncMessageProducer.sendUserSyncMessage("websopy", "CREATE", savedUser);
|
||||||
|
log.info("用户创建后发送MQ消息同步到websopy: userId={}, phone={}", user.getUserId(), user.getPhone());
|
||||||
|
}
|
||||||
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -163,6 +178,14 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
|
|||||||
throw new BusinessException("用户角色添加失败");
|
throw new BusinessException("用户角色添加失败");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// 用户更新成功后,通过MQ异步同步用户数据到 websopy
|
||||||
|
if (result && syncMessageProducer != null) {
|
||||||
|
User updatedUser = getAllByUserId(String.valueOf(user.getUserId()));
|
||||||
|
if (updatedUser != null) {
|
||||||
|
syncMessageProducer.sendUserSyncMessage("websopy", "UPDATE", updatedUser);
|
||||||
|
log.info("用户更新后发送MQ消息同步到websopy: userId={}, phone={}", user.getUserId(), user.getPhone());
|
||||||
|
}
|
||||||
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -323,6 +346,7 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
|
|||||||
// Ensure caller (e.g. register / invite register) gets non-empty roles/authorities in response.
|
// Ensure caller (e.g. register / invite register) gets non-empty roles/authorities in response.
|
||||||
addUser.setRoles(userRoleService.listByUserId(addUser.getUserId()));
|
addUser.setRoles(userRoleService.listByUserId(addUser.getUserId()));
|
||||||
addUser.setAuthorities(roleMenuService.listMenuByUserId(addUser.getUserId(), null));
|
addUser.setAuthorities(roleMenuService.listMenuByUserId(addUser.getUserId(), null));
|
||||||
|
// addUser内部调用saveUser,saveUser已发送MQ消息,这里不需要重复发送
|
||||||
return addUser;
|
return addUser;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user