Skip to content

Core API Reference

Agent

lllm.core.agent.Agent dataclass

Source code in lllm/core/agent.py
@dataclass
class Agent:
    name: str # the role of the agent, or a name of the agent
    system_prompt: Prompt
    model: str # the latest snapshot of a model
    llm_provider: BaseProvider
    log_base: ReplayableLogBase   
    api_type: APITypes = APITypes.COMPLETION
    model_args: Dict[str, Any] = field(default_factory=dict) # additional args, like temperature, seed, etc.
    max_exception_retry: int = 3
    max_interrupt_times: int = 5
    max_llm_recall: int = 0

    """
    Represents a single LLM agent with a specific role and capabilities.

    Attributes:
        name (str): The name or role of the agent (e.g., 'assistant', 'coder').
        system_prompt (Prompt): The system prompt defining the agent's persona.
        model (str): The model identifier (e.g., 'gpt-4o').
        llm_provider (BaseProvider): The provider instance for LLM calls.
        log_base (ReplayableLogBase): Logger for recording interactions.
        model_args (Dict[str, Any]): Additional model arguments (temp, top_p, etc.).
        max_exception_retry (int): Max retries for agent exceptions.
        max_interrupt_times (int): Max consecutive tool call interrupts.
        max_llm_recall (int): Max retries for LLM API errors.
    """

    def __post_init__(self):
        self.model_card = find_model_card(self.model)
        self.model_card.check_args(self.model_args)
        self.model = self.model_card.latest_snapshot.name

    def reload_system(self, system_prompt: Prompt):
        self.system_prompt = system_prompt

    # initialize the dialog
    def init_dialog(self, prompt_args: Optional[Dict[str, Any]] = None, session_name: str = None) -> Dialog:
        prompt_args = dict(prompt_args) if prompt_args else {}
        if session_name is None:
            session_name = dt.datetime.now().strftime('%Y%m%d_%H%M%S')+'_'+str(uuid.uuid4())[:6]
        system_message = Message(
            role=Roles.SYSTEM,
            content=self.system_prompt(**prompt_args),
            creator='system',
        )
        return Dialog(
            [system_message],
            self.log_base,
            session_name,
            top_prompt=self.system_prompt,
        )

    # send a message to the dialog manually
    def send_message(
        self,
        dialog: Dialog,
        prompt: Prompt,
        prompt_args: Optional[Dict[str, Any]] = None,
        creator: str = 'internal',
        extra: Optional[Dict[str, Any]] = None,
        role: Roles = Roles.USER,
    ):
        prompt_payload = dict(prompt_args) if prompt_args else None
        extra_payload = dict(extra) if extra else None
        return dialog.send_message(prompt, prompt_payload, creator=creator, extra=extra_payload, role=role)

    # it performs the "Agent Call"
    def call(
        self,
        dialog: Dialog,  # it assumes the prompt is already loaded into the dialog as the top prompt by send_message
        extra: Optional[Dict[str, Any]] = None,  # for tracking additional information, such as frontend replay info
        args: Optional[Dict[str, Any]] = None,  # for tracking additional information, such as frontend replay info
        parser_args: Optional[Dict[str, Any]] = None,
    ) -> Tuple[Message, Dialog, List[FunctionCall]]:
        """
        Executes the agent loop, handling LLM calls, tool execution, and interrupts.

        Args:
            dialog (Dialog): The current dialog state.
            extra (Dict[str, Any], optional): Extra metadata for the call.
            args (Dict[str, Any], optional): Additional arguments for the prompt.
            parser_args (Dict[str, Any], optional): Arguments for the output parser.

        Returns:
            Tuple[Message, Dialog, List[FunctionCall]]: The final response message, the updated dialog, and a list of executed function calls.

        Raises:
            ValueError: If the agent fails to produce a valid response after retries.
        """
        extra = dict(extra) if extra else {}
        args = dict(args) if args else {}
        parser_args = dict(parser_args) if parser_args else {}
        # Prompt: a function maps prompt args and dialog into the expected output 
        current_prompt = dialog.top_prompt or self.system_prompt
        interrupts = []
        for i in range(10000 if self.max_interrupt_times == 0 else self.max_interrupt_times+1): # +1 for the final response
            llm_recall = self.max_llm_recall 
            exception_retry = self.max_exception_retry 
            working_dialog = dialog.fork() # make a copy of the dialog, truncate all excception handling dialogs
            while True: # ensure the response is no exception
                execution_attempts = []
                try:
                    _model_args = self.model_args.copy()
                    _model_args.update(args)

                    response = self.llm_provider.call(
                        working_dialog,
                        current_prompt,
                        self.model,
                        _model_args,
                        parser_args=parser_args,
                        responder=self.name,
                        extra=extra,
                        api_type=self.api_type,
                    )
                    working_dialog.append(response) 
                    if response.execution_errors != []:
                        execution_attempts.append(response)
                        raise AgentException(response.error_message)
                    else: 
                        break
                except AgentException as e: # handle the exception from the agent
                    if exception_retry > 0:
                        exception_retry -= 1
                        U.cprint(f'{self.name} is handling an exception {e}, retry times: {self.max_exception_retry-exception_retry}/{self.max_exception_retry}','r')
                        working_dialog.send_message(current_prompt.exception_handler, {'error_message': str(e)}, creator='exception')
                        current_prompt = dialog.top_prompt
                        continue
                    else:
                        raise e
                except Exception as e: # handle the exception from the LLM
                    # Simplified error handling for now
                    wait_time = random.random()*15+1
                    if U.is_openai_rate_limit_error(e): # for safe
                        time.sleep(wait_time)
                    else:
                        if llm_recall > 0:
                            llm_recall -= 1
                            time.sleep(1) # wait for a while before retrying
                            continue
                        else:
                            raise e

            response.execution_attempts = execution_attempts
            dialog.append(response) # update the dialog state
            # now handle the interruption
            if response.is_function_call:
                _func_names = [func_call.name for func_call in response.function_calls]
                U.cprint(f'{self.name} is calling function {_func_names}, interrupt times: {i+1}/{self.max_interrupt_times}','y')
                # handle the function call
                for function_call in response.function_calls:
                    if function_call.is_repeated(interrupts):
                        result_str = f'The function {function_call.name} with identical arguments {function_call.arguments} has been called earlier, please check the previous results and do not call it again. If you do not need to call more functions, just stop calling and provide the final response.'
                    else:
                        print(f'{self.name} is calling function {function_call.name} with arguments {function_call.arguments}')
                        if function_call.name not in current_prompt.functions:
                            raise KeyError(f"Function '{function_call.name}' not registered on prompt '{current_prompt.path}'")
                        function = current_prompt.functions[function_call.name]
                        function_call = function(function_call)
                        result_str = function_call.result_str
                        interrupts.append(function_call)
                    if response.api_type == APITypes.RESPONSE:
                        interrupt_role = Roles.USER
                    else:
                        interrupt_role = Roles.TOOL
                    dialog.send_message(
                        current_prompt.interrupt_handler,
                        {'call_results': result_str},
                        role=interrupt_role,
                        creator='function',
                        extra={'tool_call_id': function_call.id},
                    )
                if i == self.max_interrupt_times-1:
                    dialog.send_message(current_prompt.interrupt_handler_final, role=Roles.USER, creator='function')
                current_prompt = dialog.top_prompt
            else: # the response is not a function call, it is the final response
                if i > 0:   
                    U.cprint(f'{self.name} stopped calling functions, total interrupt times: {i}/{self.max_interrupt_times}','y')
                return response, dialog, interrupts
        raise ValueError('Failed to call the agent')

    # a special agent call for classification
    def _classify(self, dialog: Dialog, classes: List[str], classifier_args: Dict[str, Any]):
        _, dialog, _ = self.call(dialog, args=classifier_args)
        response = dialog.tail.raw_response
        choice = response.choices[0]
        logprobs = choice.logprobs.content
        if not len(logprobs) == 1:
            raise ClassificationError(f'Failed to classify the proposition, not only one token ({len(logprobs)})', {})
        chosen_token_data = choice.logprobs.content[0]
        top_probs = {}
        for top_logprob_entry in chosen_token_data.top_logprobs:
            token = top_logprob_entry.token
            prob = np.exp(top_logprob_entry.logprob)
            top_probs[token] = prob
        U.cprint(top_probs, 'y')
        errors = []
        for token in classes:
            if token not in top_probs:
                errors.append(f"Token {token} not found in the top logprobs")
        if errors != []:
            raise ClassificationError(f'Failed to classify the proposition:\n{"\n".join(errors)}', {})
        return top_probs, dialog

    def classify(self, dialog: Dialog, classes: List[str], classifier_prompt: str = None, strength: int = 10):
        # binary classification by default
        _classifier_args = self.model_card.make_classifier(classes, strength)
        if classifier_prompt is None:
            _classes = ' or '.join([f'"{t}"' for t in classes])
            classifier_prompt = f"Please respond with one and only one word from {_classes}."
        dialog.send_message(classifier_prompt)
        _dialog = dialog.fork()
        llm_recall = self.max_llm_recall 
        exception_retry = self.max_exception_retry 
        while True:
            try:
                top_probs, _dialog = self._classify(_dialog, classes, _classifier_args)
                dialog.append(_dialog.tail) # truncate the error handlings
                return top_probs, dialog
            except ClassificationError as e:
                if exception_retry > 0:
                    _dialog.send_message(f'Please respond with one and only one word from {classes}')
                    exception_retry -= 1
                    print(f'{e}\nRetrying... times: {self.max_exception_retry-exception_retry}/{self.max_exception_retry}')
                else:
                    raise e
            except Exception as e:
                if llm_recall > 0:
                    llm_recall -= 1
                    time.sleep(1)
                    continue
                else:
                    raise e

