HTTP 流式响应中的连接断开问题:一次深入的排查之旅
in 码农笔记JavaScript笔记 with 0 comment

HTTP 流式响应中的连接断开问题:一次深入的排查之旅

in 码农笔记JavaScript笔记 with 0 comment

前言

在现代 Web 应用中,实时流式响应已经成为提升用户体验的重要手段,尤其是在 AI 对话、大文件传输、实时数据推送等场景中。虽然 WebSocket 和 Server-Sent Events (SSE) 是常见的解决方案,但基于传统 HTTP 的流式响应(使用 NDJSON 格式)同样具有独特优势:

为了更好地理解和演示这一技术,我创建了一个包含 HTTP Stream、SSE 和 WebSocket 三种方案的对比示例。然而,在实现过程中遇到了一个看似简单却颇为隐蔽的问题:流式响应在发送第一个数据包后就立即断开了连接

这篇文章将详细记录整个问题的排查和解决过程,希望能为遇到类似问题的开发者提供参考。

问题现象

用户侧表现

当用户点击"开始流式请求"按钮后,页面显示收到了 meta 消息,但随即就停止了,没有任何 chunk 数据返回。浏览器开发者工具的网络面板显示,在 stream 请求刚完成后,又自动发送了一个 /api/chat/abort 请求。

服务器日志

[HTTP m_1759993385380] 开始处理请求, 问题: "给我一个流式响应的讲解", delayMs=60
[HTTP m_1759993385380] 客户端断开连接
[HTTP m_1759993385380] 已发送 meta
[HTTP m_1759993385380] 准备流式发送,总长度: 158 字符
[streamText] 开始处理 53 个token
[streamText] 在第 0/53 个token时停止
[streamText] 处理完成. stopped=true
[HTTP m_1759993385380] 已发送 0 个 chunk
[HTTP m_1759993385380] 请求处理完成

可以看到,服务器在发送 meta 后立即检测到"客户端断开连接",导致 stopped=true,从而在第 0 个 token 时就停止了流式发送。

问题排查过程

第一次尝试:检查前端 abort 逻辑

假设:可能是前端在请求完成后自动调用了 abort。

我首先检查了前端的 stopStreaming 函数:

const stopStreaming = async (userTriggered = false) => {
  const previousId = currentId;

  if (userTriggered) {
    completion = 'user_abort';
  }

  if (controller) {
    controller.abort();  //  无条件调用
    controller = null;
  }

  const shouldNotifyAbort = previousId && (userTriggered || (completion !== 'done' && completion !== 'idle'));
  if (shouldNotifyAbort) {
    await fetch('/api/chat/abort', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ id: previousId })
    });
  }
  // ...
};

修改:将判断逻辑改为只在用户手动触发时才发送 abort 请求:

const shouldNotifyAbort = previousId && userTriggered && completion !== 'done' && completion !== 'idle';

结果: 失败。虽然不再自动发送 abort 请求,但流式数据依然没有返回。

第二次尝试:避免 controller.abort() 被过早调用

假设:即使不发送 abort 请求,controller.abort() 本身也会中断连接。

finally 块中调用 stopStreaming(false) 时,仍然会执行 controller.abort(),这可能导致正在进行的流式读取被中断。

修改:只在用户手动停止时才调用 controller.abort()

const stopStreaming = async (userTriggered = false) => {
  if (userTriggered) {
    completion = 'user_abort';
    
    if (controller) {
      console.log('[stopStreaming] 用户手动停止,调用 controller.abort()');
      controller.abort();
      controller = null;
    }
    
    if (previousId) {
      await fetch('/api/chat/abort', { /* ... */ });
    }
  } else {
    // 正常结束时,清理 controller 但不调用 abort
    console.log('[stopStreaming] 正常结束,清理 controller');
    controller = null;
  }
};

结果: 依然失败。问题似乎不在前端。

第三次尝试:深入服务器端排查

既然前端逻辑已经修正,但问题依然存在,说明问题出在服务器端。我添加了详细的日志来追踪:

客户端日志

[Reader] 开始读取流数据
[Reader] 第 1 次读取...
[Reader] 读取结果: {done: false, valueLength: 58}
[Reader] 解码后数据: 58 字符
[Reader] 分割出 1 行数据,剩余缓冲: 0 字符
[Reader] 解析消息: meta
[Reader] 第 2 次读取...
[Reader] 读取结果: {done: true, valueLength: 0}  //  立即返回 done
[Reader] 读取完成,当前 completion: pending

服务器日志

