Source code for hbllmutils.model.remote

"""
Remote LLM client implementation for OpenAI-compatible APIs.

This module implements a concrete :class:`~hbllmutils.model.base.LLMModel` that
talks to OpenAI-compatible chat completion endpoints. It supports synchronous
requests, streaming responses, and convenient helpers for returning either the
full :class:`openai.types.chat.ChatCompletionMessage` or only the text content.
The implementation is designed to be endpoint-agnostic as long as the server
implements the OpenAI API protocol.

The main public component is:

* :class:`RemoteLLMModel` - A concrete client for OpenAI-compatible API endpoints

Typical usage focuses on building a conversation history list and then calling
:meth:`RemoteLLMModel.ask`, :meth:`RemoteLLMModel.ask_stream`, or
:meth:`RemoteLLMModel.create_message`.

Example::

    >>> from hbllmutils.model.remote import RemoteLLMModel
    >>> model = RemoteLLMModel(
    ...     base_url="https://api.openai.com/v1",
    ...     api_token="sk-xxx",
    ...     model_name="gpt-3.5-turbo",
    ...     max_tokens=128
    ... )
    >>> messages = [{"role": "user", "content": "Hello, world!"}]
    >>> print(model.ask(messages))
    Hello! How can I help you today?

    >>> # Streaming usage
    >>> stream = model.ask_stream(messages)
    >>> for chunk in stream:
    ...     print(chunk, end="", flush=True)

.. note::
   The class relies on the official ``openai`` package and expects OpenAI-style
   responses, including streaming deltas. If your endpoint diverges from this
   structure, consider implementing a custom response stream handler.

"""

from typing import Dict, Optional, Union, Any, List, Tuple
from urllib.parse import urlparse

from openai import OpenAI, AsyncOpenAI
from openai.types.chat import ChatCompletionMessage

from .base import LLMModel
from .stream import OpenAIResponseStream, ResponseStream
from ..utils import log_pformat


