Streaming 与回包组装#

3.1 讲到请求被 scheduler 接管为止。这一节补的是后半段:scheduler 已经产出了 token 级结果以后,结果怎样回到 API server,怎样被逐步解码成文本,又怎样被组装成 streaming chunk 或完整响应。

这一节只关注四个问题:

  1. scheduler 输出的到底是什么。
  2. DetokenizerManager 怎样把 token ids 变成字符串增量。
  3. TokenizerManager 怎样把返回结果折叠回 ReqState
  4. serving 层怎样基于同一份后端状态分出 streaming 和 non-streaming 两种响应路径。

一张图先看返回主线#

先把这一节要讲的链路压成一张图:

flowchart TB
    A["Scheduler<br/>BatchTokenIDOutput"] --> B["DetokenizerManager<br/>增量 detokenize"]
    B --> C["BatchStrOutput"]
    C --> D["TokenizerManager._handle_batch_output<br/>回填 ReqState"]
    D --> E["TokenizerManager._wait_one_response"]
    E --> F["streaming SSE chunks"]
    E --> G["full ChatCompletionResponse"]

这张图里最重要的一点是:scheduler 并不直接回文本。它先回 token ids 和一组伴随的 meta 信息,然后由 detokenizer 和 API server 侧状态层一起完成后面的收口。

如果把这条返回链再压成“对象怎样变化”,可以得到更清楚的第二张图:

flowchart LR
    A["BatchTokenIDOutput<br/>token ids + finish/meta"] --> B["BatchStrOutput<br/>text delta + output_ids + meta"]
    B --> C["ReqState<br/>text / output_ids / time_stats"]
    C --> D["streaming chunk"]
    C --> E["full response"]

第一张图强调进程边界,第二张图强调返回对象的变化顺序。两张图合起来,基本就把这一节的主线钉住了。

为什么 scheduler 不直接回文本#

3.1 里我们停在 Req 被 scheduler 接管的地方。这一节从 scheduler 的输出开始。scheduler 向外发送的不是字符串,而是 token ids 批次、结束原因、时间统计以及一组附带的观测字段。这些内容在 detokenize 之前还不能直接变成用户可见文本。

返回对象的定义本身已经说明了这一点。最终回到 tokenizer manager 一侧的对象是 BatchStrOutput

class BatchStrOutput(BaseBatchReq, SpeculativeDecodingMetricsMixin):
    finished_reasons: List[dict]
    output_strs: List[str]
    output_ids: Optional[List[int]]
    prompt_tokens: List[int]
    completion_tokens: List[int]
    reasoning_tokens: List[int]

这段定义至少说明两件事:

  1. 文本输出和 token 输出是并存的,而不是只保留其中一种。
  2. 回包阶段不仅在传文本,还在传 finish reason、token 统计和观测字段。

所以“结果返回”不是一个动作,而是至少两个阶段:

  1. scheduler 产出 token 级结果;
  2. detokenizer 把 token 级结果变成文本级结果。

DetokenizerManager 真正在做什么#

从进程边界看,DetokenizerManager 的职责非常集中:

  • 从 scheduler 收 BatchTokenIDOutput
  • 做增量 detokenize
  • 向 tokenizer manager 回发 BatchStrOutput

它的事件循环非常短:

recv_obj = self.recv_from_scheduler.recv_pyobj()
output = self._request_dispatcher(recv_obj)
if output is not None:
    self.send_to_tokenizer.send_pyobj(output)

真正的复杂度不在主循环,而在 DetokenizerManager._decode_batch_token_id_output 。这个函数至少做了三件事:

  1. 维护每个 ridDecodeStatus,记住已经解到哪里、已经发到哪里。
  2. 对 token ids 做 batch decode 或逐条 decode。
  3. 在 streaming 场景下只产出文本增量,而不是每次都回完整文本。

它的增量逻辑值得直接看一眼:

new_text = read_texts[i][len(surr_texts[i]) :]
if recv_obj.finished_reasons[i] is None:
    if len(new_text) > 0 and not new_text.endswith("�"):
        s.decoded_text = s.decoded_text + new_text
        s.surr_offset = s.read_offset
        s.read_offset = len(s.decode_ids)
        new_text = ""
    else:
        new_text = find_printable_text(new_text)

这段代码说明,detokenizer 不只是“把 token decode 一下”。它还要处理增量解码边界、不完整字符,以及 streaming 场景下文本应该在哪个位置切开。

