Source code for lwe.backends.api.conversation_storage_manager

import threading

from lwe.core.logger import Logger

from lwe.core import constants
import lwe.core.util as util
from lwe.core.tool_cache import ToolCache
from lwe.core.token_manager import TokenManager

from lwe.backends.api.orm import Orm
from lwe.backends.api.conversation import ConversationManager
from lwe.backends.api.message import MessageManager
from lwe.backends.api.request import ApiRequest


[docs] class ConversationStorageManager: """Manage conversation storage.""" def __init__( self, config, tool_manager, current_user=None, conversation_id=None, provider=None, model_name=None, preset_name=None, provider_manager=None, orm=None, ): self.config = config self.log = Logger(self.__class__.__name__, self.config) self.tool_manager = tool_manager self.current_user = current_user self.conversation_id = conversation_id self.provider = provider self.model_name = model_name or constants.API_BACKEND_DEFAULT_MODEL self.preset_name = preset_name or "" self.provider_manager = provider_manager self.tool_cache = ToolCache(self.config, self.tool_manager) self.token_manager = TokenManager( self.config, self.provider, self.model_name, self.tool_cache ) self.orm = orm or Orm(self.config) self.conversation = ConversationManager(config, self.orm) self.message = MessageManager(config, self.orm)
[docs] def store_conversation_messages(self, new_messages, response_content=None, title=None): """ Store conversation messages. :param new_messages: New messages :type new_messages: list :param response_content: Response content :type response_content: str :param title: Title :type title: str, optional :returns: success, conversation or response_content, message :rtype: tuple """ self.log.debug( f"Storing conversation messages for conversation: {self.conversation_id or 'new'}" ) if self.current_user: success, response, user_message = self.add_new_messages_to_conversation( new_messages, title ) if not success: return success, response, user_message conversation, last_message = response if conversation.title: self.log.debug( f"Conversation {conversation.id} already has title: {conversation.title}" ) else: self.gen_title(conversation) return True, conversation, "Conversation updated with new messages" else: return True, response_content, "No current user, conversation not saved"
[docs] def create_new_conversation_if_needed(self, title=None): """ Create new conversation if it doesn't exist. :param title: Conversation title, defaults to None :type title: str, optional :returns: Conversation object :rtype: Conversation """ if self.conversation_id: success, conversation, message = self.conversation.get_conversation( self.conversation_id ) if not success: raise Exception(message) else: success, conversation, message = self.conversation.add_conversation( self.current_user.id, title=title ) self.conversation_id = conversation.id if not success: raise Exception(message) return conversation
[docs] def add_new_messages_to_conversation(self, new_messages, title=None): """Add new messages to a conversation. :param new_messages: New messages :type new_messages: list :param title: Conversation title, defaults to None :type title: str, optional :returns: Conversation, last message :rtype: tuple """ conversation = self.create_new_conversation_if_needed(title) last_message = None for m in new_messages: success, last_message, user_message = self.add_message( m["role"], m["message"], m["message_type"], m["message_metadata"] ) if not success: raise Exception(user_message) return ( True, (conversation, last_message), f"Added new messages to conversation {conversation.id}", )
[docs] def add_message(self, role, message, message_type, metadata): """ Add a new message to a conversation. :param role: Message role :type role: str :param message: Message content :type message: str :param message_type: Message type :type message_type: str :param metadata: Message metadata :type metadata: dict :returns: success, added message, user message :rtype: tuple """ return self.message.add_message( self.conversation_id, role, message, message_type, metadata, self.provider.name, self.model_name, self.preset_name, )
[docs] def get_title_provider_llm(self): """ Get the title provider and LLM. :returns: provider, llm :rtype: tuple """ title_provider_name = self.config.get("backend_options.title_generation.provider") if title_provider_name: provider = self.provider_manager.get_provider_from_name(title_provider_name) if not provider: raise RuntimeError(f"Failed to load title provider: {title_provider_name}") customizations = { "temperature": 0, } title_provider_model = self.config.get("backend_options.title_generation.model") if title_provider_model: customizations[provider.model_property_name] = title_provider_model llm = provider.make_llm(customizations=customizations) else: provider = self.provider_manager.get_provider_from_name("chat_openai") llm = provider.make_llm( customizations={ "model_name": constants.API_BACKEND_DEFAULT_MODEL, "temperature": 0, }, use_defaults=True, ) return provider, llm
[docs] def gen_title_thread(self, conversation_id): """ Generate the title for a conversation in a separate thread. :param conversation_id: Conversation ID :type conversation_id: int """ self.log.info(f"Generating title for conversation {conversation_id}") # NOTE: This might need to be smarter in the future, but for now # it should be reasonable to assume that the second record is the # first user message we need for generating the title. message_manager = MessageManager(self.config, self.orm) conversation_manager = ConversationManager(self.config, self.orm) success, messages, user_message = message_manager.get_messages(conversation_id, limit=2) if not success: self.log.warning(f"Failed to generate title for conversation: {user_message}") return user_content = messages[1]["message"][: constants.TITLE_GENERATION_MAX_CHARACTERS] new_messages = [ self.message.build_message("system", constants.DEFAULT_TITLE_GENERATION_SYSTEM_PROMPT), self.message.build_message( "user", "%s: %s" % (constants.DEFAULT_TITLE_GENERATION_USER_PROMPT, user_content), ), ] provider, llm = self.get_title_provider_llm() new_messages = util.transform_messages_to_chat_messages(new_messages) new_messages = [provider.convert_dict_to_message(m) for m in new_messages] try: self.log.debug( f"Title generation LLM provider: {provider.name}, model: {getattr(llm, provider.model_property_name, 'N/A')}" ) result = llm.invoke(new_messages) provider_non_streaming_method = getattr(provider, "handle_non_streaming_response", None) if provider_non_streaming_method: result = provider_non_streaming_method(result) request = ApiRequest(orm=self.orm, config=self.config) message, _tool_calls = request.extract_message_content(result) title = message["message"].replace("\n", ", ").strip().strip("'\"") self.log.info(f"Title generated for conversation {conversation_id}: {title}") success, conversation, user_message = conversation_manager.edit_conversation_title( conversation_id, title ) if success: self.log.debug(f"Title saved for conversation {conversation_id}") except ValueError as e: self.log.warning(f"Failed to generate title for conversation: {str(e)}")
[docs] def gen_title(self, conversation): """ Generate the title for a conversation. :param conversation: Conversation :type conversation: Conversation """ conversation_id = conversation.id database = self.config.get("database") if database.startswith("sqlite") and ":memory:" in database: # Special case for in memory SQLite, as it cannot access the # in memory database from another thread. self.gen_title_thread(conversation_id) else: thread = threading.Thread(target=self.gen_title_thread, args=(conversation_id,)) thread.start()
[docs] def get_conversation_token_count(self): """Get token count for conversation. :returns: Number of tokens :rtype: int """ success, old_messages, user_message = self.message.get_messages(self.conversation_id) if not success: raise Exception(user_message) tokens = self.token_manager.get_num_tokens_from_messages(old_messages) return tokens