7.2 KiB
7.2 KiB
流式输出(SSE)接入
使用 Server-Sent Events 实现 AI 流式响应,提升用户体验。
🌊 什么是流式输出?
流式输出(Streaming)是一种数据传输方式,服务器将数据「分块」发送给客户端,而不是等待全部完成后再返回。
对比
| 方式 | 等待时间 | 体验 | 适用场景 |
|---|---|---|---|
| 普通请求 | 等待完整响应 | 一般 | 简单、快速的请求 |
| 流式输出 | 实时看到生成过程 | ⭐ 优秀 | AI 生成、长文本 |
🚀 开始使用
前端:接收流式响应
import { WebsopyClient } from '@websopy/sdk'
const client = new WebsopyClient({
apiKey: process.env.WEBSOPY_API_KEY
})
async function streamChat() {
const stream = await client.ai.chatStream({
messages: [
{ role: 'user', content: '写一篇关于 AI 的文章' }
]
})
// 方法 1:遍历数据块
for await (const chunk of stream) {
if (chunk.type === 'content') {
process.stdout.write(chunk.content)
}
}
// 或方法 2:事件监听
// stream.on('data', (chunk) => { ... })
// stream.on('end', () => { ... })
}
响应数据结构
interface StreamChunk {
type: 'content' | 'tool_call' | 'done' | 'error'
content?: string
tool?: {
name: string
arguments: string
}
usage?: {
promptTokens: number
completionTokens: number
}
}
// 示例数据块
{ type: 'content', content: 'AI' }
{ type: 'content', content: ' 正在' }
{ type: 'content', content: '改变' }
{ type: 'content', content: '世界...' }
{ type: 'done', usage: { promptTokens: 20, completionTokens: 150 } }
🌐 原生 SSE 实现
如果不用 SDK,直接使用 Fetch API:
GET 请求
async function nativeStream() {
const response = await fetch(
'https://api.websopy.com/v1/ai/chat?message=Hello',
{
headers: {
'Authorization': `Bearer ${apiKey}`,
'Accept': 'text/event-stream'
}
}
)
const reader = response.body.getReader()
const decoder = new TextDecoder()
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value)
// 解析 SSE 数据
const lines = chunk.split('\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6))
console.log('收到:', data)
}
}
}
}
POST 请求
async function streamPost() {
const response = await fetch(
'https://api.websopy.com/v1/ai/chat/stream',
{
method: 'POST',
headers: {
'Authorization': `Bearer ${apiKey}`,
'Content-Type': 'application/json',
'Accept': 'text/event-stream'
},
body: JSON.stringify({
messages: [
{ role: 'user', content: '解释量子计算' }
],
model: 'gpt-4',
stream: true
})
}
)
// 处理流...
}
🎨 前端组件示例
React Hook
import { useState } from 'react'
function useStreamingChat() {
const [messages, setMessages] = useState([])
const [streaming, setStreaming] = useState(false)
const [currentContent, setCurrentContent] = useState('')
const sendMessage = async (content) => {
setStreaming(true)
setCurrentContent('')
// 添加用户消息
setMessages(prev => [...prev, { role: 'user', content }])
const stream = await client.ai.chatStream({
messages: [...messages, { role: 'user', content }]
})
for await (const chunk of stream) {
if (chunk.type === 'content') {
setCurrentContent(prev => prev + chunk.content)
}
}
// 保存完整消息
setMessages(prev => [...prev, { role: 'assistant', content: currentContent }])
setStreaming(false)
}
return { messages, streaming, currentContent, sendMessage }
}
Vue Composition API
import { ref } from 'vue'
export function useStreamingChat() {
const messages = ref([])
const streaming = ref(false)
const currentContent = ref('')
const sendMessage = async (content) => {
streaming.value = true
currentContent.value = ''
messages.value.push({ role: 'user', content })
const stream = await client.ai.chatStream({
messages: messages.value
})
for await (const chunk of stream) {
if (chunk.type === 'content') {
currentContent.value += chunk.content
}
}
messages.value.push({ role: 'assistant', content: currentContent.value })
streaming.value = false
}
return { messages, streaming, currentContent, sendMessage }
}
⚙️ 服务端 SSE 配置
Node.js Express
import express from 'express'
app.post('/api/chat/stream', async (req, res) => {
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
// 保持连接
res.flushHeaders()
try {
const stream = await client.ai.chatStream(req.body)
for await (const chunk of stream) {
res.write(`data: ${JSON.stringify(chunk)}\n\n`)
}
res.write('data: [DONE]\n\n')
} catch (error) {
res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`)
}
res.end()
})
Next.js API Route
// app/api/chat/route.ts
export async function POST(req: Request) {
const encoder = new TextEncoder()
const stream = new ReadableStream({
async start(controller) {
try {
const stream = await client.ai.chatStream(await req.json())
for await (const chunk of stream) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`)
)
}
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
} catch (error) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ error: error.message })}\n\n`)
)
}
controller.close()
}
})
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
})
}
❓ 常见问题
Q: 流式中断怎么办?
async function streamWithReconnect() {
const maxRetries = 3
let retries = 0
while (retries < maxRetries) {
try {
const stream = await client.ai.chatStream({...})
return stream
} catch (error) {
retries++
await sleep(1000 * retries) // 指数退避
}
}
}
Q: 如何在流式过程中取消请求?
const controller = new AbortController()
// 发送请求
const stream = await client.ai.chatStream({
messages: [...],
signal: controller.signal
})
// 取消请求
controller.abort()
Q: SSE 和 WebSocket 区别?
| 特性 | SSE | WebSocket |
|---|---|---|
| 方向 | 单向(服务端→客户端) | 双向 |
| 协议 | HTTP | ws:// |
| 重连 | 自动 | 需手动处理 |
| 复杂度 | 简单 | 复杂 |
| 适用 | AI 流式输出 | 实时聊天 |
上一步: API Key 创建与管理
下一步: Webhook 事件接入