前言
在现代 Web 应用中,实时流式响应已经成为提升用户体验的重要手段,尤其是在 AI 对话、大文件传输、实时数据推送等场景中。虽然 WebSocket 和 Server-Sent Events (SSE) 是常见的解决方案,但基于传统 HTTP 的流式响应(使用 NDJSON 格式)同样具有独特优势:
- 无需协议升级:直接使用 HTTP 协议,无需特殊的握手过程
- 更好的中间件兼容性:与现有的负载均衡器、反向代理配合更顺畅
- 统一的错误处理:使用标准的 HTTP 状态码和错误处理机制
- 支持 AbortController:浏览器原生支持请求取消
为了更好地理解和演示这一技术,我创建了一个包含 HTTP Stream、SSE 和 WebSocket 三种方案的对比示例。然而,在实现过程中遇到了一个看似简单却颇为隐蔽的问题:流式响应在发送第一个数据包后就立即断开了连接。
这篇文章将详细记录整个问题的排查和解决过程,希望能为遇到类似问题的开发者提供参考。
问题现象
用户侧表现
当用户点击"开始流式请求"按钮后,页面显示收到了 meta 消息,但随即就停止了,没有任何 chunk 数据返回。浏览器开发者工具的网络面板显示,在 stream 请求刚完成后,又自动发送了一个 /api/chat/abort 请求。
服务器日志
[HTTP m_1759993385380] 开始处理请求, 问题: "给我一个流式响应的讲解", delayMs=60
[HTTP m_1759993385380] 客户端断开连接
[HTTP m_1759993385380] 已发送 meta
[HTTP m_1759993385380] 准备流式发送,总长度: 158 字符
[streamText] 开始处理 53 个token
[streamText] 在第 0/53 个token时停止
[streamText] 处理完成. stopped=true
[HTTP m_1759993385380] 已发送 0 个 chunk
[HTTP m_1759993385380] 请求处理完成可以看到,服务器在发送 meta 后立即检测到"客户端断开连接",导致 stopped=true,从而在第 0 个 token 时就停止了流式发送。
问题排查过程
第一次尝试:检查前端 abort 逻辑
假设:可能是前端在请求完成后自动调用了 abort。
我首先检查了前端的 stopStreaming 函数:
const stopStreaming = async (userTriggered = false) => {
const previousId = currentId;
if (userTriggered) {
completion = 'user_abort';
}
if (controller) {
controller.abort(); // 无条件调用
controller = null;
}
const shouldNotifyAbort = previousId && (userTriggered || (completion !== 'done' && completion !== 'idle'));
if (shouldNotifyAbort) {
await fetch('/api/chat/abort', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ id: previousId })
});
}
// ...
};修改:将判断逻辑改为只在用户手动触发时才发送 abort 请求:
const shouldNotifyAbort = previousId && userTriggered && completion !== 'done' && completion !== 'idle';结果: 失败。虽然不再自动发送 abort 请求,但流式数据依然没有返回。
第二次尝试:避免 controller.abort() 被过早调用
假设:即使不发送 abort 请求,controller.abort() 本身也会中断连接。
在 finally 块中调用 stopStreaming(false) 时,仍然会执行 controller.abort(),这可能导致正在进行的流式读取被中断。
修改:只在用户手动停止时才调用 controller.abort():
const stopStreaming = async (userTriggered = false) => {
if (userTriggered) {
completion = 'user_abort';
if (controller) {
console.log('[stopStreaming] 用户手动停止,调用 controller.abort()');
controller.abort();
controller = null;
}
if (previousId) {
await fetch('/api/chat/abort', { /* ... */ });
}
} else {
// 正常结束时,清理 controller 但不调用 abort
console.log('[stopStreaming] 正常结束,清理 controller');
controller = null;
}
};结果: 依然失败。问题似乎不在前端。
第三次尝试:深入服务器端排查
既然前端逻辑已经修正,但问题依然存在,说明问题出在服务器端。我添加了详细的日志来追踪:
客户端日志:
[Reader] 开始读取流数据
[Reader] 第 1 次读取...
[Reader] 读取结果: {done: false, valueLength: 58}
[Reader] 解码后数据: 58 字符
[Reader] 分割出 1 行数据,剩余缓冲: 0 字符
[Reader] 解析消息: meta
[Reader] 第 2 次读取...
[Reader] 读取结果: {done: true, valueLength: 0} // 立即返回 done
[Reader] 读取完成,当前 completion: pending服务器日志:
[HTTP m_1759993532848] 开始处理请求
[HTTP m_1759993532848] 客户端断开连接 (事件: req.close) // 关键点
[HTTP m_1759993532848] 当前状态: res.writableEnded=false, res.destroyed=false, ctx.stopped=false
[HTTP m_1759993532848] 已发送 meta
[streamText] 在第 0/53 个token时停止关键发现:服务器在发送 meta 后,立即触发了 req.close 事件!
问题根源
HTTP 流式响应的工作原理
在 HTTP 流式响应中,通信流程如下:
客户端发送请求:
- 发送 HTTP 请求头
- 发送请求体(POST 数据)
- 请求发送完毕
服务器接收请求:
- 读取请求头
- 读取请求体
- 请求读取完毕,触发
req.close事件 这是正常的!
服务器发送流式响应:
- 保持响应连接打开
- 持续写入数据块
- 直到所有数据发送完毕或连接断开
错误的事件监听
在服务器代码中,我错误地监听了 req.close 事件:
const handleDisconnect = () => {
console.log(`${logPrefix} 客户端断开连接`);
ctx.stopped = true;
};
req.on('close', handleDisconnect); // 错误:请求关闭不等于客户端断开
req.on('aborted', handleDisconnect); // 正确:客户端主动中断
res.on('close', handleDisconnect); // 正确:响应连接关闭问题分析:
req.close事件在请求体读取完毕后就会触发,这是 Node.js HTTP 模块的正常行为- 在流式响应场景下,请求关闭是正常的,但响应流应该继续保持打开
- 错误地将
req.close视为"客户端断开连接",导致ctx.stopped = true - 一旦
stopped为 true,流式发送逻辑就会提前终止
形象比喻
可以把 HTTP 流式响应比作电话通话:
- Request(请求):你打电话给客服,说完问题后就不再说话了(请求发送完毕,
req.close触发) - Response(响应):客服开始详细解答,持续说话(流式发送数据)
- 错误做法:客服一听到你不说话了,就以为你挂断电话,也立即挂断
- 正确做法:客服继续解答,直到说完或者你主动挂断(
req.aborted或res.close)
解决方案
修改服务器端事件监听
移除对 req.close 的监听,只保留真正表示连接断开的事件:
const handleDisconnect = (event) => {
console.log(`${logPrefix} 客户端断开连接 (事件: ${event})`);
console.log(`${logPrefix} 当前状态: res.writableEnded=${res.writableEnded}, res.destroyed=${res.destroyed}, ctx.stopped=${ctx.stopped}`);
ctx.stopped = true;
};
// 注意:不要监听 req.on('close'),因为在流式响应中,请求体读取完毕后会立即触发 close
// 我们只需要监听响应的断开事件
req.on('aborted', () => handleDisconnect('req.aborted')); // 客户端主动中断请求
res.on('close', () => handleDisconnect('res.close')); // 响应连接关闭
res.on('error', (err) => {
console.error(`${logPrefix} 响应错误:`, err.message);
ctx.stopped = true;
});事件说明
| 事件 | 触发时机 | 是否应该停止流式发送 |
|---|---|---|
req.close | 请求体读取完毕 | 否 - 这是正常流程 |
req.aborted | 客户端主动中断请求(如调用 abort()) | 是 - 客户端不想要数据了 |
res.close | 响应连接关闭(客户端断开或服务器完成) | 是 - 连接已断开 |
res.error | 响应写入错误 | 是 - 无法继续发送 |
修改前端 abort 逻辑
同时,前端也需要确保只在必要时才调用 abort:
const stopStreaming = async (userTriggered = false) => {
const previousId = currentId;
console.log('[stopStreaming] 调用参数:', { userTriggered, completion, previousId });
if (userTriggered) {
// 只有用户手动触发时才中断连接
completion = 'user_abort';
if (controller) {
console.log('[stopStreaming] 用户手动停止,调用 controller.abort()');
controller.abort();
controller = null;
}
// 通知服务器中断
if (previousId) {
try {
console.log('[stopStreaming] 发送 abort 请求到服务器, id:', previousId);
await fetch('/api/chat/abort', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ id: previousId })
});
console.log('[stopStreaming] abort 请求已发送');
} catch (err) {
console.warn('[stopStreaming] abort 请求失败:', err);
}
}
} else {
// 正常结束时,清理 controller 但不调用 abort
console.log('[stopStreaming] 正常结束,清理 controller');
controller = null;
}
currentId = null;
startBtn.disabled = false;
stopBtn.disabled = true;
if (userTriggered) {
setStatus('已手动停止', 'danger');
pushTimeline(' 已取消', new Date().toLocaleTimeString());
}
};验证结果
修改后,服务器日志显示:
[HTTP m_1759993600000] 开始处理请求, 问题: "给我一个流式响应的讲解", delayMs=60
[HTTP m_1759993600000] 已发送 meta
[HTTP m_1759993600000] 准备流式发送,总长度: 158 字符
[streamText] 开始处理 53 个token
[streamText] 处理完成. stopped=false // 正常完成
[HTTP m_1759993600000] 已发送 53 个 chunk // 所有数据都发送了
[HTTP m_1759993600000] 已发送 done
[HTTP m_1759993600000] 请求处理完成
[HTTP m_1759993600000] 客户端断开连接 (事件: res.close) // 在发送完毕后才关闭客户端日志显示:
[Reader] 开始读取流数据
[Reader] 第 1 次读取: meta
[Reader] 第 2 次读取: chunk 1
[Reader] 第 3 次读取: chunk 2
...
[Reader] 第 54 次读取: chunk 53
[Reader] 第 55 次读取: done
[Reader] 第 56 次读取: {done: true} // 所有数据读取完毕
[Reader] 读取完成,当前 completion: done问题完全解决!
技术要点总结
1. HTTP 流式响应的正确姿势
服务器端:
- 设置正确的响应头:
Content-Type: application/x-ndjson,Transfer-Encoding: chunked - 使用
res.write()持续写入数据,每次写入后调用res.flush() - 监听响应相关的断开事件(
res.close,req.aborted),而非请求事件(req.close) - 在流式发送完成后调用
res.end()
客户端:
- 使用
fetch+ReadableStream或XMLHttpRequest+onprogress - 通过
AbortController实现请求取消 - 正确区分"正常完成"和"用户取消"两种场景
2. Node.js HTTP 事件的语义差异
| 场景 | Request 事件 | Response 事件 |
|---|---|---|
| 普通 HTTP 请求 | req.close - 请求结束 | res.close - 响应结束 |
| HTTP 流式响应 | req.close - 请求体读取完毕(正常) | res.close - 响应流结束 |
| 客户端主动断开 | req.aborted - 客户端中断 | res.close - 连接关闭 |
3. 调试技巧
在排查流式响应问题时,以下日志非常有帮助:
// 服务器端
const handleDisconnect = (event) => {
console.log(`${logPrefix} 断开连接 (事件: ${event})`);
console.log(`${logPrefix} 状态: res.writableEnded=${res.writableEnded}, res.destroyed=${res.destroyed}`);
};
// 客户端
while (true) {
console.log('[Reader] 第', ++readCount, '次读取...');
const { done, value } = await reader.read();
console.log('[Reader] 读取结果:', { done, valueLength: value?.length || 0 });
if (done) break;
// 处理数据...
}4. 常见误区
错误认知:
- "请求关闭(
req.close)表示客户端断开连接" - "应该在
finally块中无条件调用abort()" - "流式响应需要特殊的 WebSocket 或 SSE 协议"
正确理解:
- 请求关闭是正常的,响应可以继续
- 只有在用户取消或异常时才需要 abort
- 普通 HTTP 完全可以实现流式响应
总结
这次问题的排查过程充分展示了事件驱动模型中正确理解事件语义的重要性。一个看似简单的 req.on('close') 监听,在不同的应用场景下有着完全不同的含义。
关键教训
- 理解协议层次:HTTP 请求和响应是两个独立的阶段,在流式场景下更需要区分对待
- 阅读官方文档:Node.js 文档明确说明了各个事件的触发时机,但容易被忽略
- 日志驱动调试:在分布式系统中,详细的日志是定位问题的最佳工具
- 渐进式排查:从前端到后端,从假设到验证,一步步缩小问题范围
适用场景
HTTP 流式响应特别适合以下场景:
- AI 对话(ChatGPT 式的打字机效果)
- 大文件上传/下载进度反馈
- 日志实时推送
- 数据导出(边生成边传输)
- 已有 HTTP 基础设施的团队
相比 WebSocket:
- 优势:无需协议升级、更好的中间件兼容性、统一的错误处理
- 劣势:单向通信(客户端到服务器需要另外的请求)、无法服务器主动推送
最后的思考
技术实现的细节往往隐藏着魔鬼。这次看似简单的流式响应实现,却因为一个不起眼的事件监听错误,导致了完全无法工作。这提醒我们:
在引入新技术或新模式时,不仅要理解"怎么做",更要深入理解"为什么这么做"。只有这样,才能在遇到问题时快速定位根源,而不是在表象上打转。
希望这篇文章能帮助你更好地理解和使用 HTTP 流式响应技术。如果你也遇到了类似的问题,欢迎在评论区分享你的经验!
参考资源
示例代码
// server.js
const http = require('http');
const path = require('path');
const express = require('express');
const cors = require('cors');
const { WebSocketServer } = require('ws');
const app = express();
app.use(cors({ origin: true, credentials: true }));
app.use(express.json());
app.use('/public', express.static(path.join(__dirname, 'public')));
const server = http.createServer(app);
/** ----------------- 通用:模拟分片生成 & 任务管理 ----------------- **/
const jobs = new Map(); // jobId -> { stopped: boolean }
/** 模拟将文本拆分为一小段一小段输出 */
function delay(ms) { return new Promise(r => setTimeout(r, ms)); }
async function streamText(text, onChunk, ctx, perChunkMs = 50) {
const tokens = text.split(/(\s+)/); // 保留空白以更像"打字机"
console.log(`[streamText] 开始处理 ${tokens.length} 个token`);
for (let i = 0; i < tokens.length; i++) {
if (ctx.stopped) {
console.log(`[streamText] 在第 ${i}/${tokens.length} 个token时停止`);
break;
}
const t = tokens[i];
const keepGoing = await Promise.resolve(onChunk(t));
if (keepGoing === false) {
console.log(`[streamText] onChunk 请求提前结束 (i=${i})`);
break;
}
if (i < tokens.length - 1) {
await delay(perChunkMs); // 只在这里await
}
}
console.log(`[streamText] 处理完成. stopped=${ctx.stopped}`);
}
/** 一个示例的“回复文本” */
function sampleReply(prompt = '') {
return `Hi! You said: "${prompt}". This is a streaming demo showing how chunks arrive gradually. ` +
`We can also send meta and done messages. Feel free to abort midway.`;
}
/** ----------------- 方案一:HTTP + 流式读取(NDJSON) ----------------- **/
app.post('/api/chat/stream', async (req, res) => {
const { q = 'hello', id = `m_${Date.now()}`, delayMs } = req.body || {};
const ctx = { stopped: false };
jobs.set(id, ctx);
req.socket?.setNoDelay?.(true);
req.socket?.setKeepAlive?.(true, 1000 * 30);
const perChunkMs = Number.isFinite(Number(delayMs))
? Math.min(Math.max(Number(delayMs), 5), 2000)
: 60;
const logPrefix = `[HTTP ${id}]`;
console.log(`${logPrefix} 开始处理请求, 问题: "${q}", delayMs=${perChunkMs}`);
res.writeHead(200, {
'Content-Type': 'application/x-ndjson; charset=utf-8',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Transfer-Encoding': 'chunked',
'X-Accel-Buffering': 'no'
});
res.flushHeaders?.();
const safeSend = async (payload) => {
if (ctx.stopped || res.writableEnded || res.destroyed) {
return false;
}
try {
const line = JSON.stringify(payload) + '\n';
const canContinue = res.write(line, 'utf8');
res.flush?.();
if (!canContinue) {
await new Promise((resolve) => res.once('drain', resolve));
}
return !(ctx.stopped || res.writableEnded || res.destroyed);
} catch (err) {
console.error(`${logPrefix} 写入失败:`, err.message);
ctx.stopped = true;
return false;
}
};
const cleanup = () => {
if (!ctx.stopped) {
console.log(`${logPrefix} cleanup -> 标记停止`);
ctx.stopped = true;
}
if (!res.writableEnded && !res.destroyed) {
res.end();
}
jobs.delete(id);
};
const handleDisconnect = (event) => {
console.log(`${logPrefix} 客户端断开连接 (事件: ${event})`);
console.log(`${logPrefix} 当前状态: res.writableEnded=${res.writableEnded}, res.destroyed=${res.destroyed}, ctx.stopped=${ctx.stopped}`);
ctx.stopped = true;
};
// 注意:不要监听 req.on('close'),因为在流式响应中,请求体读取完毕后会立即触发 close
// 我们只需要监听响应的断开事件
req.on('aborted', () => handleDisconnect('req.aborted'));
res.on('close', () => handleDisconnect('res.close'));
res.on('error', (err) => {
console.error(`${logPrefix} 响应错误:`, err.message);
ctx.stopped = true;
});
try {
await safeSend({ id, type: 'meta', ts: Date.now() });
console.log(`${logPrefix} 已发送 meta`);
const fullText = sampleReply(q);
console.log(`${logPrefix} 准备流式发送,总长度: ${fullText.length} 字符`);
let chunkCount = 0;
await streamText(fullText, async (delta) => {
if (!delta && delta !== '') {
return true;
}
const ok = await safeSend({ id, type: 'chunk', delta });
if (ok) chunkCount++;
return ok;
}, ctx, perChunkMs);
console.log(`${logPrefix} 已发送 ${chunkCount} 个 chunk`);
if (!ctx.stopped) {
await safeSend({
id,
type: 'done',
finish_reason: 'stop',
ts: Date.now()
});
console.log(`${logPrefix} 已发送 done`);
}
} catch (err) {
console.error(`${logPrefix} 流式过程中出错:`, err);
if (!ctx.stopped) {
await safeSend({ id, type: 'error', message: err.message });
}
} finally {
cleanup();
console.log(`${logPrefix} 请求处理完成\n`);
}
});
/** 客户端主动中断(HTTP/SSE 通用) */
app.post('/api/chat/abort', (req, res) => {
const { id } = req.body || {};
const ctx = jobs.get(id);
if (ctx) ctx.stopped = true;
res.json({ ok: true });
});
/** ----------------- 方案二:SSE(text/event-stream) ----------------- **/
app.get('/api/chat/sse', async (req, res) => {
const id = (req.query.id || `m_${Date.now()}`).toString();
const prompt = (req.query.q || 'hello').toString();
const perChunkMs = Number.isFinite(Number(req.query.delayMs))
? Math.min(Math.max(Number(req.query.delayMs), 5), 2000)
: 60;
const ctx = { stopped: false };
jobs.set(id, ctx);
const logPrefix = `[SSE ${id}]`;
res.writeHead(200, {
'Content-Type': 'text/event-stream; charset=utf-8',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no'
});
res.flushHeaders?.();
const retryMs = Number.isFinite(Number(req.query.retryMs)) ? Number(req.query.retryMs) : 3000;
if (retryMs > 0) {
res.write(`retry: ${retryMs}\n\n`);
}
const safeEvent = (event, data) => {
if (ctx.stopped || res.writableEnded || res.destroyed) {
return false;
}
try {
const chunk = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
const canContinue = res.write(chunk, 'utf8');
res.flush?.();
if (!canContinue) {
return new Promise((resolve) => res.once('drain', () => resolve(!(ctx.stopped || res.writableEnded || res.destroyed))));
}
return true;
} catch (err) {
console.error(`${logPrefix} 写入失败:`, err.message);
ctx.stopped = true;
return false;
}
};
const keepAlive = setInterval(() => {
if (ctx.stopped || res.writableEnded || res.destroyed) {
clearInterval(keepAlive);
return;
}
try {
res.write(':\n\n');
} catch (err) {
console.error(`${logPrefix} 心跳写入失败:`, err.message);
clearInterval(keepAlive);
ctx.stopped = true;
}
}, 15000);
const handleDisconnect = () => {
console.log(`${logPrefix} 客户端断开`);
ctx.stopped = true;
};
req.on('close', handleDisconnect);
req.on('aborted', handleDisconnect);
res.on('close', handleDisconnect);
res.on('error', (err) => console.error(`${logPrefix} 响应错误:`, err.message));
try {
await safeEvent('meta', { id, ts: Date.now(), perChunkMs });
await streamText(sampleReply(prompt), (delta) => safeEvent('token', { id, delta }), ctx, perChunkMs);
await safeEvent('done', {
id,
finish_reason: ctx.stopped ? 'user_cancel' : 'stop',
ts: Date.now()
});
} finally {
clearInterval(keepAlive);
if (!res.writableEnded && !res.destroyed) {
res.end();
}
jobs.delete(id);
}
});
/** ----------------- 方案三:WebSocket(双向) ----------------- **/
const wss = new WebSocketServer({ server });
wss.on('connection', (ws, req) => {
// 简单解析查询参数(鉴权可换 JWT)
const url = new URL(req.url, 'http://localhost');
const conv = url.searchParams.get('conv') || 'c1';
ws.on('message', (raw) => {
let msg = {};
try { msg = JSON.parse(raw.toString()); } catch { return; }
if (msg.type === 'user_msg') {
// 用客户端发来的 id 派生一个 assistant id,便于客户端 abort
const aId = `a_${msg.id || Date.now()}`;
const ctx = { stopped: false };
jobs.set(aId, ctx);
const perChunkMs = Number.isFinite(Number(msg.delayMs))
? Math.min(Math.max(Number(msg.delayMs), 5), 2000)
: 60;
// 可选:先回一条 start
ws.send(JSON.stringify({ type: 'start', id: aId, convId: conv, ts: Date.now() }));
streamText(sampleReply(msg.text || ''), (delta) => {
if (ws.readyState !== ws.OPEN) {
ctx.stopped = true;
return false;
}
ws.send(JSON.stringify({
type: 'assistant_chunk', id: aId, convId: conv, delta
}));
return true;
}, ctx, perChunkMs)
.then(() => {
if (ws.readyState === ws.OPEN) {
ws.send(JSON.stringify({
type: 'done', id: aId, finish_reason: ctx.stopped ? 'user_cancel' : 'stop', ts: Date.now()
}));
}
})
.catch((err) => {
console.error(`[WS ${aId}] 流式过程中出错:`, err);
})
.finally(() => {
jobs.delete(aId);
});
} else if (msg.type === 'abort' && msg.target) {
const ctx = jobs.get(msg.target);
if (ctx) ctx.stopped = true;
} else if (msg.type === 'ping') {
ws.send(JSON.stringify({ type: 'pong', ts: Date.now() }));
}
});
ws.on('close', () => {
// 可选:标记此 socket 相关任务停止(示例简化略过)
});
});
server.listen(3000, () => {
console.log('Server listening on http://localhost:3000');
});
本文由 小但 创作
全文共:19963个字
采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载,均为作者原创,转载前请务必署名
最后编辑时间为: Oct 9, 2025 at 03:20 pm