[HTTP m_1759993532848] 开始处理请求
[HTTP m_1759993532848] 客户端断开连接 (事件: req.close)  //  关键点
[HTTP m_1759993532848] 当前状态: res.writableEnded=false, res.destroyed=false, ctx.stopped=false
[HTTP m_1759993532848] 已发送 meta
[streamText] 在第 0/53 个token时停止

关键发现:服务器在发送 meta 后,立即触发了 req.close 事件!

问题根源

HTTP 流式响应的工作原理

在 HTTP 流式响应中,通信流程如下:

  1. 客户端发送请求

    • 发送 HTTP 请求头
    • 发送请求体(POST 数据)
    • 请求发送完毕
  2. 服务器接收请求

    • 读取请求头
    • 读取请求体
    • 请求读取完毕,触发 req.close 事件 这是正常的!
  3. 服务器发送流式响应

    • 保持响应连接打开
    • 持续写入数据块
    • 直到所有数据发送完毕或连接断开

错误的事件监听

在服务器代码中,我错误地监听了 req.close 事件:

const handleDisconnect = () => {
  console.log(`${logPrefix} 客户端断开连接`);
  ctx.stopped = true;
};

req.on('close', handleDisconnect);     //  错误:请求关闭不等于客户端断开
req.on('aborted', handleDisconnect);   //  正确:客户端主动中断
res.on('close', handleDisconnect);     //  正确:响应连接关闭

问题分析

形象比喻

可以把 HTTP 流式响应比作电话通话:

解决方案

修改服务器端事件监听

移除对 req.close 的监听,只保留真正表示连接断开的事件:

const handleDisconnect = (event) => {
  console.log(`${logPrefix} 客户端断开连接 (事件: ${event})`);
  console.log(`${logPrefix} 当前状态: res.writableEnded=${res.writableEnded}, res.destroyed=${res.destroyed}, ctx.stopped=${ctx.stopped}`);
  ctx.stopped = true;
};

// 注意:不要监听 req.on('close'),因为在流式响应中,请求体读取完毕后会立即触发 close
// 我们只需要监听响应的断开事件
req.on('aborted', () => handleDisconnect('req.aborted'));  // 客户端主动中断请求
res.on('close', () => handleDisconnect('res.close'));      // 响应连接关闭
res.on('error', (err) => {
  console.error(`${logPrefix} 响应错误:`, err.message);
  ctx.stopped = true;
});

事件说明

事件触发时机是否应该停止流式发送
req.close请求体读取完毕否 - 这是正常流程
req.aborted客户端主动中断请求(如调用 abort()是 - 客户端不想要数据了
res.close响应连接关闭(客户端断开或服务器完成)是 - 连接已断开
res.error响应写入错误是 - 无法继续发送

修改前端 abort 逻辑

同时,前端也需要确保只在必要时才调用 abort:

const stopStreaming = async (userTriggered = false) => {
  const previousId = currentId;

  console.log('[stopStreaming] 调用参数:', { userTriggered, completion, previousId });

  if (userTriggered) {
    // 只有用户手动触发时才中断连接
    completion = 'user_abort';
    
    if (controller) {
      console.log('[stopStreaming] 用户手动停止,调用 controller.abort()');
      controller.abort();
      controller = null;
    }

    // 通知服务器中断
    if (previousId) {
      try {
        console.log('[stopStreaming] 发送 abort 请求到服务器, id:', previousId);
        await fetch('/api/chat/abort', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ id: previousId })
        });
        console.log('[stopStreaming] abort 请求已发送');
      } catch (err) {
        console.warn('[stopStreaming] abort 请求失败:', err);
      }
    }
  } else {
    // 正常结束时,清理 controller 但不调用 abort
    console.log('[stopStreaming] 正常结束,清理 controller');
    controller = null;
  }

  currentId = null;
  startBtn.disabled = false;
  stopBtn.disabled = true;

  if (userTriggered) {
    setStatus('已手动停止', 'danger');
    pushTimeline(' 已取消', new Date().toLocaleTimeString());
  }
};

验证结果

修改后,服务器日志显示:

[HTTP m_1759993600000] 开始处理请求, 问题: "给我一个流式响应的讲解", delayMs=60
[HTTP m_1759993600000] 已发送 meta
[HTTP m_1759993600000] 准备流式发送,总长度: 158 字符
[streamText] 开始处理 53 个token
[streamText] 处理完成. stopped=false  //  正常完成
[HTTP m_1759993600000] 已发送 53 个 chunk  //  所有数据都发送了
[HTTP m_1759993600000] 已发送 done
[HTTP m_1759993600000] 请求处理完成
[HTTP m_1759993600000] 客户端断开连接 (事件: res.close)  //  在发送完毕后才关闭

