Files
jczxw-pc/public/docs/streaming.md
2026-04-23 16:30:57 +08:00

7.2 KiB
Raw Blame History

流式输出SSE接入

使用 Server-Sent Events 实现 AI 流式响应,提升用户体验。

🌊 什么是流式输出?

流式输出Streaming是一种数据传输方式服务器将数据「分块」发送给客户端而不是等待全部完成后再返回。

对比

方式 等待时间 体验 适用场景
普通请求 等待完整响应 一般 简单、快速的请求
流式输出 实时看到生成过程 优秀 AI 生成、长文本

🚀 开始使用

前端:接收流式响应

import { WebsopyClient } from '@websopy/sdk'

const client = new WebsopyClient({
  apiKey: process.env.WEBSOPY_API_KEY
})

async function streamChat() {
  const stream = await client.ai.chatStream({
    messages: [
      { role: 'user', content: '写一篇关于 AI 的文章' }
    ]
  })

  // 方法 1遍历数据块
  for await (const chunk of stream) {
    if (chunk.type === 'content') {
      process.stdout.write(chunk.content)
    }
  }
  
  // 或方法 2事件监听
  // stream.on('data', (chunk) => { ... })
  // stream.on('end', () => { ... })
}

响应数据结构

interface StreamChunk {
  type: 'content' | 'tool_call' | 'done' | 'error'
  content?: string
  tool?: {
    name: string
    arguments: string
  }
  usage?: {
    promptTokens: number
    completionTokens: number
  }
}

// 示例数据块
{ type: 'content', content: 'AI' }
{ type: 'content', content: ' 正在' }
{ type: 'content', content: '改变' }
{ type: 'content', content: '世界...' }
{ type: 'done', usage: { promptTokens: 20, completionTokens: 150 } }

🌐 原生 SSE 实现

如果不用 SDK直接使用 Fetch API

GET 请求

async function nativeStream() {
  const response = await fetch(
    'https://api.websopy.com/v1/ai/chat?message=Hello',
    {
      headers: {
        'Authorization': `Bearer ${apiKey}`,
        'Accept': 'text/event-stream'
      }
    }
  )

  const reader = response.body.getReader()
  const decoder = new TextDecoder()

  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    
    const chunk = decoder.decode(value)
    // 解析 SSE 数据
    const lines = chunk.split('\n')
    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6))
        console.log('收到:', data)
      }
    }
  }
}

POST 请求

async function streamPost() {
  const response = await fetch(
    'https://api.websopy.com/v1/ai/chat/stream',
    {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${apiKey}`,
        'Content-Type': 'application/json',
        'Accept': 'text/event-stream'
      },
      body: JSON.stringify({
        messages: [
          { role: 'user', content: '解释量子计算' }
        ],
        model: 'gpt-4',
        stream: true
      })
    }
  )

  // 处理流...
}

🎨 前端组件示例

React Hook

import { useState } from 'react'

function useStreamingChat() {
  const [messages, setMessages] = useState([])
  const [streaming, setStreaming] = useState(false)
  const [currentContent, setCurrentContent] = useState('')

  const sendMessage = async (content) => {
    setStreaming(true)
    setCurrentContent('')
    
    // 添加用户消息
    setMessages(prev => [...prev, { role: 'user', content }])
    
    const stream = await client.ai.chatStream({
      messages: [...messages, { role: 'user', content }]
    })

    for await (const chunk of stream) {
      if (chunk.type === 'content') {
        setCurrentContent(prev => prev + chunk.content)
      }
    }
    
    // 保存完整消息
    setMessages(prev => [...prev, { role: 'assistant', content: currentContent }])
    setStreaming(false)
  }

  return { messages, streaming, currentContent, sendMessage }
}

Vue Composition API

import { ref } from 'vue'

export function useStreamingChat() {
  const messages = ref([])
  const streaming = ref(false)
  const currentContent = ref('')

  const sendMessage = async (content) => {
    streaming.value = true
    currentContent.value = ''
    
    messages.value.push({ role: 'user', content })
    
    const stream = await client.ai.chatStream({
      messages: messages.value
    })

    for await (const chunk of stream) {
      if (chunk.type === 'content') {
        currentContent.value += chunk.content
      }
    }
    
    messages.value.push({ role: 'assistant', content: currentContent.value })
    streaming.value = false
  }

  return { messages, streaming, currentContent, sendMessage }
}

⚙️ 服务端 SSE 配置

Node.js Express

import express from 'express'

app.post('/api/chat/stream', async (req, res) => {
  // 设置 SSE 响应头
  res.setHeader('Content-Type', 'text/event-stream')
  res.setHeader('Cache-Control', 'no-cache')
  res.setHeader('Connection', 'keep-alive')
  
  // 保持连接
  res.flushHeaders()

  try {
    const stream = await client.ai.chatStream(req.body)
    
    for await (const chunk of stream) {
      res.write(`data: ${JSON.stringify(chunk)}\n\n`)
    }
    
    res.write('data: [DONE]\n\n')
  } catch (error) {
    res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`)
  }
  
  res.end()
})

Next.js API Route

// app/api/chat/route.ts
export async function POST(req: Request) {
  const encoder = new TextEncoder()
  const stream = new ReadableStream({
    async start(controller) {
      try {
        const stream = await client.ai.chatStream(await req.json())
        
        for await (const chunk of stream) {
          controller.enqueue(
            encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`)
          )
        }
        
        controller.enqueue(encoder.encode('data: [DONE]\n\n'))
      } catch (error) {
        controller.enqueue(
          encoder.encode(`data: ${JSON.stringify({ error: error.message })}\n\n`)
        )
      }
      controller.close()
    }
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive'
    }
  })
}

常见问题

Q: 流式中断怎么办?

async function streamWithReconnect() {
  const maxRetries = 3
  let retries = 0
  
  while (retries < maxRetries) {
    try {
      const stream = await client.ai.chatStream({...})
      return stream
    } catch (error) {
      retries++
      await sleep(1000 * retries)  // 指数退避
    }
  }
}

Q: 如何在流式过程中取消请求?

const controller = new AbortController()

// 发送请求
const stream = await client.ai.chatStream({
  messages: [...],
  signal: controller.signal
})

// 取消请求
controller.abort()

Q: SSE 和 WebSocket 区别?

特性 SSE WebSocket
方向 单向(服务端→客户端) 双向
协议 HTTP ws://
重连 自动 需手动处理
复杂度 简单 复杂
适用 AI 流式输出 实时聊天

上一步: API Key 创建与管理
下一步: Webhook 事件接入