max_llm_recall = 0 class-attribute instance-attribute

Represents a single LLM agent with a specific role and capabilities.

Attributes:

Name Type Description
name str

The name or role of the agent (e.g., 'assistant', 'coder').

system_prompt Prompt

The system prompt defining the agent's persona.

model str

The model identifier (e.g., 'gpt-4o').

llm_provider BaseProvider

The provider instance for LLM calls.

log_base ReplayableLogBase

Logger for recording interactions.

model_args Dict[str, Any]

Additional model arguments (temp, top_p, etc.).

max_exception_retry int

Max retries for agent exceptions.

max_interrupt_times int

Max consecutive tool call interrupts.

max_llm_recall int

Max retries for LLM API errors.

call(dialog, extra=None, args=None, parser_args=None)

Executes the agent loop, handling LLM calls, tool execution, and interrupts.

Parameters:

Name Type Description Default
dialog Dialog

The current dialog state.

required
extra Dict[str, Any]

Extra metadata for the call.

None
args Dict[str, Any]

Additional arguments for the prompt.

None
parser_args Dict[str, Any]

Arguments for the output parser.

None

Returns:

Type Description
Tuple[Message, Dialog, List[FunctionCall]]

Tuple[Message, Dialog, List[FunctionCall]]: The final response message, the updated dialog, and a list of executed function calls.