客户端日志显示:

[Reader] 开始读取流数据
[Reader] 第 1 次读取: meta
[Reader] 第 2 次读取: chunk 1
[Reader] 第 3 次读取: chunk 2
...
[Reader] 第 54 次读取: chunk 53
[Reader] 第 55 次读取: done
[Reader] 第 56 次读取: {done: true}  //  所有数据读取完毕
[Reader] 读取完成,当前 completion: done

问题完全解决!

技术要点总结

1. HTTP 流式响应的正确姿势

服务器端

客户端

2. Node.js HTTP 事件的语义差异

场景Request 事件Response 事件
普通 HTTP 请求req.close - 请求结束res.close - 响应结束
HTTP 流式响应req.close - 请求体读取完毕(正常)res.close - 响应流结束
客户端主动断开req.aborted - 客户端中断res.close - 连接关闭

3. 调试技巧

在排查流式响应问题时,以下日志非常有帮助:

// 服务器端
const handleDisconnect = (event) => {
  console.log(`${logPrefix} 断开连接 (事件: ${event})`);
  console.log(`${logPrefix} 状态: res.writableEnded=${res.writableEnded}, res.destroyed=${res.destroyed}`);
};

// 客户端
while (true) {
  console.log('[Reader] 第', ++readCount, '次读取...');
  const { done, value } = await reader.read();
  console.log('[Reader] 读取结果:', { done, valueLength: value?.length || 0 });
  if (done) break;
  // 处理数据...
}

4. 常见误区

错误认知

正确理解

总结

这次问题的排查过程充分展示了事件驱动模型中正确理解事件语义的重要性。一个看似简单的 req.on('close') 监听,在不同的应用场景下有着完全不同的含义。

关键教训

  1. 理解协议层次:HTTP 请求和响应是两个独立的阶段,在流式场景下更需要区分对待
  2. 阅读官方文档:Node.js 文档明确说明了各个事件的触发时机,但容易被忽略
  3. 日志驱动调试:在分布式系统中,详细的日志是定位问题的最佳工具
  4. 渐进式排查:从前端到后端,从假设到验证,一步步缩小问题范围

适用场景

HTTP 流式响应特别适合以下场景:

相比 WebSocket:

最后的思考

技术实现的细节往往隐藏着魔鬼。这次看似简单的流式响应实现,却因为一个不起眼的事件监听错误,导致了完全无法工作。这提醒我们:

在引入新技术或新模式时,不仅要理解"怎么做",更要深入理解"为什么这么做"。只有这样,才能在遇到问题时快速定位根源,而不是在表象上打转。

希望这篇文章能帮助你更好地理解和使用 HTTP 流式响应技术。如果你也遇到了类似的问题,欢迎在评论区分享你的经验!


参考资源

示例代码

// server.js
const http = require('http');
const path = require('path');
const express = require('express');
const cors = require('cors');
const { WebSocketServer } = require('ws');

const app = express();
app.use(cors({ origin: true, credentials: true }));
app.use(express.json());
app.use('/public', express.static(path.join(__dirname, 'public')));

const server = http.createServer(app);

/** ----------------- 通用:模拟分片生成 & 任务管理 ----------------- **/
const jobs = new Map(); // jobId -> { stopped: boolean }

/** 模拟将文本拆分为一小段一小段输出 */
function delay(ms) { return new Promise(r => setTimeout(r, ms)); }
async function streamText(text, onChunk, ctx, perChunkMs = 50) {
  const tokens = text.split(/(\s+)/); // 保留空白以更像"打字机"
  console.log(`[streamText] 开始处理 ${tokens.length} 个token`);

  for (let i = 0; i < tokens.length; i++) {
    if (ctx.stopped) {
      console.log(`[streamText] 在第 ${i}/${tokens.length} 个token时停止`);
      break;
    }

    const t = tokens[i];
    const keepGoing = await Promise.resolve(onChunk(t));
    if (keepGoing === false) {
      console.log(`[streamText] onChunk 请求提前结束 (i=${i})`);
      break;
    }

    if (i < tokens.length - 1) {
      await delay(perChunkMs); // 只在这里await
    }
  }

  console.log(`[streamText] 处理完成. stopped=${ctx.stopped}`);
}

/** 一个示例的“回复文本” */
function sampleReply(prompt = '') {
  return `Hi! You said: "${prompt}". This is a streaming demo showing how chunks arrive gradually. ` +
         `We can also send meta and done messages. Feel free to abort midway.`;
}