[docs] class RemoteLLMModel(LLMModel): """ A client for interacting with remote Large Language Model APIs. This class provides a unified interface for communicating with OpenAI-compatible API endpoints. It supports both synchronous and asynchronous operations, streaming responses, and allows customization of request parameters. :param base_url: API base URL (e.g., ``"https://api.openai.com/v1"``) :type base_url: str :param api_token: API access token for authentication :type api_token: str :param model_name: Name of the model to use (e.g., ``"gpt-3.5-turbo"``) :type model_name: str :param organization_id: Organization ID (optional, required by some APIs) :type organization_id: Optional[str] :param timeout: Request timeout in seconds, defaults to ``30`` :type timeout: int :param max_retries: Maximum number of retry attempts, defaults to ``3`` :type max_retries: int :param headers: Custom request headers :type headers: Optional[Dict[str, str]] :param default_params: Default parameters for API requests :ivar base_url: API base URL (e.g., ``"https://api.openai.com/v1"``) :vartype base_url: str :ivar api_token: API access token for authentication :vartype api_token: str :ivar model_name: Name of the model to use (e.g., ``"gpt-3.5-turbo"``, ``"claude-3-opus"``) :vartype model_name: str :ivar organization_id: Organization ID (required by some APIs) :vartype organization_id: Optional[str] :ivar timeout: Request timeout in seconds :vartype timeout: int :ivar max_retries: Maximum number of retry attempts :vartype max_retries: int :ivar headers: Custom request headers :vartype headers: Dict[str, str] :ivar default_params: Default parameters for API requests :vartype default_params: Dict[str, Any] :raises ValueError: If ``base_url`` format is invalid :raises ValueError: If ``api_token`` is empty :raises ValueError: If ``model_name`` is empty :raises ValueError: If ``timeout`` is not positive :raises ValueError: If ``max_retries`` is negative Example:: >>> model = RemoteLLMModel( ... base_url="https://api.openai.com/v1", ... api_token="sk-xxx", ... model_name="gpt-3.5-turbo", ... temperature=0.2 ... ) >>> messages = [{"role": "user", "content": "Summarize LLMs in one sentence."}] >>> print(model.ask(messages)) Large language models generate text by predicting tokens from vast training data. """
[docs] def __init__( self, base_url: str, api_token: str, model_name: str, organization_id: Optional[str] = None, timeout: int = 30, max_retries: int = 3, headers: Optional[Dict[str, str]] = None, **default_params: Any, ): """ Initialize the RemoteLLMModel instance. :param base_url: API base URL (e.g., ``"https://api.openai.com/v1"``) :type base_url: str :param api_token: API access token for authentication :type api_token: str :param model_name: Name of the model to use (e.g., ``"gpt-3.5-turbo"``) :type model_name: str :param organization_id: Organization ID (optional, required by some APIs) :type organization_id: Optional[str] :param timeout: Request timeout in seconds (default: ``30``) :type timeout: int :param max_retries: Maximum number of retry attempts (default: ``3``) :type max_retries: int :param headers: Custom request headers (optional) :type headers: Optional[Dict[str, str]] :param default_params: Default parameters for API requests (optional) :type default_params: Any :raises ValueError: If base_url format is invalid :raises ValueError: If api_token is empty :raises ValueError: If model_name is empty :raises ValueError: If timeout is not positive :raises ValueError: If max_retries is negative Example:: >>> model = RemoteLLMModel( ... base_url="https://api.openai.com/v1", ... api_token="sk-xxx", ... model_name="gpt-3.5-turbo" ... ) """ self.base_url = base_url # Validate URL format try: result = urlparse(self.base_url) if not all([result.scheme, result.netloc]): raise ValueError(f"Invalid base_url format - {self.base_url!r}") except Exception as e: raise ValueError(f"Invalid base_url - {self.base_url!r}: {e}") self.api_token = api_token if not self.api_token.strip(): raise ValueError(f"api_token cannot be empty, but {self.api_token!r} found") self.model_name = model_name if not self.model_name.strip(): raise ValueError(f"model_name cannot be empty, but {self.model_name!r} found") self.organization_id = organization_id self.timeout = timeout if self.timeout <= 0: raise ValueError(f"timeout must be positive, but {self.timeout!r} found") self.max_retries = max_retries if self.max_retries < 0: raise ValueError(f"max_retries cannot be negative, but {self.max_retries!r} found") self.headers = dict(headers or {}) self.default_params: Dict[str, Any] = default_params self._client_non_async = None
@property def _logger_name(self) -> str: """ Get the logger name for this model instance. :return: The model name to be used as logger identifier :rtype: str Example:: >>> model = RemoteLLMModel(base_url="...", api_token="...", model_name="gpt-3.5-turbo") >>> model._logger_name 'gpt-3.5-turbo' """ return self.model_name def _create_openai_client(self, use_async: bool = False) -> Union[OpenAI, AsyncOpenAI]: """ Create an OpenAI client instance (synchronous or asynchronous). :param use_async: Whether to create an asynchronous client (default: ``False``) :type use_async: bool :return: OpenAI client instance (synchronous or asynchronous) :rtype: Union[OpenAI, AsyncOpenAI] Example:: >>> model = RemoteLLMModel(base_url="...", api_token="...", model_name="...") >>> sync_client = model._create_openai_client(use_async=False) >>> async_client = model._create_openai_client(use_async=True) """ self._logger.debug(f'Remote LLM ({"async" if use_async else "non-async"} client created: {self!r}') return (AsyncOpenAI if use_async else OpenAI)( api_key=self.api_token, base_url=self.base_url, organization=self.organization_id, timeout=self.timeout, max_retries=self.max_retries, default_headers=self.headers ) @property def _client(self) -> OpenAI: """ Get the synchronous OpenAI client instance. Creates a new client on first access and caches it for subsequent calls. :return: Synchronous OpenAI client instance :rtype: OpenAI Example:: >>> model = RemoteLLMModel(base_url="...", api_token="...", model_name="...") >>> client = model._client """ self._client_non_async = self._client_non_async or self._create_openai_client(use_async=False) return self._client_non_async def _get_non_async_session(self, messages: List[dict], stream: bool = False, **params: Any) -> Any: """ Create a synchronous chat completion session. This is an internal method used by other public methods to create API sessions. :param messages: List of message dictionaries for the conversation :type messages: List[dict] :param stream: Whether to enable streaming mode (default: ``False``) :type stream: bool :param params: Additional parameters to pass to the API :type params: Any :return: Chat completion response or stream :rtype: Any Example:: >>> model = RemoteLLMModel(base_url="...", api_token="...", model_name="...") >>> messages = [{"role": "user", "content": "Hello"}] >>> session = model._get_non_async_session(messages, stream=False) """ self._logger.info(f'Remote LLM {self.model_name!r} chat created:\n' f'{log_pformat(messages)}') return self._client.chat.completions.create( model=self.model_name, messages=messages, stream=stream, **{ **self.default_params, **params, } )
[docs] def create_message(self, messages: List[dict], **params: Any) -> ChatCompletionMessage: """ Send a chat request and get the complete message response. :param messages: List of message dictionaries for the conversation :type messages: List[dict] :param params: Additional parameters to pass to the API :type params: Any :return: The message object from the first choice in the response :rtype: ChatCompletionMessage Example:: >>> model = RemoteLLMModel(base_url="...", api_token="...", model_name="...") >>> messages = [{"role": "user", "content": "What is AI?"}] >>> response = model.create_message(messages) >>> print(response.content) """ session = self._get_non_async_session(messages=messages, stream=False, **params) self._logger.info(f'Answer of remote LLM {self.model_name!r}:\n' f'{log_pformat(session.choices[0].message.content)}') return session.choices[0].message
[docs] def ask(self, messages: List[dict], with_reasoning: bool = False, **params: Any) \ -> Union[str, Tuple[Optional[str], str]]: """ Send a chat request and get the text response. :param messages: List of message dictionaries for the conversation :type messages: List[dict] :param with_reasoning: Whether to return reasoning content along with the response (default: ``False``) :type with_reasoning: bool :param params: Additional parameters to pass to the API :type params: Any :return: If ``with_reasoning`` is ``False``, returns the content string. If ``with_reasoning`` is ``True``, returns a tuple of ``(reasoning_content, content)``. :rtype: Union[str, Tuple[Optional[str], str]] Example:: >>> model = RemoteLLMModel(base_url="...", api_token="...", model_name="...") >>> messages = [{"role": "user", "content": "Explain quantum computing"}] >>> # Get only the response content >>> response = model.ask(messages) >>> print(response) >>> # Get both reasoning and response content >>> reasoning, response = model.ask(messages, with_reasoning=True) >>> print(f"Reasoning: {reasoning}") >>> print(f"Response: {response}") """ message = self.create_message(messages=messages, **params) if with_reasoning: return getattr(message, 'reasoning_content', None), message.content else: return message.content
[docs] def ask_stream(self, messages: List[dict], with_reasoning: bool = False, **params: Any) -> ResponseStream: """ Send a chat request and get a streaming response. :param messages: List of message dictionaries for the conversation :type messages: List[dict] :param with_reasoning: Whether to include reasoning content in the stream (default: ``False``) :type with_reasoning: bool :param params: Additional parameters to pass to the API :type params: Any :return: A ResponseStream object for iterating over the streaming response :rtype: ResponseStream Example:: >>> model = RemoteLLMModel(base_url="...", api_token="...", model_name="...") >>> messages = [{"role": "user", "content": "Write a story"}] >>> stream = model.ask_stream(messages) >>> for chunk in stream: ... print(chunk, end='', flush=True) """ session = self._get_non_async_session(messages=messages, stream=True, **params) self._logger.info(f'Answer of remote LLM {self.model_name!r} streamed.') return OpenAIResponseStream(session, with_reasoning=with_reasoning)
[docs] def __repr__(self) -> str: """ Return a string representation of the RemoteLLMModel instance. All constructor parameters including ``default_params`` are displayed at the same level. The API token is masked for security purposes. :return: String representation of the instance :rtype: str Example:: >>> model = RemoteLLMModel( ... base_url="https://api.openai.com/v1", ... api_token="sk-xxx", ... model_name="gpt-3.5-turbo", ... max_tokens=1000 ... ) >>> repr(model) 'RemoteLLMModel(base_url=..., api_token=..., max_tokens=1000, ...)' """ # Collect all parameters params = { 'base_url': self.base_url, 'api_token': self.api_token, 'model_name': self.model_name, 'organization_id': self.organization_id, 'timeout': self.timeout, 'max_retries': self.max_retries, 'headers': self.headers, # Expand default_params content to the same level **self.default_params } # Build parameter string list param_strings = [] for key, value in params.items(): if key == 'api_token' and value: # Mask API token for security if len(value) > 12: masked_value = f"'{value[:6]}***{value[-6:]}'" elif len(value) > 8: masked_value = f"'{value[:4]}***{value[-4:]}'" else: masked_value = "'***'" param_strings.append(f"{key}={masked_value}") else: param_strings.append(f"{key}={value!r}") params_str = ', '.join(param_strings) return f"{self.__class__.__name__}({params_str})"
def _params(self) -> Tuple[Any, ...]: """ Get the parameters that define this model instance. This method returns a stable and hashable representation of the model's parameters including all constructor arguments. It is used for equality comparison and hashing of model instances. :return: A hashable tuple representation of the model's parameters. :rtype: tuple Example:: >>> model = RemoteLLMModel( ... base_url="https://api.openai.com/v1", ... api_token="sk-xxx", ... model_name="gpt-3.5-turbo", ... max_tokens=1000 ... ) >>> params = model._params() >>> isinstance(params, tuple) True """ # Convert headers dict to sorted tuple of tuples for hashability headers_tuple = tuple(sorted(self.headers.items())) if self.headers else () # Convert default_params dict to sorted tuple of tuples for hashability default_params_tuple = tuple(sorted(self.default_params.items())) if self.default_params else () return ( self.base_url, self.api_token, self.model_name, self.organization_id, self.timeout, self.max_retries, headers_tuple, default_params_tuple )