hbllmutils.model.stream

Streaming response handling utilities for session-based model outputs.

This module provides a reusable streaming interface for processing response chunks from a session iterator. It supports optional separation of reasoning content from regular content with configurable splitters and exposes accumulated outputs after streaming completes.

The module contains the following main components:

  • ResponseStream - Abstract stream handler with reasoning/content separation and accumulation

  • OpenAIResponseStream - Concrete implementation for OpenAI-style streaming responses

Example:

>>> import sys
>>> stream = OpenAIResponseStream(session, with_reasoning=True)
>>> for chunk in stream:
...     print(chunk, end='')
...     sys.stdout.flush()
>>> print(f"Reasoning: {stream.reasoning_content}")
>>> print(f"Content: {stream.content}")

Note

The stream can only be iterated once. Attempting to iterate again raises a RuntimeError.
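
The single-use rule can be illustrated with a minimal sketch. `OneShotStream` is a hypothetical stand-in, not the library's implementation; the status names mirror the `none`, `entered`, and `ended` values documented for `_iter_status`, and the error message is an assumption.

```python
from typing import Iterable, Iterator


class OneShotStream:
    """Hypothetical stand-in that mimics the single-use rule described above."""

    def __init__(self, source: Iterable[str]) -> None:
        self._source = source
        self._iter_status = "none"  # assumed states: none -> entered -> ended

    def __iter__(self) -> Iterator[str]:
        # Check eagerly (outside the generator) so the error is raised
        # as soon as a second iteration is requested.
        if self._iter_status != "none":
            raise RuntimeError("stream has already been entered or ended")
        self._iter_status = "entered"
        return self._generate()

    def _generate(self) -> Iterator[str]:
        for chunk in self._source:
            yield chunk
        self._iter_status = "ended"


stream = OneShotStream(["Hel", "lo"])
first_pass = "".join(stream)

try:
    list(stream)  # second iteration attempt
    second_pass_error = None
except RuntimeError as exc:
    second_pass_error = str(exc)
```

Because the accumulated outputs stay on the stream object, a caller that needs the text again would read the `content` and `reasoning_content` properties rather than iterate a second time.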

DEFAULT_REASONING_SPLITTER

hbllmutils.model.stream.DEFAULT_REASONING_SPLITTER: str = '---------------------------reasoning---------------------------'

Default value of the reasoning_splitter parameter: the separator emitted before reasoning content.

DEFAULT_CONTENT_SPLITTER

hbllmutils.model.stream.DEFAULT_CONTENT_SPLITTER: str = '---------------------------content---------------------------'

Default value of the content_splitter parameter: the separator emitted before regular content.

ResponseStream

class hbllmutils.model.stream.ResponseStream(session: Any, with_reasoning: bool = False, reasoning_splitter: str = '---------------------------reasoning---------------------------', content_splitter: str = '---------------------------content---------------------------')

A stream handler for processing session responses with optional reasoning content separation.

This class wraps a session object and provides an iterator interface to stream response chunks, optionally separating reasoning content from regular content with configurable splitters.

The stream maintains internal state to track iteration progress and accumulates both reasoning and regular content for post-iteration access through properties.

Parameters:
  • session (Any) – The session object to stream responses from. Must support iteration and yield chunks with choices[0].delta attributes.

  • with_reasoning (bool) – Whether to include reasoning content in the stream output, defaults to False. When True, reasoning content will be prefixed with the reasoning_splitter.

  • reasoning_splitter (str) – The separator string for reasoning content sections, defaults to DEFAULT_REASONING_SPLITTER.

  • content_splitter (str) – The separator string for regular content sections, defaults to DEFAULT_CONTENT_SPLITTER.

Variables:
  • session (Any) – The underlying session iterator providing response chunks.

  • _with_reasoning (bool) – Whether the stream yields reasoning content.

  • _reasoning_splitter (str) – Separator used before reasoning content.

  • _content_splitter (str) – Separator used before regular content.

  • _reasoning_content (Optional[str]) – Accumulated reasoning content after streaming completes.

  • _content (Optional[str]) – Accumulated regular content after streaming completes.

  • _iter_status (str) – Stream status flag: none, entered, or ended.

Example:

>>> # ResponseStream is abstract, so the examples use the concrete OpenAIResponseStream
>>> stream = OpenAIResponseStream(session)
>>> # Stream without reasoning
>>> for chunk in stream:
...     print(chunk, end='')

>>> stream_with_reasoning = OpenAIResponseStream(session, with_reasoning=True)
>>> # Stream with reasoning separated by splitters
>>> for chunk in stream_with_reasoning:
...     print(chunk, end='')

__init__(session: Any, with_reasoning: bool = False, reasoning_splitter: str = '---------------------------reasoning---------------------------', content_splitter: str = '---------------------------content---------------------------') → None

Initialize the ResponseStream.

Parameters:
  • session (Any) – The session object to stream responses from. Must support iteration and yield chunks with choices[0].delta attributes.

  • with_reasoning (bool) – Whether to include reasoning content in the stream output, defaults to False. When True, reasoning content will be prefixed with the reasoning_splitter.

  • reasoning_splitter (str) – The separator string for reasoning content sections, defaults to DEFAULT_REASONING_SPLITTER.

  • content_splitter (str) – The separator string for regular content sections, defaults to DEFAULT_CONTENT_SPLITTER.


__iter__() → Iterator[str]

Iterate over the session responses, yielding content chunks.

This method streams response chunks from the session, separating reasoning content from regular content when applicable. It accumulates both types of content internally for later access via properties.

The iteration process:

  1. Checks if stream has already been used

  2. Iterates through session chunks

  3. Extracts reasoning_content and content from delta objects

  4. Yields content with appropriate splitters when transitioning between content types

  5. Accumulates all content for post-iteration access
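
Steps 2 through 5 can be sketched as a plain generator. This is an illustration under stated assumptions, not the library's code: `make_chunk` and `iter_chunks` are hypothetical names, the fake chunks only imitate the `choices[0].delta` shape, and the blank lines placed around each splitter are a guess based on the example output. The single-use check from step 1 is left out here.

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterable, Iterator, List, Optional

REASONING_SPLITTER = "---------------------------reasoning---------------------------"
CONTENT_SPLITTER = "---------------------------content---------------------------"


def make_chunk(reasoning: Optional[str] = None, content: Optional[str] = None) -> Any:
    """Build a fake chunk exposing choices[0].delta (hypothetical test helper)."""
    delta = SimpleNamespace(reasoning_content=reasoning, content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


def iter_chunks(
    session: Iterable[Any], collected: Dict[str, str], with_reasoning: bool = False
) -> Iterator[str]:
    reasoning_parts: List[str] = []
    content_parts: List[str] = []
    last_kind: Optional[str] = None  # kind of text yielded most recently
    for chunk in session:
        delta = chunk.choices[0].delta
        reasoning = getattr(delta, "reasoning_content", None)
        content = getattr(delta, "content", None)
        if reasoning:
            reasoning_parts.append(reasoning)  # always accumulated
            if with_reasoning:
                if last_kind != "reasoning":
                    # Entering a reasoning section: emit its splitter first.
                    yield REASONING_SPLITTER + "\n\n"
                    last_kind = "reasoning"
                yield reasoning
        if content:
            content_parts.append(content)
            if with_reasoning and last_kind == "reasoning":
                # Transition from reasoning to regular content.
                yield "\n\n" + CONTENT_SPLITTER + "\n\n"
            last_kind = "content"
            yield content
    # Accumulated text becomes available once the session is exhausted.
    collected["reasoning_content"] = "".join(reasoning_parts)
    collected["content"] = "".join(content_parts)


session = [
    make_chunk(reasoning="Think"),
    make_chunk(reasoning=" hard"),
    make_chunk(content="Ans"),
    make_chunk(content="wer"),
]
collected: Dict[str, str] = {}
output = "".join(iter_chunks(session, collected, with_reasoning=True))
plain_output = "".join(iter_chunks(session, {}, with_reasoning=False))
```

In this sketch the reasoning text is accumulated even when `with_reasoning` is False; only its appearance in the yielded output is switched off. Whether the real class behaves the same way is an assumption.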

Returns:

An iterator yielding string chunks of content.

Return type:

Iterator[str]

Raises:

RuntimeError – If the stream has already been entered or ended.

Example:

>>> import sys
>>> stream = OpenAIResponseStream(session, with_reasoning=True)
>>> for chunk in stream:
...     print(chunk, end='')
...     sys.stdout.flush()  # flush so each chunk appears immediately
---------------------------reasoning---------------------------

This is reasoning content...

---------------------------content---------------------------

This is regular content...

>>> # After iteration, access accumulated content
>>> print(stream.reasoning_content)
This is reasoning content...
>>> print(stream.content)
This is regular content...

property content: str | None

Get the accumulated regular content from the stream.

The value is None until the stream has been fully consumed. It then holds all text that was marked as regular content in the session’s delta objects.

Returns:

The complete regular content, or None if not yet available.

Return type:

Optional[str]

Example:

>>> stream = OpenAIResponseStream(session)
>>> for chunk in stream:
...     pass
>>> regular_content = stream.content
>>> print(regular_content)
This is the regular content...
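
Since `content` may be None before the stream finishes, callers may want a small guard. The helper below is a hypothetical sketch: `final_content` is not part of the module, and the `SimpleNamespace` objects only stand in for a stream exposing the documented `is_ended` and `content` properties.

```python
from types import SimpleNamespace
from typing import Any


def final_content(stream: Any) -> str:
    """Return the accumulated content, refusing to read it too early."""
    if not stream.is_ended:
        raise RuntimeError("stream not fully consumed; content is not available yet")
    # Treat a missing value as empty text so callers always get a str.
    return stream.content if stream.content is not None else ""


# Stand-ins for a stream before and after iteration (assumed shapes).
pending = SimpleNamespace(is_ended=False, content=None)
finished = SimpleNamespace(is_ended=True, content="This is the regular content...")

text = final_content(finished)
try:
    final_content(pending)
    pending_error = None
except RuntimeError as exc:
    pending_error = str(exc)
```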

property is_ended: bool

Check if the stream has ended (iteration completed).

Returns:

True if iteration has completed, False otherwise.

Return type:

bool

Example:

>>> stream = OpenAIResponseStream(session)
>>> stream.is_ended
False
>>> for chunk in stream:
...     pass
>>> stream.is_ended
True

property is_entered: bool

Check if the stream has been entered (iteration started).

Returns:

True if iteration has started, False otherwise.

Return type:

bool

Example:

>>> stream = OpenAIResponseStream(session)
>>> stream.is_entered
False
>>> it = iter(stream)
>>> first_chunk = next(it)  # pull one chunk so iteration has started
>>> stream.is_entered
True

property reasoning_content: str | None

Get the accumulated reasoning content from the stream.

The value is None until the stream has been fully consumed. It then holds all text that was marked as reasoning_content in the session’s delta objects.

Returns:

The complete reasoning content, or None if not yet available.

Return type:

Optional[str]

Example:

>>> stream = OpenAIResponseStream(session, with_reasoning=True)
>>> for chunk in stream:
...     pass
>>> reasoning = stream.reasoning_content
>>> print(reasoning)
This is the reasoning content...

OpenAIResponseStream

class hbllmutils.model.stream.OpenAIResponseStream(session: Any, with_reasoning: bool = False, reasoning_splitter: str = '---------------------------reasoning---------------------------', content_splitter: str = '---------------------------content---------------------------')

OpenAI-specific implementation of ResponseStream.

This class provides concrete implementations for extracting reasoning content and regular content from OpenAI API response chunks. It expects chunks to have a structure with choices[0].delta attributes containing reasoning_content and content.
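
The extraction described above might look roughly like the following. `extract_delta` is a hypothetical helper, not the class's actual method. The use of `getattr` with a default is an assumption that covers chunks whose delta has no `reasoning_content` attribute, and the empty-`choices` check is a defensive guess for chunks that carry no choices.

```python
from types import SimpleNamespace
from typing import Any, Optional, Tuple


def extract_delta(chunk: Any) -> Tuple[Optional[str], Optional[str]]:
    """Return (reasoning_content, content) from one OpenAI-style chunk."""
    choices = getattr(chunk, "choices", None)
    if not choices:
        # Some chunks may carry no choices at all; treat them as empty.
        return None, None
    delta = choices[0].delta
    # Either field may be absent or None on any given chunk.
    reasoning = getattr(delta, "reasoning_content", None)
    content = getattr(delta, "content", None)
    return reasoning, content


# Fake chunks that only imitate the expected attribute layout.
reasoning_chunk = SimpleNamespace(
    choices=[SimpleNamespace(delta=SimpleNamespace(reasoning_content="Hmm", content=None))]
)
content_chunk = SimpleNamespace(
    choices=[SimpleNamespace(delta=SimpleNamespace(content="Hello"))]
)
empty_chunk = SimpleNamespace(choices=[])

results = [extract_delta(c) for c in (reasoning_chunk, content_chunk, empty_chunk)]
```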

Example:

>>> from openai import OpenAI
>>> client = OpenAI()
>>> response = client.chat.completions.create(
...     model="gpt-4",
...     messages=[{"role": "user", "content": "Hello"}],
...     stream=True
... )
>>> stream = OpenAIResponseStream(response, with_reasoning=True)
>>> for chunk in stream:
...     print(chunk, end='')