Files
tiantian-system/public/docs/workflow.md
2026-04-08 17:10:58 +08:00

6.2 KiB
Raw Permalink Blame History

AI 工作流配置

使用 OpenClaw 工作流引擎,构建定时任务与自动化业务流。

🔄 什么是工作流?

工作流Workflow是预定义的一系列自动化步骤可以

  • 定时执行:每天早上发送报表
  • 🔗 事件触发:用户注册后自动发送欢迎邮件
  • 🔁 条件分支:根据条件执行不同操作
  • 📊 数据处理:批量处理数据

🏗️ 核心概念

概念 说明
触发器 (Trigger) 工作流的启动条件
节点 (Node) 工作流中的单个步骤
连接 (Edge) 节点之间的数据流向
变量 (Variable) 存储和传递数据
执行日志 每次运行的记录

🚀 创建第一个工作流

方式一:代码定义

import { WorkflowBuilder } from '@websopy/ai-sdk'

const workflow = new WorkflowBuilder({
  name: '每日数据报告',
  description: '每天早上 9 点生成数据报告'
})

// 添加触发器:定时执行
workflow.addTrigger({
  type: 'schedule',
  config: {
    cron: '0 9 * * *',  // 每天 9:00
    timezone: 'Asia/Shanghai'
  }
})

// 添加节点:获取数据
workflow.addNode('fetch-data', {
  type: 'http-request',
  config: {
    url: 'https://api.analytics.com/daily-stats',
    method: 'GET'
  }
})

// 添加节点:生成报告
workflow.addNode('generate-report', {
  type: 'ai-agent',
  config: {
    prompt: '根据以下数据生成日报:{{fetch-data.output}}'
  }
})

// 添加节点:发送邮件
workflow.addNode('send-email', {
  type: 'email',
  config: {
    to: ['team@company.com'],
    subject: '📊 每日数据报告',
    body: '{{generate-report.output}}'
  }
})

// 节点连接
workflow.connect('fetch-data', 'generate-report')
workflow.connect('generate-report', 'send-email')

// 保存
const created = await client.workflow.create(workflow.toJSON())
console.log('工作流 ID:', created.id)

方式二:可视化编辑器

在控制台 → AI 功能 → 工作流,点击「新建工作流」打开可视化编辑器。

📦 节点类型

1. HTTP 请求

workflow.addNode('call-api', {
  type: 'http-request',
  config: {
    url: 'https://api.example.com/data',
    method: 'POST',
    headers: {
      'Authorization': 'Bearer xxx'
    },
    body: {
      query: 'SELECT * FROM orders'
    }
  }
})

2. AI 智能体

workflow.addNode('analyze', {
  type: 'ai-agent',
  config: {
    model: 'gpt-4',
    prompt: '分析以下数据,找出异常:{{input}}',
    temperature: 0.5
  }
})

3. 条件分支

workflow.addNode('check-status', {
  type: 'condition',
  config: {
    conditions: [
      {
        expression: '{{input.amount}} > 10000',
        nextNode: 'high-value-handler'
      },
      {
        expression: '{{input.amount}} > 1000',
        nextNode: 'medium-handler'
      }
    ],
    default: 'low-value-handler'
  }
})

4. 循环

workflow.addNode('process-items', {
  type: 'loop',
  config: {
    items: '{{batch-data.items}}',
    maxIterations: 100,
    // 循环体节点
    nodes: [...]
  }
})

5. 等待/延迟

workflow.addNode('delay', {
  type: 'delay',
  config: {
    duration: '24h'  // 或 3600000毫秒
  }
})

6. Webhook

workflow.addNode('notify', {
  type: 'webhook',
  config: {
    url: 'https://your-server.com/webhook',
    method: 'POST',
    headers: {
      'Content-Type': 'application/json'
    },
    body: {
      event: 'workflow.completed',
      data: '{{workflow-output}}'
    }
  }
})

🔔 触发器类型

定时触发

workflow.addTrigger({
  type: 'schedule',
  config: {
    cron: '0 */4 * * *',  // 每 4 小时
    // 或使用简单表达式
    interval: 'daily',  // daily, weekly, monthly
    time: '09:00'
  }
})

Webhook 触发

workflow.addTrigger({
  type: 'webhook',
  config: {
    // 创建后获得 webhook URL
  }
})

事件触发

workflow.addTrigger({
  type: 'event',
  config: {
    events: [
      'user.created',
      'order.completed',
      'ai.task.failed'
    ]
  }
})

🔧 变量与数据处理

定义变量

workflow.addNode('process', {
  type: 'transform',
  config: {
    variables: {
      totalAmount: '{{input.orders}} | sum("amount")',
      avgScore: '{{input.reviews}} | avg("score")',
      userNames: '{{input.users}} | map("name")'
    }
  }
})

数据转换

workflow.addNode('transform', {
  type: 'transform',
  config: {
    // 过滤器
    filter: '{{data}} | filter(item => item.active == true)',
    // 映射
    map: '{{data}} | map(item => ({ id: item.id, name: item.name }))',
    // 排序
    sort: '{{data}} | sortBy("created_at", "desc")'
  }
})

📊 执行与监控

手动执行

// 立即执行一次
const run = await client.workflow.run(workflowId, {
  input: {
    date: '2024-01-15'
  }
})

console.log('执行 ID:', run.id)

查看执行日志

const logs = await client.workflow.getRunLogs(run.id)

for (const log of logs) {
  console.log(`[${log.timestamp}] ${log.node}: ${log.status}`)
  if (log.error) {
    console.log('  错误:', log.error)
  }
}

执行状态

const status = await client.workflow.getRunStatus(run.id)

console.log('状态:', status.status)  // running, completed, failed
console.log('当前节点:', status.currentNode)
console.log('已用时间:', status.elapsedTime)

常见问题

Q: 工作流执行失败怎么办?

  1. 查看执行日志定位问题节点
  2. 检查输入数据格式
  3. 查看节点配置是否正确
  4. 联系支持时提供执行 ID

Q: 如何调试工作流?

使用「测试执行」功能,传入测试数据,逐步查看每个节点的输出。

Q: 可以嵌套工作流吗?

是的,可以在一个工作流中调用另一个工作流:

workflow.addNode('sub-workflow', {
  type: 'workflow',
  config: {
    workflowId: 'wf-xxx',
    input: '{{current-data}}'
  }
})

上一步: RAG 知识库搭建
下一步: Docker Compose 部署