6.2 KiB
6.2 KiB
AI 工作流配置
使用 OpenClaw 工作流引擎,构建定时任务与自动化业务流。
🔄 什么是工作流?
工作流(Workflow)是预定义的一系列自动化步骤,可以:
- ⏰ 定时执行:每天早上发送报表
- 🔗 事件触发:用户注册后自动发送欢迎邮件
- 🔁 条件分支:根据条件执行不同操作
- 📊 数据处理:批量处理数据
🏗️ 核心概念
| 概念 | 说明 |
|---|---|
| 触发器 (Trigger) | 工作流的启动条件 |
| 节点 (Node) | 工作流中的单个步骤 |
| 连接 (Edge) | 节点之间的数据流向 |
| 变量 (Variable) | 存储和传递数据 |
| 执行日志 | 每次运行的记录 |
🚀 创建第一个工作流
方式一:代码定义
import { WorkflowBuilder } from '@websopy/ai-sdk'
const workflow = new WorkflowBuilder({
name: '每日数据报告',
description: '每天早上 9 点生成数据报告'
})
// 添加触发器:定时执行
workflow.addTrigger({
type: 'schedule',
config: {
cron: '0 9 * * *', // 每天 9:00
timezone: 'Asia/Shanghai'
}
})
// 添加节点:获取数据
workflow.addNode('fetch-data', {
type: 'http-request',
config: {
url: 'https://api.analytics.com/daily-stats',
method: 'GET'
}
})
// 添加节点:生成报告
workflow.addNode('generate-report', {
type: 'ai-agent',
config: {
prompt: '根据以下数据生成日报:{{fetch-data.output}}'
}
})
// 添加节点:发送邮件
workflow.addNode('send-email', {
type: 'email',
config: {
to: ['team@company.com'],
subject: '📊 每日数据报告',
body: '{{generate-report.output}}'
}
})
// 节点连接
workflow.connect('fetch-data', 'generate-report')
workflow.connect('generate-report', 'send-email')
// 保存
const created = await client.workflow.create(workflow.toJSON())
console.log('工作流 ID:', created.id)
方式二:可视化编辑器
在控制台 → AI 功能 → 工作流,点击「新建工作流」打开可视化编辑器。
📦 节点类型
1. HTTP 请求
workflow.addNode('call-api', {
type: 'http-request',
config: {
url: 'https://api.example.com/data',
method: 'POST',
headers: {
'Authorization': 'Bearer xxx'
},
body: {
query: 'SELECT * FROM orders'
}
}
})
2. AI 智能体
workflow.addNode('analyze', {
type: 'ai-agent',
config: {
model: 'gpt-4',
prompt: '分析以下数据,找出异常:{{input}}',
temperature: 0.5
}
})
3. 条件分支
workflow.addNode('check-status', {
type: 'condition',
config: {
conditions: [
{
expression: '{{input.amount}} > 10000',
nextNode: 'high-value-handler'
},
{
expression: '{{input.amount}} > 1000',
nextNode: 'medium-handler'
}
],
default: 'low-value-handler'
}
})
4. 循环
workflow.addNode('process-items', {
type: 'loop',
config: {
items: '{{batch-data.items}}',
maxIterations: 100,
// 循环体节点
nodes: [...]
}
})
5. 等待/延迟
workflow.addNode('delay', {
type: 'delay',
config: {
duration: '24h' // 或 3600000(毫秒)
}
})
6. Webhook
workflow.addNode('notify', {
type: 'webhook',
config: {
url: 'https://your-server.com/webhook',
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: {
event: 'workflow.completed',
data: '{{workflow-output}}'
}
}
})
🔔 触发器类型
定时触发
workflow.addTrigger({
type: 'schedule',
config: {
cron: '0 */4 * * *', // 每 4 小时
// 或使用简单表达式
interval: 'daily', // daily, weekly, monthly
time: '09:00'
}
})
Webhook 触发
workflow.addTrigger({
type: 'webhook',
config: {
// 创建后获得 webhook URL
}
})
事件触发
workflow.addTrigger({
type: 'event',
config: {
events: [
'user.created',
'order.completed',
'ai.task.failed'
]
}
})
🔧 变量与数据处理
定义变量
workflow.addNode('process', {
type: 'transform',
config: {
variables: {
totalAmount: '{{input.orders}} | sum("amount")',
avgScore: '{{input.reviews}} | avg("score")',
userNames: '{{input.users}} | map("name")'
}
}
})
数据转换
workflow.addNode('transform', {
type: 'transform',
config: {
// 过滤器
filter: '{{data}} | filter(item => item.active == true)',
// 映射
map: '{{data}} | map(item => ({ id: item.id, name: item.name }))',
// 排序
sort: '{{data}} | sortBy("created_at", "desc")'
}
})
📊 执行与监控
手动执行
// 立即执行一次
const run = await client.workflow.run(workflowId, {
input: {
date: '2024-01-15'
}
})
console.log('执行 ID:', run.id)
查看执行日志
const logs = await client.workflow.getRunLogs(run.id)
for (const log of logs) {
console.log(`[${log.timestamp}] ${log.node}: ${log.status}`)
if (log.error) {
console.log(' 错误:', log.error)
}
}
执行状态
const status = await client.workflow.getRunStatus(run.id)
console.log('状态:', status.status) // running, completed, failed
console.log('当前节点:', status.currentNode)
console.log('已用时间:', status.elapsedTime)
❓ 常见问题
Q: 工作流执行失败怎么办?
- 查看执行日志定位问题节点
- 检查输入数据格式
- 查看节点配置是否正确
- 联系支持时提供执行 ID
Q: 如何调试工作流?
使用「测试执行」功能,传入测试数据,逐步查看每个节点的输出。
Q: 可以嵌套工作流吗?
是的,可以在一个工作流中调用另一个工作流:
workflow.addNode('sub-workflow', {
type: 'workflow',
config: {
workflowId: 'wf-xxx',
input: '{{current-data}}'
}
})
上一步: RAG 知识库搭建
下一步: Docker Compose 部署