Raises:

Type Description
ValueError

If the agent fails to produce a valid response after retries.

Source code in lllm/core/agent.py
def call(
    self,
    dialog: Dialog,  # it assumes the prompt is already loaded into the dialog as the top prompt by send_message
    extra: Optional[Dict[str, Any]] = None,  # for tracking additional information, such as frontend replay info
    args: Optional[Dict[str, Any]] = None,  # for tracking additional information, such as frontend replay info
    parser_args: Optional[Dict[str, Any]] = None,
) -> Tuple[Message, Dialog, List[FunctionCall]]:
    """
    Executes the agent loop, handling LLM calls, tool execution, and interrupts.

    Args:
        dialog (Dialog): The current dialog state.
        extra (Dict[str, Any], optional): Extra metadata for the call.
        args (Dict[str, Any], optional): Additional arguments for the prompt.
        parser_args (Dict[str, Any], optional): Arguments for the output parser.

    Returns:
        Tuple[Message, Dialog, List[FunctionCall]]: The final response message, the updated dialog, and a list of executed function calls.

    Raises:
        ValueError: If the agent fails to produce a valid response after retries.
    """
    extra = dict(extra) if extra else {}
    args = dict(args) if args else {}
    parser_args = dict(parser_args) if parser_args else {}
    # Prompt: a function maps prompt args and dialog into the expected output 
    current_prompt = dialog.top_prompt or self.system_prompt
    interrupts = []
    for i in range(10000 if self.max_interrupt_times == 0 else self.max_interrupt_times+1): # +1 for the final response
        llm_recall = self.max_llm_recall 
        exception_retry = self.max_exception_retry 
        working_dialog = dialog.fork() # make a copy of the dialog, truncate all excception handling dialogs
        while True: # ensure the response is no exception
            execution_attempts = []
            try:
                _model_args = self.model_args.copy()
                _model_args.update(args)

                response = self.llm_provider.call(
                    working_dialog,
                    current_prompt,
                    self.model,
                    _model_args,
                    parser_args=parser_args,
                    responder=self.name,
                    extra=extra,
                    api_type=self.api_type,
                )
                working_dialog.append(response) 
                if response.execution_errors != []:
                    execution_attempts.append(response)
                    raise AgentException(response.error_message)
                else: 
                    break
            except AgentException as e: # handle the exception from the agent
                if exception_retry > 0:
                    exception_retry -= 1
                    U.cprint(f'{self.name} is handling an exception {e}, retry times: {self.max_exception_retry-exception_retry}/{self.max_exception_retry}','r')
                    working_dialog.send_message(current_prompt.exception_handler, {'error_message': str(e)}, creator='exception')
                    current_prompt = dialog.top_prompt
                    continue
                else:
                    raise e
            except Exception as e: # handle the exception from the LLM
                # Simplified error handling for now
                wait_time = random.random()*15+1
                if U.is_openai_rate_limit_error(e): # for safe
                    time.sleep(wait_time)
                else:
                    if llm_recall > 0:
                        llm_recall -= 1
                        time.sleep(1) # wait for a while before retrying
                        continue
                    else:
                        raise e

        response.execution_attempts = execution_attempts
        dialog.append(response) # update the dialog state
        # now handle the interruption
        if response.is_function_call:
            _func_names = [func_call.name for func_call in response.function_calls]
            U.cprint(f'{self.name} is calling function {_func_names}, interrupt times: {i+1}/{self.max_interrupt_times}','y')
            # handle the function call
            for function_call in response.function_calls:
                if function_call.is_repeated(interrupts):
                    result_str = f'The function {function_call.name} with identical arguments {function_call.arguments} has been called earlier, please check the previous results and do not call it again. If you do not need to call more functions, just stop calling and provide the final response.'
                else:
                    print(f'{self.name} is calling function {function_call.name} with arguments {function_call.arguments}')
                    if function_call.name not in current_prompt.functions:
                        raise KeyError(f"Function '{function_call.name}' not registered on prompt '{current_prompt.path}'")
                    function = current_prompt.functions[function_call.name]
                    function_call = function(function_call)
                    result_str = function_call.result_str
                    interrupts.append(function_call)
                if response.api_type == APITypes.RESPONSE:
                    interrupt_role = Roles.USER
                else:
                    interrupt_role = Roles.TOOL
                dialog.send_message(
                    current_prompt.interrupt_handler,
                    {'call_results': result_str},
                    role=interrupt_role,
                    creator='function',
                    extra={'tool_call_id': function_call.id},
                )
            if i == self.max_interrupt_times-1:
                dialog.send_message(current_prompt.interrupt_handler_final, role=Roles.USER, creator='function')
            current_prompt = dialog.top_prompt
        else: # the response is not a function call, it is the final response
            if i > 0:   
                U.cprint(f'{self.name} stopped calling functions, total interrupt times: {i}/{self.max_interrupt_times}','y')
            return response, dialog, interrupts
    raise ValueError('Failed to call the agent')

