- 将用户同步消息交换机由 DirectExchange 改为 TopicExchange,实现多目标系统路由 - 去除核心系统中用户同步队列定义和消费者,转由各子系统独立消费 - RabbitMQSyncProducer 动态构建 routing key,支持按目标系统路由同步消息 - UserServiceImpl 用户新增和更新后发送同步消息到 websopy 的 MQ 交换机 - 废弃 UserSyncService 中的 HTTP 同步接口,改为通过 MQ 实现用户数据同步 - 删除 UserSyncConsumer,核心系统不再直接消费用户同步消息 - 增加日志输出,便于追踪用户同步消息发送情况 - 保留废弃代码兼容旧引用,方便后续平滑迁移和维护
3.6 KiB
3.6 KiB
2026-04-09 工作记录
UserServiceImpl 添加 MQ 同步功能
在 UserServiceImpl 中统一添加了用户数据变更时的 MQ 消息发送逻辑:
修改内容
- 添加依赖注入:注入
SyncMessageProducer,使用@Autowired(required = false)避免 MQ 未启用时报错 - saveUser() 方法:用户创建成功后发送
CREATE事件到 websopy - updateUser() 方法:用户更新成功后发送
UPDATE事件到 websopy - addUser() 方法:注释说明已通过 saveUser 发送 MQ,避免重复
触发场景
现在以下操作都会触发 MQ 同步:
- 用户注册(短信登录自动注册、普通注册、管理员注册)
- 后台添加用户
- 用户更新信息
- 扫码绑定手机号(原有逻辑)
日志输出
- 创建:
用户创建后发送MQ消息同步到websopy: userId={}, phone={} - 更新:
用户更新后发送MQ消息同步到websopy: userId={}, phone={}
MQ 架构改造 - 方案 A 实现
将用户同步架构从 "core 系统消费后 HTTP 转发" 改为 "各子系统直接消费 MQ"。
core 系统改动 (com.gxwebsoft.core)
1. RabbitMQConfig.java
- Exchange 类型:从
DirectExchange改为TopicExchange - 删除队列定义:core 系统不再定义消费队列,只负责发送
- Routing Key 格式:
user.sync.{targetSystem}
2. RabbitMQSyncProducer.java
- 动态 Routing Key:根据
targetSystem构建 routing key - 代码:
"user.sync." + targetSystem.toLowerCase()
3. 删除 UserSyncConsumer.java
- core 系统不再消费用户同步消息
4. UserSyncService.java
- 废弃 HTTP 同步逻辑:所有 HTTP 调用方法已删除
- 标记 @Deprecated:保留空实现以兼容旧代码
- 同步方式:现在统一通过 MQ 自动触发
websopy 系统改动 (websopy-java)
1. RabbitMQConfig.java
// websopy 专用队列
public static final String SYNC_QUEUE_WEBSOPY = "user.sync.websopy.queue";
public static final String SYNC_ROUTING_KEY_WEBSOPY = "user.sync.websopy";
// Topic Exchange(与 core 系统共用)
@Bean
public TopicExchange syncExchange() {
return new TopicExchange(SYNC_EXCHANGE, true, false);
}
// websopy 专用队列绑定
@Bean
public Binding syncBindingWebsopy() {
return BindingBuilder.bind(syncQueueWebsopy())
.to(syncExchange())
.with(SYNC_ROUTING_KEY_WEBSOPY);
}
2. SyncMessageConsumer.java
- 监听队列:改为
SYNC_QUEUE_WEBSOPY(user.sync.websopy.queue) - 死信队列:改为
DLQ_QUEUE_WEBSOPY(user.sync.websopy.dlq)
新架构流程
用户操作 → UserServiceImpl.saveUser/updateUser
↓
MQ Producer (routing key = user.sync.websopy)
↓
Topic Exchange (sync.exchange)
↓
┌───────────┼───────────┐
↓ ↓ ↓
websopy 子系统B 子系统C
(消费者) (消费者) (消费者)
消息格式
{
"messageId": "uuid",
"messageType": "USER_SYNC",
"eventType": "CREATE|UPDATE|DELETE",
"targetSystem": "websopy",
"data": {
"userId": 123,
"username": "xxx",
"nickname": "xxx",
"avatar": "xxx",
"phone": "xxx",
"status": 1,
"tenantId": 1
},
"createTime": "2026-04-09T16:00:00",
"retryCount": 0
}
优势
- 解耦:core 系统只负责发送,不关心哪些子系统消费
- 扩展性:新增子系统只需添加自己的消费者,无需修改 core
- 可靠性:各子系统独立消费,互不影响
- 符合 MQ 设计:消息广播到多个订阅者