Source code for lwe.backends.api.backend

import copy
import os

from lwe.core.config import Config
from lwe.core.logger import Logger
from lwe.backends.api.database import Database
from lwe.backends.api.orm import Orm, User
from lwe.core.cache_manager import CacheManager
from lwe.core.template_manager import TemplateManager
from lwe.core.preset_manager import PresetManager
from lwe.core.provider_manager import ProviderManager
from lwe.core.workflow_manager import WorkflowManager
from lwe.core.tool_manager import ToolManager
from lwe.core.plugin_manager import PluginManager
import lwe.core.constants as constants
import lwe.core.util as util
from lwe.backends.api.request import ApiRequest
from lwe.backends.api.conversation_storage_manager import ConversationStorageManager
from lwe.backends.api.user import UserManager
from lwe.backends.api.conversation import ConversationManager
from lwe.backends.api.message import MessageManager
from lwe.backends.api.orm import Conversation
from lwe.core.preset_manager import parse_llm_dict

ADDITIONAL_PLUGINS = [
    "provider_chat_openai",
]


[docs] class ApiBackend: """Backend implementation using direct API access.""" name = "api" def __init__(self, config=None, orm=None): """ Initializes the Backend instance. This method sets up attributes that should only be initialized once. :param config: Optional configuration for the backend. If not provided, it uses a default configuration. """ self.config = config or Config() self.conversation_id = None self.conversation_title = None self.current_user = None self.request = None self.logfile = None self.orm = orm or Orm(config) self.user_manager = UserManager(config, self.orm) self.conversation = ConversationManager(config, self.orm) self.message = MessageManager(config, self.orm) self.initialize_database() self.initialize_backend(self.config) self.initialize_file_logging()
[docs] def set_available_models(self): """ Sets the available models for the provider. """ self.available_models = self.provider.available_models
[docs] def make_llm(self, customizations=None): """ Creates a Language Model (llm) using the provider. :param customizations: Optional dictionary for customizations. :return: Language Model (llm) object. """ customizations = customizations or {} llm = self.provider.make_llm(customizations) return llm
[docs] def terminate_stream(self, _signal, _frame): """ Handles termination signal, passing it to the request if present. :param _signal: The signal that triggered the termination. :param _frame: Current stack frame. """ self.log.info("Received signal to terminate stream") self.request and self.request.terminate_stream(_signal, _frame)
[docs] def run_template_setup(self, template_name, substitutions=None): """ Sets up the run of a template. :param template_name: Name of the template to run. :param substitutions: Optional dictionary of substitutions. :return: A tuple containing a success indicator, tuple of template setup data, and a user message. """ self.log.info(f"Setting up run of template: {template_name}") substitutions = substitutions or {} message, overrides = self.template_manager.build_message_from_template( template_name, substitutions ) return True, (message, overrides), f"Set up of template run complete: {template_name}"
[docs] def run_template_compiled(self, message, overrides=None): """ Runs the compiled template. :param message: The message to be sent. :param overrides: Optional dictionary of overrides. :return: The response tuple from LLM request. """ overrides = overrides or {} self.log.info("Running template") response = self.make_request(message, **overrides) return response
[docs] def build_message_from_template(self, template_name, template_vars=None, overrides=None): """ Builds the message from the template. :param template_name: Name of the template to run. :param template_vars: Optional dictionary of template variables. :param overrides: Optional dictionary of overrides. :return: A tuple containing a success indicator, tuple of built message and overrides, and a user message. """ template_vars = template_vars or {} overrides = overrides or {} ( success, response, user_message, ) = self.template_manager.get_template_variables_substitutions(template_name) if not success: return success, response, user_message _template, _variables, substitutions = response util.merge_dicts(substitutions, template_vars) success, response, user_message = self.run_template_setup(template_name, substitutions) if not success: return success, response, user_message message, template_overrides = response util.merge_dicts(template_overrides, overrides) return True, (message, template_overrides), f"Built message from template: {template_name}"
[docs] def run_template(self, template_name, template_vars=None, overrides=None): """ Runs the given template with the provided variables and overrides. :param template_name: Name of the template to run. :param template_vars: Optional dictionary of template variables, will merged with any set in the template. :param overrides: Optional dictionary of overrides, will be merged with any set in the template. :return: The response tuple from the template run. """ success, response, user_message = self.build_message_from_template( template_name, template_vars=template_vars, overrides=overrides ) if not success: return success, response, user_message message, overrides = response response = self.run_template_compiled(message, overrides) return response
[docs] def initialize_backend(self, config=None): """ Initializes the backend with provided or default configuration, and sets up necessary attributes. This method is safe to call for dynamically reloading backends. :param config: Backend configuration options :type config: dict, optional """ self.config = config or Config() self.log = Logger(self.__class__.__name__, self.config) self.provider_name = None self.provider = None self.message_clipboard = None self.return_only = False self.cache_manager = CacheManager(self.config) self.template_manager = TemplateManager(self.config) self.preset_manager = PresetManager(self.config) self.plugin_manager = PluginManager( self.config, self, self.cache_manager, additional_plugins=ADDITIONAL_PLUGINS ) self.provider_manager = ProviderManager(self.config, self.plugin_manager) self.workflow_manager = WorkflowManager(self.config) self.tool_manager = ToolManager(self.config) self.workflow_manager.load_workflows() self.init_provider() self.set_available_models() self.set_conversation_tokens(0) self.auto_create_first_user() self.load_default_user() self.load_default_conversation()
[docs] def initialize_database(self): database = Database(self.config, orm=self.orm) database.create_schema()
[docs] def auto_create_first_user(self): username = self.config.get("backend_options.auto_create_first_user") if isinstance(username, str): query = self.user_manager.session.query(User).order_by(User.id).limit(1) first_user = query.first() if not first_user: first_user = self.user_manager.orm_add_user(username, None, None) return first_user
[docs] def load_default_user(self): default_user = self.config.get("backend_options.default_user") if default_user is not None: self.load_user(default_user)
[docs] def load_default_conversation(self): default_conversation_id = self.config.get("backend_options.default_conversation_id") if default_conversation_id is not None: self.load_conversation(default_conversation_id)
[docs] def load_user(self, identifier): """Load a user by id or username/email. :param identifier: User id or username/email :type identifier: int, str :raises Exception: If user not found """ if isinstance(identifier, int): success, user, user_message = self.user_manager.get_by_user_id(identifier) else: success, user, user_message = self.user_manager.get_by_username_or_email(identifier) if not success or not user: raise Exception(user_message) self.set_current_user(user)
[docs] def load_conversation(self, conversation_id): """ Load a conversation by id. :param conversation_id: Conversation id :type conversation_id: int """ success, conversation_data, user_message = self.get_conversation(conversation_id) if success: if conversation_data: self.switch_to_conversation(conversation_id) return else: user_message = "Missing conversation data" raise Exception(user_message)
[docs] def init_system_message(self): """Initialize the system message from config.""" success, _alias, user_message = self.set_system_message( self.config.get("model.default_system_message") ) if not success: util.print_status_message(success, user_message) self.set_system_message()
[docs] def get_providers(self): """Get available provider plugins.""" return self.provider_manager.get_provider_plugins()
[docs] def init_provider(self): """Initialize the default provider and model.""" self.init_system_message() self.active_preset = None self.active_preset_name = None default_preset = self.config.get("model.default_preset") if default_preset: success, preset, user_message = self.activate_preset(default_preset) if success: return util.print_status_message( False, f"Failed to load default preset {default_preset}: {user_message}" ) self.set_provider("provider_chat_openai")
[docs] def set_provider(self, provider_name, customizations=None, reset=False): """ Set the active provider plugin. :param provider_name: Name of provider plugin :type provider_name: str :param customizations: Customizations for provider, defaults to None :type customizations: dict, optional :param reset: Whether to reset provider, defaults to False :type reset: bool, optional :returns: success, provider, message :rtype: tuple """ self.log.debug( f"Setting provider to: {provider_name}, with customizations: {customizations}, reset: {reset}" ) previous_state = { "active_preset": self.active_preset, "active_preset_name": self.active_preset_name, "provider_name": self.provider_name, "provider": self.provider, "llm": getattr(self, "llm", None), "model": getattr(self, "model", None), "max_submission_tokens": getattr(self, "max_submission_tokens", None), } self.active_preset = None self.active_preset_name = None provider_full_name = self.provider_manager.full_name(provider_name) if self.provider_name == provider_full_name and not reset: return False, None, f"Provider {provider_name} already set" success, provider, user_message = self.provider_manager.load_provider(provider_full_name) if success: provider.setup() self.provider_name = provider_full_name self.provider = provider if isinstance(customizations, dict): for key, value in customizations.items(): ( success, customizations, customization_message, ) = self.provider.set_customization_value(key, value) if not success: self.restore_provider_state(previous_state) return success, customizations, customization_message self.llm = self.make_llm() model_name = getattr(self.llm, self.provider.model_property_name) success, customizations, user_message = self.set_model(model_name) if not success: self.restore_provider_state(previous_state) return success, customizations, user_message return success, provider, user_message
[docs] def restore_provider_state(self, previous_state): """Restore backend provider attributes after a failed provider change.""" self.active_preset = previous_state["active_preset"] self.active_preset_name = previous_state["active_preset_name"] self.provider_name = previous_state["provider_name"] self.provider = previous_state["provider"] self.llm = previous_state["llm"] self.model = previous_state["model"] if previous_state["max_submission_tokens"] is None: if hasattr(self, "max_submission_tokens"): del self.max_submission_tokens else: self.max_submission_tokens = previous_state["max_submission_tokens"]
# TODO: This feels hacky, perhaps better to have a shell register itself # for output from the backend?
[docs] def set_return_only(self, return_only=False): self.return_only = return_only
[docs] def set_model(self, model_name): """ Set the active model. :param model_name: Name of model :type model_name: str :returns: success, customizations, message :rtype: tuple """ self.log.debug(f"Setting model to: {model_name}") success, customizations, user_message = self.provider.set_model(model_name) if success: self.model = model_name self.set_max_submission_tokens() return success, customizations, user_message
[docs] def make_preset(self): """Make preset from current provider customizations.""" metadata, customizations = parse_llm_dict(self.provider.customizations) return metadata, customizations
[docs] def activate_preset(self, preset_name): """ Activate a preset. :param preset_name: Name of preset :type preset_name: str :returns: success, preset, message :rtype: tuple """ self.log.debug(f"Activating preset: {preset_name}") success, preset, user_message = self.preset_manager.ensure_preset(preset_name) if not success: return success, preset, user_message metadata, customizations = preset customizations = copy.deepcopy(customizations) success, provider, user_message = self.set_provider( metadata["provider"], customizations, reset=True ) if success: self.active_preset = preset self.active_preset_name = preset_name if "system_message" in metadata: self.set_system_message(metadata["system_message"]) if "max_submission_tokens" in metadata: self.set_max_submission_tokens(metadata["max_submission_tokens"]) return success, preset, user_message
[docs] def reload_plugin(self, plugin_name): """ Reload a plugin. :param plugin_name: Name of plugin :type plugin_name: str :returns: success, plugin_instance, message :rtype: tuple """ return self.plugin_manager.reload_plugin(plugin_name)
def _handle_response(self, success, obj, message): """ Handle response tuple. Logs errors if not successful. :param success: If request was successful :type success: bool :param obj: Returned object :param message: Message :type message: str :returns: success, obj, message :rtype: tuple """ if not success: self.log.error(message) return success, obj, message
[docs] def set_conversation_tokens(self, tokens): """ Set current conversation token count. :param tokens: Number of conversation tokens :type tokens: int """ if self.conversation_id is None: provider = self.provider else: success, last_message, user_message = self.message.get_last_message( self.conversation_id ) if not success: raise ValueError(user_message) provider = self.provider_manager.get_provider_from_name(last_message["provider"]) if provider is not None and provider.get_capability("chat"): self.conversation_tokens = tokens else: self.conversation_tokens = None
[docs] def switch_to_conversation(self, conversation_id): """ Switch to a conversation. :param conversation_id: Conversation id :type conversation_id: int """ self.log.debug(f"Switching to conversation {conversation_id}") success, conversation, user_message = self.get_conversation(conversation_id) if success: self.conversation_id = conversation_id self.conversation_title = conversation["conversation"]["title"] else: raise ValueError(user_message) success, last_message, user_message = self.message.get_last_message(self.conversation_id) if not success: raise ValueError(user_message) model_configured = False self.log.debug(f"Retrieved last message {last_message}") if last_message["preset"]: self.log.debug(f"Last message has preset: {last_message['preset']}") success, _preset, user_message = self.activate_preset(last_message["preset"]) if success: model_configured = True else: util.print_status_message( False, f"Unable to switch conversation to previous preset {last_message['preset']!r} -- ERROR: {user_message}, falling back to provider: {last_message['provider']}, model: {last_message['model']}", ) if not model_configured: if last_message["provider"] and last_message["model"]: self.log.debug( f"Last message has provider: {last_message['provider']}, model: {last_message['model']}" ) success, _provider, _user_message = self.set_provider( last_message["provider"], reset=True ) if success: success, _customizations, _user_message = self.set_model(last_message["model"]) if success: self.init_system_message() model_configured = True if not model_configured: message = "Invalid conversation provider/model, falling back to default provider/model" self.log.warning(message) util.print_status_message(False, message) self.init_provider() conversation_storage_manager = ConversationStorageManager( self.config, self.tool_manager, self.current_user, self.conversation_id, self.provider, self.model, self.active_preset_name or "", provider_manager=self.provider_manager, orm=self.orm, ) tokens = conversation_storage_manager.get_conversation_token_count() self.set_conversation_tokens(tokens) self.write_log_context()
[docs] def get_system_message(self, system_message="default"): """ Get the system message. :param system_message: System message alias :type system_message: str :returns: System message :rtype: str """ aliases = self.get_system_message_aliases() if system_message in aliases: system_message = aliases[system_message] return system_message
[docs] def set_system_message(self, system_message="default"): """ Set the system message. :param system_message: System message or alias :type system_message: str """ self.system_message = self.get_system_message(system_message) self.system_message_alias = ( system_message if system_message in self.get_system_message_aliases() else None ) message = f"System message set to: {self.system_message}" self.log.info(message) return True, system_message, message
[docs] def set_max_submission_tokens(self, max_submission_tokens=None): """ Set the max submission tokens. :param max_submission_tokens: Max submission tokens :type max_submission_tokens: int :param force: Force setting max submission tokens :type force: bool """ self.max_submission_tokens = max_submission_tokens or self.provider.max_submission_tokens() return ( True, self.max_submission_tokens, f"Max submission tokens set to {self.max_submission_tokens}", )
[docs] def get_runtime_config(self): """ Get the runtime configuration. :returns: Runtime configuration :rtype: str """ output = """ * Max submission tokens: %s * System message: %s """ % ( self.max_submission_tokens, self.system_message, ) return output
[docs] def get_system_message_aliases(self): """ Get system message aliases from config. :returns: Dict of message aliases :rtype: dict """ aliases = self.config.get("model.system_message") aliases["default"] = constants.SYSTEM_MESSAGE_DEFAULT return aliases
[docs] def retrieve_old_messages(self, conversation_id=None, target_id=None): """ Retrieve old messages for a conversation. :param conversation_id: Conversation id, defaults to current :type conversation_id: int, optional :param target_id: Target message id, defaults to None :type target_id: int, optional :returns: List of messages :rtype: list """ old_messages = [] if conversation_id: success, old_messages, message = self.message.get_messages( conversation_id, target_id=target_id ) if not success: raise Exception(message) return old_messages
[docs] def set_current_user(self, user=None): """ Set the current user. :param user: User object, defaults to None :type user: User, optional :returns: success, preset, message on preset activation, otherwise init the provider :rtype: tuple """ self.log.debug(f"Setting current user to {user.username if user else None}") self.current_user = user if self.current_user: if self.current_user.default_preset: self.log.debug( f"Activating user default preset: {self.current_user.default_preset}" ) return self.activate_preset(self.current_user.default_preset) return self.init_provider()
[docs] def conversation_data_to_messages(self, conversation_data): """ Convert conversation data to list of messages. :param conversation_data: Conversation data dict :type conversation_data: dict :returns: List of messages :rtype: list """ return conversation_data["messages"]
[docs] def delete_conversation(self, conversation_id=None): """Delete a conversation. :param conversation_id: Conversation id, defaults to current :type conversation_id: int, optional :returns: success, conversation, message :rtype: tuple """ conversation_id = conversation_id if conversation_id else self.conversation_id success, conversation, message = self.conversation.delete_conversation(conversation_id) return self._handle_response(success, conversation, message)
[docs] def set_title(self, title, conversation_id=None): """ Set conversation title. :param title: New title :type title: str :param conversation_id: Conversation id, defaults to current :type conversation_id: int, optional :returns: success, conversation, message :rtype: tuple """ conversation_id = conversation_id if conversation_id else self.conversation_id success, conversation, user_message = self.conversation.edit_conversation_title( conversation_id, title ) if success: self.conversation_title = conversation.title return self._handle_response(success, conversation, user_message)
[docs] def get_history(self, limit=20, offset=0, user_id=None): """ Get conversation history. :param limit: Number of results, defaults to 20 :type limit: int, optional :param offset: Result offset, defaults to 0 :type offset: int, optional :param user_id: User id, defaults to current :type user_id: int, optional :returns: success, history dict, message :rtype: tuple """ user_id = user_id if user_id else self.current_user.id success, conversations, message = self.conversation.get_conversations( user_id, limit=limit, offset=offset ) if success: history = {m.id: self.orm.object_as_dict(m) for m in conversations} return success, history, message return self._handle_response(success, conversations, message)
[docs] def get_conversation(self, id=None): """ Get a conversation. :param id: Conversation id, defaults to current :type id: int, optional :returns: success, conversation dict, message :rtype: tuple """ id = id if id else self.conversation_id if not id: return False, None, "No current conversation" success, conversation, message = self.conversation.get_conversation(id) if success: success, messages, message = self.message.get_messages(id) if success: conversation_data = { "conversation": self.orm.object_as_dict(conversation), "messages": messages, } return success, conversation_data, message return self._handle_response(success, conversation, message)
[docs] def get_current_conversation_title(self): if not self.conversation_id: return None if self.conversation_title: return self.conversation_title success, conversation, message = self.conversation.get_conversation(self.conversation_id) return success and conversation.title or None
[docs] def new_conversation(self): """Start a new conversation.""" self.conversation_id = None self.conversation_title = None self.message_clipboard = None self.set_conversation_tokens(0) self.write_log_context()
[docs] def write_log(self, prompt, response): """Write prompt and response to log file if logging is enabled.""" if self.logfile is not None: contents = f""" USER: {prompt} ASSISTANT: {response} """ try: self.logfile.write(contents) except OSError as e: message = f"Failed to write content to log file '{self.logfile.name}': {e}" self.log.error(message) self.write_log_context()
[docs] def write_log_context(self): """Write current conversation context to log file if logging is enabled.""" if self.logfile is not None: try: self.logfile.write(f"## context {self.conversation_id}\n") self.logfile.flush() except OSError as e: message = f"Failed to write log context to log file '{self.logfile.name}': {e}" self.log.error(message)
[docs] def initialize_file_logging(self): """Initialize file logging based on configuration.""" if self.config.get("chat.log.enabled"): log_file = self.config.get("chat.log.filepath") if log_file: self.open_log(log_file)
[docs] def open_log(self, filename): """Open a log file for writing.""" self.close_log() self.log.debug(f"Opening log file '{filename}'") try: if not os.path.isabs(filename): filename = os.path.join(os.getcwd(), filename) self.logfile = open(filename, "a", encoding="utf-8") self.log.debug(f"Opened log file '{self.logfile.name}'") return True except OSError as e: message = f"Failed to open log file '{filename}': {e}" self.log.error(message) return False
[docs] def close_log(self): """Close the current log file if one is open.""" if self.logfile is not None: self.log.debug("Closing log file") self.logfile.close() self.logfile = None
[docs] def make_request(self, input, request_overrides: dict = None): """ Ask the LLM a question, return and optionally stream a response. :param input: The input to be sent to the LLM, can be a string for a single user message, or a list of message dicts with 'role' and 'content' keys. :type input: str | list :request_overrides: Overrides for this specific request. :type request_overrides: dict, optional :returns: success, LLM response, message :rtype: tuple """ self.log.info("Starting 'ask' request") request_overrides = request_overrides or {} old_messages = self.retrieve_old_messages(self.conversation_id) self.log.debug( f"Extracting activate preset configuration from request_overrides: {request_overrides}" ) success, response, user_message = util.extract_preset_configuration_from_request_overrides( request_overrides, self.active_preset_name ) if not success: return success, response, user_message preset_name, _preset_overrides, activate_preset = response request = ApiRequest( self.config, self.provider, self.provider_manager, self.tool_manager, input, self.active_preset, self.preset_manager, self.system_message, old_messages, self.max_submission_tokens, request_overrides, orm=self.orm, ) self.request = request success, response, user_message = request.set_request_llm() if not success: self.request = None return self._handle_response(success, response, user_message) new_messages, messages = request.prepare_ask_request() success, response_obj, user_message = request.call_llm(messages) files = request_overrides.get("files", []) if files: self.log.debug("Files attached, returning directly") self.request = None response_content = response_obj and response_obj.content or response_obj return self._handle_response(success, response_content, user_message) if success: response_data = ( vars(response_obj) if hasattr(response_obj, "__dict__") else f"{response_obj}" ) self.log.debug(f"LLM Response: {response_data}") response_content, new_messages = request.post_response(response_obj, new_messages) self.message_clipboard = response_content title = request_overrides.get("title") conversation_storage_manager = ConversationStorageManager( self.config, self.tool_manager, self.current_user, self.conversation_id, request.provider, request.model_name, request.preset_name, provider_manager=self.provider_manager, orm=self.orm, ) ( success, response_obj, user_message, ) = conversation_storage_manager.store_conversation_messages( new_messages, response_content, title ) if success: if isinstance(response_obj, Conversation): conversation = response_obj self.conversation_id = conversation.id self.conversation_title = conversation.title tokens = conversation_storage_manager.get_conversation_token_count() self.set_conversation_tokens(tokens) response_obj = response_content if activate_preset: self.log.info(f"Activating preset from request override: {preset_name}") self.activate_preset(preset_name) self.write_log(input, response_obj) self.request = None return self._handle_response(success, response_obj, user_message)
[docs] def ask_stream(self, input: str, request_overrides: dict = None): """ Ask the LLM a question and stream a response. :param input: The input to be sent to the LLM. :type input: str :request_overrides: Overrides for this specific request. :type request_overrides: dict, optional :returns: success, LLM response, message :rtype: tuple """ request_overrides = request_overrides or {} request_overrides["stream"] = True return self.make_request(input, request_overrides)
[docs] def ask(self, input: str, request_overrides: dict = None): """ Ask the LLM a question and return response. :param input: The input to be sent to the LLM. :type input: str :request_overrides: Overrides for this specific request. :type request_overrides: dict, optional :returns: success, LLM response, message :rtype: tuple """ return self.make_request(input, request_overrides)