AgentBase

lllm.core.agent.AgentBase

Source code in lllm/core/agent.py
class AgentBase:
    agent_type: str | Enum = None
    agent_group: List[str] = None
    is_async: bool = False

    def __init_subclass__(cls, register: bool = True, **kwargs):
        super().__init_subclass__(**kwargs)
        if register:
            register_agent_class(cls)

    def __init__(self, config: Dict[str, Any], ckpt_dir: str, stream = None):
        auto_discover_if_enabled(config.get("auto_discover"))
        if stream is None:
            stream = U.PrintSystem()
        self.config = config
        assert self.agent_group is not None, f"Agent group is not set for {self.agent_type}"
        _agent_configs = config['agent_configs']
        self.agent_configs = {}
        for agent_name in self.agent_group:
            assert agent_name in _agent_configs, f"Agent {agent_name} not found in agent configs"
            self.agent_configs[agent_name] = _agent_configs[agent_name]
        self._stream = stream
        self._stream_backup = stream
        self.st = None
        self.ckpt_dir = ckpt_dir
        self._log_base = build_log_base(config)
        self.agents = {}

        # Initialize Provider via registry
        self.llm_provider = build_provider(config)

        for agent_name, model_config in self.agent_configs.items():
            model_config = model_config.copy()
            model_name = model_config.pop('model_name')
            self.model = find_model_card(model_name)
            system_prompt_path = model_config.pop('system_prompt_path')
            api_type_value = model_config.pop('api_type', APITypes.COMPLETION.value)
            if isinstance(api_type_value, APITypes):
                api_type = api_type_value
            else:
                api_type = APITypes(api_type_value)

            # We assume PROMPT_REGISTRY is available globally or passed. 
            # This is a bit of a dependency issue. 
            # Ideally, prompts should be loaded/registered before Agent initialization.
            # For now, we'll assume the user registers prompts before creating agents.
            from lllm.core.models import PROMPT_REGISTRY

            self.agents[agent_name] = Agent(
                name=agent_name,
                system_prompt=PROMPT_REGISTRY[system_prompt_path],
                model=model_name,
                llm_provider=self.llm_provider,
                api_type=api_type,
                model_args=model_config,
                log_base=self._log_base,
                max_exception_retry=self.config.get('max_exception_retry', 3),
                max_interrupt_times=self.config.get('max_interrupt_times', 5),
                max_llm_recall=self.config.get('max_llm_recall', 0),
            )

        self.__additional_args = {}
        sig = inspect.signature(self.call)
        for arg in sig.parameters:
            if arg not in {'task', '**kwargs'}:
                self.__additional_args[arg] = sig.parameters[arg].default

    def set_st(self, session_name: str):
        self.st = U.StreamWrapper(self._stream, self._log_base, session_name)

    def restore_st(self):
        pass

    def silent(self):
        self._stream = U.PrintSystem(silent=True)

    def restore(self):
        self._stream = self._stream_backup

    def call(self, task: str, **kwargs):
        raise NotImplementedError

    def __call__(self, task: str, session_name: str = None, **kwargs) -> str:
        if session_name is None:
            session_name = task.replace(' ', '+')+'_'+dt.datetime.now().strftime('%Y%m%d_%H%M%S')
        self.set_st(session_name)
        report = self.call(task, **kwargs)
        with self.st.expander('Prediction Overview', expanded=True):
            self.st.code(f'{report}')
        self.restore_st()
        return report