/** ----------------- 方案一:HTTP + 流式读取(NDJSON) ----------------- **/
app.post('/api/chat/stream', async (req, res) => {
  const { q = 'hello', id = `m_${Date.now()}`, delayMs } = req.body || {};
  const ctx = { stopped: false };
  jobs.set(id, ctx);

  req.socket?.setNoDelay?.(true);
  req.socket?.setKeepAlive?.(true, 1000 * 30);

  const perChunkMs = Number.isFinite(Number(delayMs))
    ? Math.min(Math.max(Number(delayMs), 5), 2000)
    : 60;
  const logPrefix = `[HTTP ${id}]`;

  console.log(`${logPrefix} 开始处理请求, 问题: "${q}", delayMs=${perChunkMs}`);

  res.writeHead(200, {
    'Content-Type': 'application/x-ndjson; charset=utf-8',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Transfer-Encoding': 'chunked',
    'X-Accel-Buffering': 'no'
  });
  res.flushHeaders?.();

  const safeSend = async (payload) => {
    if (ctx.stopped || res.writableEnded || res.destroyed) {
      return false;
    }

    try {
      const line = JSON.stringify(payload) + '\n';
      const canContinue = res.write(line, 'utf8');
      res.flush?.();

      if (!canContinue) {
        await new Promise((resolve) => res.once('drain', resolve));
      }

      return !(ctx.stopped || res.writableEnded || res.destroyed);
    } catch (err) {
      console.error(`${logPrefix} 写入失败:`, err.message);
      ctx.stopped = true;
      return false;
    }
  };

  const cleanup = () => {
    if (!ctx.stopped) {
      console.log(`${logPrefix} cleanup -> 标记停止`);
      ctx.stopped = true;
    }
    if (!res.writableEnded && !res.destroyed) {
      res.end();
    }
    jobs.delete(id);
  };

  const handleDisconnect = (event) => {
    console.log(`${logPrefix} 客户端断开连接 (事件: ${event})`);
    console.log(`${logPrefix} 当前状态: res.writableEnded=${res.writableEnded}, res.destroyed=${res.destroyed}, ctx.stopped=${ctx.stopped}`);
    ctx.stopped = true;
  };

  // 注意:不要监听 req.on('close'),因为在流式响应中,请求体读取完毕后会立即触发 close
  // 我们只需要监听响应的断开事件
  req.on('aborted', () => handleDisconnect('req.aborted'));
  res.on('close', () => handleDisconnect('res.close'));
  res.on('error', (err) => {
    console.error(`${logPrefix} 响应错误:`, err.message);
    ctx.stopped = true;
  });

  try {
    await safeSend({ id, type: 'meta', ts: Date.now() });
    console.log(`${logPrefix} 已发送 meta`);

    const fullText = sampleReply(q);
    console.log(`${logPrefix} 准备流式发送,总长度: ${fullText.length} 字符`);

    let chunkCount = 0;
    await streamText(fullText, async (delta) => {
      if (!delta && delta !== '') {
        return true;
      }
      const ok = await safeSend({ id, type: 'chunk', delta });
      if (ok) chunkCount++;
      return ok;
    }, ctx, perChunkMs);

    console.log(`${logPrefix} 已发送 ${chunkCount} 个 chunk`);

    if (!ctx.stopped) {
      await safeSend({
        id,
        type: 'done',
        finish_reason: 'stop',
        ts: Date.now()
      });
      console.log(`${logPrefix} 已发送 done`);
    }
  } catch (err) {
    console.error(`${logPrefix} 流式过程中出错:`, err);
    if (!ctx.stopped) {
      await safeSend({ id, type: 'error', message: err.message });
    }
  } finally {
    cleanup();
    console.log(`${logPrefix} 请求处理完成\n`);
  }
});

/** 客户端主动中断(HTTP/SSE 通用) */
app.post('/api/chat/abort', (req, res) => {
  const { id } = req.body || {};
  const ctx = jobs.get(id);
  if (ctx) ctx.stopped = true;
  res.json({ ok: true });
});

