Harmony-specific streaming delta extraction for chat completions.
This module handles the extraction of DeltaMessage objects from harmony parser state during streaming chat completions.
extract_harmony_streaming_delta(
harmony_parser: StreamableParser,
cur_channel: str | None,
cur_recipient: str | None,
prev_recipient: str | None,
delta_text: str,
include_reasoning: bool,
) -> tuple[DeltaMessage | None, bool]
Extract a DeltaMessage from harmony parser state during streaming.
Parameters:
| Name | Type | Description | Default |
harmony_parser | StreamableParser | The StreamableParser instance tracking parse state | required |
cur_channel | str | None | Current channel ("final", "analysis", "commentary", etc.) | required |
cur_recipient | str | None | Current recipient (e.g., "functions.my_func") | required |
prev_recipient | str | None | Previous recipient for detecting tool call transitions | required |
delta_text | str | The text delta to include in the message | required |
include_reasoning | bool | Whether to include reasoning content | required |
Returns:
Source code in vllm/entrypoints/openai/chat_completion/stream_harmony.py
| def extract_harmony_streaming_delta(
harmony_parser: StreamableParser,
cur_channel: str | None,
cur_recipient: str | None,
prev_recipient: str | None,
delta_text: str,
include_reasoning: bool,
) -> tuple[DeltaMessage | None, bool]:
"""
Extract a DeltaMessage from harmony parser state during streaming.
Args:
harmony_parser: The StreamableParser instance tracking parse state
cur_channel: Current channel ("final", "analysis", "commentary", etc.)
cur_recipient: Current recipient (e.g., "functions.my_func")
prev_recipient: Previous recipient for detecting tool call transitions
delta_text: The text delta to include in the message
include_reasoning: Whether to include reasoning content
Returns:
A tuple of (DeltaMessage or None, tools_streamed_flag)
"""
tools_streamed = False
if cur_channel == "final":
delta_message = DeltaMessage(content=delta_text)
elif (
(cur_channel == "commentary" or cur_channel == "analysis")
and cur_recipient
and cur_recipient.startswith("functions.")
):
# Count completed tool calls to determine index
base_index = 0
for msg in harmony_parser.messages:
if (
(msg.channel == "commentary" or msg.channel == "analysis")
and msg.recipient
and msg.recipient.startswith("functions.")
):
base_index += 1
if prev_recipient != cur_recipient:
tool_name = cur_recipient.split("functions.", 1)[1]
delta_message = DeltaMessage(
tool_calls=[
DeltaToolCall(
id=make_tool_call_id(),
type="function",
function=DeltaFunctionCall(
name=tool_name,
arguments="",
),
index=base_index,
)
]
)
elif delta_text:
delta_message = DeltaMessage(
tool_calls=[
DeltaToolCall(
index=base_index,
function=DeltaFunctionCall(arguments=delta_text),
)
]
)
else:
delta_message = None
if delta_message is not None:
tools_streamed = True
elif cur_channel == "commentary":
# Tool call preambles meant to be shown to the user
delta_message = DeltaMessage(content=delta_text)
elif cur_channel == "analysis":
if include_reasoning:
delta_message = DeltaMessage(reasoning=delta_text)
else:
delta_message = None
else:
delta_message = None
return delta_message, tools_streamed
|