Dialog

lllm.core.dialog.Dialog dataclass

Whenever a dialog is created/forked, it should be associated with a session name

Source code in lllm/core/dialog.py
@dataclass
class Dialog:
    """
    Whenever a dialog is created/forked, it should be associated with a session name
    """
    _messages: List[Message]
    log_base: ReplayableLogBase
    session_name: str
    parent_dialog: Optional[str] = None
    top_prompt: Optional[Prompt] = None

    def __post_init__(self):
        self.dialog_id = uuid.uuid4().hex
        dialogs_sess = self.log_base.get_collection(RCollections.DIALOGS).create_session(self.session_name) # track the dialogs created in this session
        dialogs_sess.log(self.dialog_id, metadata={'parent_dialog': self.parent_dialog})
        self.sess = self.log_base.get_collection(RCollections.MESSAGES).create_session(f'{self.session_name}/{self.dialog_id}') # track the dialogs created in this session

    def append(self, message: Message): # ensure this is the only way to write the messages to make sure the trackability
        message.extra['dialog_id'] = self.dialog_id
        self._messages.append(message)
        try:
            self.sess.log(message.content, metadata=message.to_dict()) # Use to_dict for logging
        except Exception as e:
            print(f'WARNING: Failed to log message: {e}, log the message without metadata')
            self.sess.log(message.content)


    def to_dict(self):
        return {
            'messages': [message.to_dict() for message in self._messages],
            'session_name': self.session_name,
            'parent_dialog': self.parent_dialog,
            'top_prompt_path': self.top_prompt.path if self.top_prompt is not None else None,
        }

    @classmethod
    def from_dict(cls, d: dict, log_base: ReplayableLogBase, prompt_registry: Dict[str, Prompt]):
        top_prompt_path = d['top_prompt_path']
        if top_prompt_path is not None:
            # Assuming PROMPT_REGISTRY is available or passed. 
            # For now, we rely on the passed prompt_registry.
            top_prompt = prompt_registry.get(top_prompt_path)
            if top_prompt is None:
                 print(f"Warning: Prompt {top_prompt_path} not found in registry.")
        else:
            top_prompt = None
        return cls(
            _messages=[Message.from_dict(message) for message in d['messages']],
            log_base=log_base,
            session_name=d['session_name'],
            parent_dialog=d['parent_dialog'],
            top_prompt=top_prompt,
        )

    @property
    def messages(self):
        return self._messages

    def send_base64_image(
        self,
        image_base64: str,
        caption: str = None,
        creator: str = 'user',
        extra: Optional[Dict[str, Any]] = None,
        role: Roles = Roles.USER,
    ) -> Message:
        payload = dict(extra) if extra else {}
        if caption is not None:
            payload['caption'] = caption
        message = Message(
            role=role,
            content=image_base64,
            creator=creator,
            modality=Modalities.IMAGE,
            extra=payload,
        )
        self.append(message)
        return message

    def send_message(
        self,
        prompt: Prompt | str,
        prompt_args: Optional[Dict[str, Any]] = None,
        creator: str = 'user',  # or 'user', etc.
        extra: Optional[Dict[str, Any]] = None,
        role: Roles = Roles.USER,
    ) -> Message:
        prompt_args = dict(prompt_args) if prompt_args else {}
        metadata = dict(extra) if extra else {}
        if isinstance(prompt, str):
            assert not prompt_args, "Prompt args are not allowed for string prompt"
            # Create a temporary prompt object
            prompt = Prompt(path='__temp_prompt_'+str(uuid.uuid4())[:6], prompt=prompt)
            content = prompt.prompt
        elif not prompt_args:
            content = prompt.prompt
        else:
            content = prompt(**prompt_args)
        message = Message(
            role=role,
            content=content,
            creator=creator,
            modality=Modalities.TEXT,
            extra=metadata
        )
        self.append(message)
        self.top_prompt = prompt
        return message

    def fork(self) -> 'Dialog':
        _messages = [copy.deepcopy(message) for message in self._messages]
        _dialog = Dialog(_messages, self.log_base, self.session_name, self.dialog_id)
        _dialog.top_prompt = self.top_prompt
        return _dialog

    def overview(self, remove_tail: bool = False, max_length: int = 100, 
                 stream = None, divider: bool = False):
        _overview = ''
        for idx, message in enumerate(self.messages):
            if remove_tail and idx == len(self.messages)-1:
                break
            # message.overview() logic needs to be in Message class or here
            # Message class has overview method in original code, I should add it back to Message model if I missed it
            # I missed it in Message model. I'll implement a simple one here or add it to Message.
            # Let's assume Message has it or I implement it here.
            # Implementing here for safety if I missed it in Pydantic model.
            content_preview = str(message.content)[:max_length] + '...' if len(str(message.content)) > max_length else str(message.content)
            _overview += f'[{idx}. {message.creator} ({message.role.value})]: {content_preview}\n\n'

        _overview = _overview.strip()
        cost = self.tail.cost if self.messages else CompletionCost()
        if stream is not None:
            if divider:
                stream.divider()
            stream.write(U.html_collapse(f'Context overview', _overview), unsafe_allow_html=True)
            stream.write(str(cost))
        return _overview

    @property
    def tail(self): # last message in the dialog, use it to get last response from the LLM
        return self._messages[-1] if self._messages else None

    @property
    def system(self):
        return self._messages[0] if self._messages else None

    def context_copy(self, n: int = 1) -> 'Dialog':
        _dialog = self.fork()
        if n > 0:
            _dialog._messages = _dialog._messages[:-n]
        return _dialog

    @property
    def cost(self) -> CompletionCost:
        return self.get_cost()

    def get_cost(self, model: str = None) -> CompletionCost:
        costs = [message.cost for message in self._messages]
        return CompletionCost(
            prompt_tokens=sum([cost.prompt_tokens for cost in costs]),
            completion_tokens=sum([cost.completion_tokens for cost in costs]),
            cached_prompt_tokens=sum([cost.cached_prompt_tokens for cost in costs]),
            cost=sum([cost.cost for cost in costs])
        )