最后,DetokenizerManager.handle_batch_token_id_out 会把这些字符串和原来的 output_idsfinish_reason、token 统计重新打包成 BatchStrOutput

TokenizerManager 怎样把返回结果接回状态#

回到 API server 一侧以后,TokenizerManager.handle_loop 持续从 recv_from_detokenizer 收结果,然后交给 TokenizerManager._handle_batch_output

这一步的本质不是简单转发,而是把进程间返回对象重新折叠进每个请求对应的 ReqState。它做的事可以分成四层:

  1. 根据 rid 找到 ReqState
  2. 构造 meta_info
  3. 更新 state.textstate.output_ids、logprob、hidden states 等
  4. 把新的结果片段放进 state.out_list,并 state.event.set()

最关键的几行是:

state.text += recv_obj.output_strs[i]
state.output_ids.extend(recv_obj.output_ids[i])
out_dict = {
    "text": output_text,
    "output_ids": output_token_ids,
    "meta_info": meta_info,
}
state.out_list.append(out_dict)
state.event.set()

到这里就能很清楚地看到 ReqState 的真正职责:它不是只在请求进入时建一下就结束,而是 API server 侧的返回状态容器。增量文本、增量 token ids、完成标志和时间统计都先回到这里,之后 serving 层才能继续分出 streaming 和 non-streaming 两条路径。

streaming 和 non-streaming 到底在哪里分叉#

真正的分叉点其实有两层。

第一层在 API server 一侧等待结果的方式上,核心是 TokenizerManager._wait_one_response 。这个函数会:

  • 等待 ReqState.event
  • drain state.out_list
  • 在 incremental streaming 场景下合并 backlog
  • finished 时补齐最终时间戳和日志

也就是说,同一份 ReqState 会被同一个等待器消费,只是 streaming 和 non-streaming 对它的消费方式不同。

第二层分叉在 serving 层:

non-streaming 路径很短:

ret = await self.tokenizer_manager.generate_request(
    adapted_request, raw_request
).__anext__()
response = self._build_chat_response(request, ret, int(time.time()))

它只取生成器的第一个完整结果,然后构造 ChatCompletionResponse

streaming 路径则要继续把同一份后端结果转换成 SSE 语义:

  • 先发第一条带 role="assistant" 的 chunk
  • 再发增量文本 chunk
  • 必要时插入 tool call、reasoning、usage 和 finish reason
  • 最后发 [DONE]

所以 streaming 和 non-streaming 不是两套后端,而是一套返回状态 + 两种不同的消费与组装方式

为什么 streaming 明显更复杂#

如果只看 ReqState,streaming 和 non-streaming 的差别很像“要不要多返回几次”。但从代码看,streaming 真正复杂的地方至少有三层:

  1. detokenizer 要处理增量字符串边界;
  2. tokenizer manager 要处理 backlog drain 和增量 out_list
  3. serving 层还要把同一份后端结果重新编码成 SSE 语义。

这也是为什么 3.1 只讲到 scheduler 接管请求,而把这一节单独拆出来。如果把两节压在一起,读者很容易把“请求怎样进入 runtime”和“结果怎样重新长成 API 响应”混成一条过长的链。

调试这条返回链时先看哪里#

这一节对应的调试顺序也和 3.1 不一样。更稳的顺序通常是:

  1. 先确认 scheduler 是否已经正常产出 token ids 和 finish reason。
  2. 再确认 detokenizer 是否成功把 BatchTokenIDOutput 变成 BatchStrOutput
  3. 然后确认 TokenizerManager._handle_batch_output 是否成功更新了 ReqState
  4. 最后再看 serving 层是 streaming 组装出了问题,还是 non-streaming 构造完整响应时出了问题。

这样排的好处是,你可以先按“对象有没有回来”分层,再按“回来以后怎样被消费”细分,而不是一上来就盯 SSE 或最终 JSON。

小结#

3.1 解决的是“请求怎样进入 runtime”,这一节解决的是“结果怎样回到协议表面”。这条返回主线也可以压成一句话:

BatchTokenIDOutput -> BatchStrOutput -> ReqState -> streaming chunk / full response

理解这条链之后,后面无论你是在排 streaming 卡顿、finish reason 异常、tool call 组装错误,还是日志时间戳对不上,都会更容易先找到正确的边界。