Streaming 与回包组装#
3.1 讲到请求被 scheduler 接管为止。这一节补的是后半段:scheduler 已经产出了 token 级结果以后,结果怎样回到 API server,怎样被逐步解码成文本,又怎样被组装成 streaming chunk 或完整响应。
这一节只关注四个问题:
- scheduler 输出的到底是什么。
DetokenizerManager怎样把 token ids 变成字符串增量。TokenizerManager怎样把返回结果折叠回ReqState。- serving 层怎样基于同一份后端状态分出 streaming 和 non-streaming 两种响应路径。
一张图先看返回主线#
先把这一节要讲的链路压成一张图:
flowchart TB
A["Scheduler<br/>BatchTokenIDOutput"] --> B["DetokenizerManager<br/>增量 detokenize"]
B --> C["BatchStrOutput"]
C --> D["TokenizerManager._handle_batch_output<br/>回填 ReqState"]
D --> E["TokenizerManager._wait_one_response"]
E --> F["streaming SSE chunks"]
E --> G["full ChatCompletionResponse"]这张图里最重要的一点是:scheduler 并不直接回文本。它先回 token ids 和一组伴随的 meta 信息,然后由 detokenizer 和 API server 侧状态层一起完成后面的收口。
如果把这条返回链再压成“对象怎样变化”,可以得到更清楚的第二张图:
flowchart LR
A["BatchTokenIDOutput<br/>token ids + finish/meta"] --> B["BatchStrOutput<br/>text delta + output_ids + meta"]
B --> C["ReqState<br/>text / output_ids / time_stats"]
C --> D["streaming chunk"]
C --> E["full response"]第一张图强调进程边界,第二张图强调返回对象的变化顺序。两张图合起来,基本就把这一节的主线钉住了。
为什么 scheduler 不直接回文本#
3.1 里我们停在 Req 被 scheduler 接管的地方。这一节从 scheduler 的输出开始。scheduler 向外发送的不是字符串,而是 token ids 批次、结束原因、时间统计以及一组附带的观测字段。这些内容在 detokenize 之前还不能直接变成用户可见文本。
返回对象的定义本身已经说明了这一点。最终回到 tokenizer manager 一侧的对象是 BatchStrOutput
:
class BatchStrOutput(BaseBatchReq, SpeculativeDecodingMetricsMixin):
finished_reasons: List[dict]
output_strs: List[str]
output_ids: Optional[List[int]]
prompt_tokens: List[int]
completion_tokens: List[int]
reasoning_tokens: List[int]这段定义至少说明两件事:
- 文本输出和 token 输出是并存的,而不是只保留其中一种。
- 回包阶段不仅在传文本,还在传 finish reason、token 统计和观测字段。
所以“结果返回”不是一个动作,而是至少两个阶段:
- scheduler 产出 token 级结果;
- detokenizer 把 token 级结果变成文本级结果。
DetokenizerManager 真正在做什么#
从进程边界看,DetokenizerManager
的职责非常集中:
- 从 scheduler 收
BatchTokenIDOutput - 做增量 detokenize
- 向 tokenizer manager 回发
BatchStrOutput
它的事件循环非常短:
recv_obj = self.recv_from_scheduler.recv_pyobj()
output = self._request_dispatcher(recv_obj)
if output is not None:
self.send_to_tokenizer.send_pyobj(output)真正的复杂度不在主循环,而在 DetokenizerManager._decode_batch_token_id_output
。这个函数至少做了三件事:
- 维护每个
rid的DecodeStatus,记住已经解到哪里、已经发到哪里。 - 对 token ids 做 batch decode 或逐条 decode。
- 在 streaming 场景下只产出文本增量,而不是每次都回完整文本。
它的增量逻辑值得直接看一眼:
new_text = read_texts[i][len(surr_texts[i]) :]
if recv_obj.finished_reasons[i] is None:
if len(new_text) > 0 and not new_text.endswith("�"):
s.decoded_text = s.decoded_text + new_text
s.surr_offset = s.read_offset
s.read_offset = len(s.decode_ids)
new_text = ""
else:
new_text = find_printable_text(new_text)这段代码说明,detokenizer 不只是“把 token decode 一下”。它还要处理增量解码边界、不完整字符,以及 streaming 场景下文本应该在哪个位置切开。
最后,DetokenizerManager.handle_batch_token_id_out
会把这些字符串和原来的 output_ids、finish_reason、token 统计重新打包成 BatchStrOutput。
TokenizerManager 怎样把返回结果接回状态#
回到 API server 一侧以后,TokenizerManager.handle_loop
持续从 recv_from_detokenizer 收结果,然后交给 TokenizerManager._handle_batch_output
。
这一步的本质不是简单转发,而是把进程间返回对象重新折叠进每个请求对应的 ReqState。它做的事可以分成四层:
- 根据
rid找到ReqState - 构造
meta_info - 更新
state.text、state.output_ids、logprob、hidden states 等 - 把新的结果片段放进
state.out_list,并state.event.set()
最关键的几行是:
state.text += recv_obj.output_strs[i]
state.output_ids.extend(recv_obj.output_ids[i])
out_dict = {
"text": output_text,
"output_ids": output_token_ids,
"meta_info": meta_info,
}
state.out_list.append(out_dict)
state.event.set()到这里就能很清楚地看到 ReqState 的真正职责:它不是只在请求进入时建一下就结束,而是 API server 侧的返回状态容器。增量文本、增量 token ids、完成标志和时间统计都先回到这里,之后 serving 层才能继续分出 streaming 和 non-streaming 两条路径。
streaming 和 non-streaming 到底在哪里分叉#
真正的分叉点其实有两层。
第一层在 API server 一侧等待结果的方式上,核心是 TokenizerManager._wait_one_response
。这个函数会:
- 等待
ReqState.event - drain
state.out_list - 在 incremental streaming 场景下合并 backlog
- 在
finished时补齐最终时间戳和日志
也就是说,同一份 ReqState 会被同一个等待器消费,只是 streaming 和 non-streaming 对它的消费方式不同。
第二层分叉在 serving 层:
- streaming 走
OpenAIServingChat._handle_streaming_request和OpenAIServingChat._generate_chat_stream - non-streaming 走
OpenAIServingChat._handle_non_streaming_request
non-streaming 路径很短:
ret = await self.tokenizer_manager.generate_request(
adapted_request, raw_request
).__anext__()
response = self._build_chat_response(request, ret, int(time.time()))它只取生成器的第一个完整结果,然后构造 ChatCompletionResponse。
streaming 路径则要继续把同一份后端结果转换成 SSE 语义:
- 先发第一条带
role="assistant"的 chunk - 再发增量文本 chunk
- 必要时插入 tool call、reasoning、usage 和 finish reason
- 最后发
[DONE]
所以 streaming 和 non-streaming 不是两套后端,而是一套返回状态 + 两种不同的消费与组装方式。
为什么 streaming 明显更复杂#
如果只看 ReqState,streaming 和 non-streaming 的差别很像“要不要多返回几次”。但从代码看,streaming 真正复杂的地方至少有三层:
- detokenizer 要处理增量字符串边界;
- tokenizer manager 要处理 backlog drain 和增量
out_list; - serving 层还要把同一份后端结果重新编码成 SSE 语义。
这也是为什么 3.1 只讲到 scheduler 接管请求,而把这一节单独拆出来。如果把两节压在一起,读者很容易把“请求怎样进入 runtime”和“结果怎样重新长成 API 响应”混成一条过长的链。
调试这条返回链时先看哪里#
这一节对应的调试顺序也和 3.1 不一样。更稳的顺序通常是:
- 先确认 scheduler 是否已经正常产出 token ids 和 finish reason。
- 再确认 detokenizer 是否成功把
BatchTokenIDOutput变成BatchStrOutput。 - 然后确认
TokenizerManager._handle_batch_output是否成功更新了ReqState。 - 最后再看 serving 层是 streaming 组装出了问题,还是 non-streaming 构造完整响应时出了问题。
这样排的好处是,你可以先按“对象有没有回来”分层,再按“回来以后怎样被消费”细分,而不是一上来就盯 SSE 或最终 JSON。
小结#
3.1 解决的是“请求怎样进入 runtime”,这一节解决的是“结果怎样回到协议表面”。这条返回主线也可以压成一句话:
BatchTokenIDOutput -> BatchStrOutput -> ReqState -> streaming chunk / full response
理解这条链之后,后面无论你是在排 streaming 卡顿、finish reason 异常、tool call 组装错误,还是日志时间戳对不上,都会更容易先找到正确的边界。
叶王 © 2013-2026 版权所有。如果本文档对你有所帮助,可以请作者喝饮料。