Models

lllm.core.models.Message

Bases: BaseModel

Source code in lllm/core/models.py
class Message(BaseModel):
    role: Roles
    content: Union[str, List[Dict[str, Any]]] # Content can be string or list of content parts (for images)
    creator: str
    raw_response: Any = None
    function_calls: List[FunctionCall] = Field(default_factory=list)
    modality: Modalities = Modalities.TEXT
    logprobs: List[TokenLogprob] = Field(default_factory=list)
    parsed: Dict[str, Any] = Field(default_factory=dict)
    model: Optional[str] = None
    usage: Dict[str, float] = Field(default_factory=dict)
    model_args: Dict[str, Any] = Field(default_factory=dict)
    extra: Dict[str, Any] = Field(default_factory=dict)
    execution_errors: List[Exception] = Field(default_factory=list)
    execution_attempts: List['Message'] = Field(default_factory=list)
    api_type: APITypes = APITypes.COMPLETION

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_validator("logprobs", mode="before")
    @classmethod
    def _coerce_logprobs(cls, value):
        if not value:
            return []
        normalized: List[TokenLogprob] = []
        for entry in value:
            if isinstance(entry, TokenLogprob):
                normalized.append(entry)
                continue
            if isinstance(entry, dict):
                normalized.append(TokenLogprob(**entry))
                continue
            if isinstance(entry, (int, float)):
                normalized.append(TokenLogprob(logprob=float(entry)))
                continue
            normalized.append(TokenLogprob(token=str(entry)))
        return normalized

    @property
    def error_message(self):
        return '\n'.join([str(e) for e in self.execution_errors])

    @property
    def cost(self) -> CompletionCost:
        if self.model is None or not self.usage:
            return CompletionCost()
        try:
            card = find_model_card(self.model)
        except Exception:
            return CompletionCost()
        return card.cost(self.usage)

    @property
    def is_function_call(self) -> bool:
        return len(self.function_calls) > 0

    def to_dict(self):
        return self.model_dump(exclude={'raw_response', 'execution_errors', 'execution_attempts'})

    @classmethod
    def from_dict(cls, d: dict):
        return cls(**d)