/** ----------------- 方案二:SSE(text/event-stream) ----------------- **/
app.get('/api/chat/sse', async (req, res) => {
  const id = (req.query.id || `m_${Date.now()}`).toString();
  const prompt = (req.query.q || 'hello').toString();
  const perChunkMs = Number.isFinite(Number(req.query.delayMs))
    ? Math.min(Math.max(Number(req.query.delayMs), 5), 2000)
    : 60;
  const ctx = { stopped: false };
  jobs.set(id, ctx);
  const logPrefix = `[SSE ${id}]`;

  res.writeHead(200, {
    'Content-Type': 'text/event-stream; charset=utf-8',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no'
  });
  res.flushHeaders?.();

  const retryMs = Number.isFinite(Number(req.query.retryMs)) ? Number(req.query.retryMs) : 3000;
  if (retryMs > 0) {
    res.write(`retry: ${retryMs}\n\n`);
  }

  const safeEvent = (event, data) => {
    if (ctx.stopped || res.writableEnded || res.destroyed) {
      return false;
    }
    try {
      const chunk = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
      const canContinue = res.write(chunk, 'utf8');
      res.flush?.();
      if (!canContinue) {
        return new Promise((resolve) => res.once('drain', () => resolve(!(ctx.stopped || res.writableEnded || res.destroyed))));
      }
      return true;
    } catch (err) {
      console.error(`${logPrefix} 写入失败:`, err.message);
      ctx.stopped = true;
      return false;
    }
  };

  const keepAlive = setInterval(() => {
    if (ctx.stopped || res.writableEnded || res.destroyed) {
      clearInterval(keepAlive);
      return;
    }
    try {
      res.write(':\n\n');
    } catch (err) {
      console.error(`${logPrefix} 心跳写入失败:`, err.message);
      clearInterval(keepAlive);
      ctx.stopped = true;
    }
  }, 15000);

  const handleDisconnect = () => {
    console.log(`${logPrefix} 客户端断开`);
    ctx.stopped = true;
  };

  req.on('close', handleDisconnect);
  req.on('aborted', handleDisconnect);
  res.on('close', handleDisconnect);
  res.on('error', (err) => console.error(`${logPrefix} 响应错误:`, err.message));

  try {
    await safeEvent('meta', { id, ts: Date.now(), perChunkMs });
    await streamText(sampleReply(prompt), (delta) => safeEvent('token', { id, delta }), ctx, perChunkMs);
    await safeEvent('done', {
      id,
      finish_reason: ctx.stopped ? 'user_cancel' : 'stop',
      ts: Date.now()
    });
  } finally {
    clearInterval(keepAlive);
    if (!res.writableEnded && !res.destroyed) {
      res.end();
    }
    jobs.delete(id);
  }
});

/** ----------------- 方案三:WebSocket(双向) ----------------- **/
const wss = new WebSocketServer({ server });

wss.on('connection', (ws, req) => {
  // 简单解析查询参数(鉴权可换 JWT)
  const url = new URL(req.url, 'http://localhost');
  const conv = url.searchParams.get('conv') || 'c1';

  ws.on('message', (raw) => {
    let msg = {};
    try { msg = JSON.parse(raw.toString()); } catch { return; }

    if (msg.type === 'user_msg') {
      // 用客户端发来的 id 派生一个 assistant id,便于客户端 abort
      const aId = `a_${msg.id || Date.now()}`;
      const ctx = { stopped: false };
      jobs.set(aId, ctx);
      const perChunkMs = Number.isFinite(Number(msg.delayMs))
        ? Math.min(Math.max(Number(msg.delayMs), 5), 2000)
        : 60;

      // 可选:先回一条 start
      ws.send(JSON.stringify({ type: 'start', id: aId, convId: conv, ts: Date.now() }));

      streamText(sampleReply(msg.text || ''), (delta) => {
        if (ws.readyState !== ws.OPEN) {
          ctx.stopped = true;
          return false;
        }
        ws.send(JSON.stringify({
          type: 'assistant_chunk', id: aId, convId: conv, delta
        }));
        return true;
      }, ctx, perChunkMs)
        .then(() => {
          if (ws.readyState === ws.OPEN) {
            ws.send(JSON.stringify({
              type: 'done', id: aId, finish_reason: ctx.stopped ? 'user_cancel' : 'stop', ts: Date.now()
            }));
          }
        })
        .catch((err) => {
          console.error(`[WS ${aId}] 流式过程中出错:`, err);
        })
        .finally(() => {
          jobs.delete(aId);
        });

    } else if (msg.type === 'abort' && msg.target) {
      const ctx = jobs.get(msg.target);
      if (ctx) ctx.stopped = true;

    } else if (msg.type === 'ping') {
      ws.send(JSON.stringify({ type: 'pong', ts: Date.now() }));
    }
  });

  ws.on('close', () => {
    // 可选:标记此 socket 相关任务停止(示例简化略过)
  });
});

server.listen(3000, () => {
  console.log('Server listening on http://localhost:3000');
});

留言: