Files
core/.workbuddy/memory/2026-04-09.md
赵忠林 0343979b5d feat(mq): 重构用户同步为MQ消息驱动并移除HTTP同步服务
- 将用户同步消息交换机由 DirectExchange 改为 TopicExchange,实现多目标系统路由
- 去除核心系统中用户同步队列定义和消费者,转由各子系统独立消费
- RabbitMQSyncProducer 动态构建 routing key,支持按目标系统路由同步消息
- UserServiceImpl 用户新增和更新后发送同步消息到 websopy 的 MQ 交换机
- 废弃 UserSyncService 中的 HTTP 同步接口,改为通过 MQ 实现用户数据同步
- 删除 UserSyncConsumer,核心系统不再直接消费用户同步消息
- 增加日志输出,便于追踪用户同步消息发送情况
- 保留废弃代码兼容旧引用,方便后续平滑迁移和维护
2026-04-09 16:12:05 +08:00

3.6 KiB
Raw Permalink Blame History

2026-04-09 工作记录

UserServiceImpl 添加 MQ 同步功能

UserServiceImpl 中统一添加了用户数据变更时的 MQ 消息发送逻辑:

修改内容

  1. 添加依赖注入:注入 SyncMessageProducer,使用 @Autowired(required = false) 避免 MQ 未启用时报错
  2. saveUser() 方法:用户创建成功后发送 CREATE 事件到 websopy
  3. updateUser() 方法:用户更新成功后发送 UPDATE 事件到 websopy
  4. addUser() 方法:注释说明已通过 saveUser 发送 MQ避免重复

触发场景

现在以下操作都会触发 MQ 同步:

  • 用户注册(短信登录自动注册、普通注册、管理员注册)
  • 后台添加用户
  • 用户更新信息
  • 扫码绑定手机号(原有逻辑)

日志输出

  • 创建:用户创建后发送MQ消息同步到websopy: userId={}, phone={}
  • 更新:用户更新后发送MQ消息同步到websopy: userId={}, phone={}

MQ 架构改造 - 方案 A 实现

将用户同步架构从 "core 系统消费后 HTTP 转发" 改为 "各子系统直接消费 MQ"。

core 系统改动 (com.gxwebsoft.core)

1. RabbitMQConfig.java

  • Exchange 类型:从 DirectExchange 改为 TopicExchange
  • 删除队列定义core 系统不再定义消费队列,只负责发送
  • Routing Key 格式user.sync.{targetSystem}

2. RabbitMQSyncProducer.java

  • 动态 Routing Key:根据 targetSystem 构建 routing key
  • 代码"user.sync." + targetSystem.toLowerCase()

3. 删除 UserSyncConsumer.java

  • core 系统不再消费用户同步消息

4. UserSyncService.java

  • 废弃 HTTP 同步逻辑:所有 HTTP 调用方法已删除
  • 标记 @Deprecated:保留空实现以兼容旧代码
  • 同步方式:现在统一通过 MQ 自动触发

websopy 系统改动 (websopy-java)

1. RabbitMQConfig.java

// websopy 专用队列
public static final String SYNC_QUEUE_WEBSOPY = "user.sync.websopy.queue";
public static final String SYNC_ROUTING_KEY_WEBSOPY = "user.sync.websopy";

// Topic Exchange与 core 系统共用)
@Bean
public TopicExchange syncExchange() {
    return new TopicExchange(SYNC_EXCHANGE, true, false);
}

// websopy 专用队列绑定
@Bean
public Binding syncBindingWebsopy() {
    return BindingBuilder.bind(syncQueueWebsopy())
            .to(syncExchange())
            .with(SYNC_ROUTING_KEY_WEBSOPY);
}

2. SyncMessageConsumer.java

  • 监听队列:改为 SYNC_QUEUE_WEBSOPY (user.sync.websopy.queue)
  • 死信队列:改为 DLQ_QUEUE_WEBSOPY (user.sync.websopy.dlq)

新架构流程

用户操作 → UserServiceImpl.saveUser/updateUser 
                ↓
         MQ Producer (routing key = user.sync.websopy)
                ↓
         Topic Exchange (sync.exchange)
                ↓
    ┌───────────┼───────────┐
    ↓           ↓           ↓
websopy      子系统B     子系统C
(消费者)     (消费者)     (消费者)

消息格式

{
  "messageId": "uuid",
  "messageType": "USER_SYNC",
  "eventType": "CREATE|UPDATE|DELETE",
  "targetSystem": "websopy",
  "data": {
    "userId": 123,
    "username": "xxx",
    "nickname": "xxx",
    "avatar": "xxx",
    "phone": "xxx",
    "status": 1,
    "tenantId": 1
  },
  "createTime": "2026-04-09T16:00:00",
  "retryCount": 0
}

优势

  • 解耦core 系统只负责发送,不关心哪些子系统消费
  • 扩展性:新增子系统只需添加自己的消费者,无需修改 core
  • 可靠性:各子系统独立消费,互不影响
  • 符合 MQ 设计:消息广播到多个订阅者