lllm.core.models.Prompt

Bases: BaseModel

Source code in lllm/core/models.py
class Prompt(BaseModel):
    path: str
    prompt: str
    functions_list: List[Function] = Field(default_factory=list)
    mcp_servers_list: List[MCP] = Field(default_factory=list)
    parser: Optional[Callable[[str], Dict[str, Any]]] = None
    exception_prompt: str = "Error: {error_message}. Please fix."
    interrupt_prompt: str = "Result: {call_results}. Continue?"
    interrupt_final_prompt: str = "All tool calls are done. Provide the final response."
    format: Optional[Any] = None # Pydantic model class for structured output
    xml_tags: List[str] = Field(default_factory=list)
    md_tags: List[str] = Field(default_factory=list)
    signal_tags: List[str] = Field(default_factory=list)
    required_xml_tags: List[str] = Field(default_factory=list)
    required_md_tags: List[str] = Field(default_factory=list)
    allow_web_search: bool = False
    computer_use_config: Dict[str, Any] = Field(default_factory=dict)

    functions: Dict[str, Function] = Field(default_factory=dict, init=False)
    mcp_servers: Dict[str, MCP] = Field(default_factory=dict, init=False)

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def model_post_init(self, __context):
        self.functions = {f.name: f for f in self.functions_list}
        self.mcp_servers = {m.server_label: m for m in self.mcp_servers_list}

    def link_function(self, name: str, function: Callable):
        if name in self.functions:
            self.functions[name].link_function(function)

    def register_mcp_server(self, server: MCP):
        self.mcp_servers[server.server_label] = server

    def __call__(self, **kwargs):
        if not kwargs:
            return self.prompt
        return self.prompt.format(**kwargs)

    @property
    def exception_handler(self):
        # Recursive prompt creation logic (simplified for now)
        return Prompt(
            path=f'__{self.path}_exception_handler',
            prompt=self.exception_prompt,
            parser=self.parser,
            functions_list=self.functions_list,
            mcp_servers_list=self.mcp_servers_list,
            exception_prompt=self.exception_prompt,
            interrupt_prompt=self.interrupt_prompt,
            format=self.format,
            xml_tags=self.xml_tags,
            md_tags=self.md_tags,
            signal_tags=self.signal_tags,
            required_xml_tags=self.required_xml_tags,
            required_md_tags=self.required_md_tags,
            allow_web_search=self.allow_web_search,
            computer_use_config=self.computer_use_config,
            interrupt_final_prompt=self.interrupt_final_prompt,
        )

    @property
    def interrupt_handler(self):
         return Prompt(
            path=f'__{self.path}_interrupt_handler',
            prompt=self.interrupt_prompt,
            parser=self.parser,
            functions_list=self.functions_list,
            mcp_servers_list=self.mcp_servers_list,
            exception_prompt=self.exception_prompt,
            interrupt_prompt=self.interrupt_prompt,
            format=self.format,
            xml_tags=self.xml_tags,
            md_tags=self.md_tags,
            signal_tags=self.signal_tags,
            required_xml_tags=self.required_xml_tags,
            required_md_tags=self.required_md_tags,
            allow_web_search=self.allow_web_search,
            computer_use_config=self.computer_use_config,
            interrupt_final_prompt=self.interrupt_final_prompt,
        )

    @property
    def interrupt_handler_final(self):
         return Prompt(
            path=f'__{self.path}_interrupt_handler_final',
            prompt=self.interrupt_final_prompt,
            parser=self.parser,
            functions_list=self.functions_list,
            mcp_servers_list=self.mcp_servers_list,
            exception_prompt=self.exception_prompt,
            interrupt_prompt=self.interrupt_prompt,
            interrupt_final_prompt=self.interrupt_final_prompt,
            format=self.format,
            xml_tags=self.xml_tags,
            md_tags=self.md_tags,
            signal_tags=self.signal_tags,
            required_xml_tags=self.required_xml_tags,
            required_md_tags=self.required_md_tags,
            allow_web_search=self.allow_web_search,
            computer_use_config=self.computer_use_config,
        )