Custom storage
The Multi-Agent Orchestrator System provides flexibility in how conversation data is stored through its abstract ChatStorage
class. This guide will walk you through the process of creating a custom storage solution by extending this class.
Understanding the ChatStorage Abstract Class
The ChatStorage
class defines the interface for all storage solutions in the system. It includes three main methods and two helper methods:
import { ConversationMessage } from "../types";
/**
 * Base class that every chat storage backend extends.
 *
 * Subclasses implement the three abstract persistence methods; the two
 * protected helpers are available to keep stored histories well-formed.
 */
export abstract class ChatStorage {
  /**
   * Reports whether `newMessage` has the same role as the last message
   * already in `conversation`.
   *
   * @param conversation - Messages stored so far.
   * @param newMessage - Message about to be stored.
   * @returns `true` when the roles match; `false` otherwise, including for
   *   an empty conversation.
   */
  protected isConsecutiveMessage(
    conversation: ConversationMessage[],
    newMessage: ConversationMessage
  ): boolean {
    if (conversation.length === 0) return false;
    const lastMessage = conversation[conversation.length - 1];
    return lastMessage.role === newMessage.role;
  }

  /**
   * Keeps only the most recent messages of a conversation.
   *
   * @param conversation - Messages to trim; the array itself is not modified.
   * @param maxHistorySize - Maximum number of messages to keep. When
   *   omitted, `conversation` is returned untouched. An odd value is rounded
   *   down to the nearest even number.
   * @returns The trailing messages that fit the limit, or an empty array
   *   when the rounded limit is zero or negative.
   */
  protected trimConversation(
    conversation: ConversationMessage[],
    maxHistorySize?: number
  ): ConversationMessage[] {
    if (maxHistorySize === undefined) return conversation;

    // Round an odd limit down so the kept history is made of complete
    // message pairs.
    const adjustedMaxHistorySize =
      maxHistorySize % 2 === 0 ? maxHistorySize : maxHistorySize - 1;

    // `slice(-0)` is `slice(0)` and would return the WHOLE history, so a
    // limit of 0 or 1 (and any negative limit) must be handled explicitly.
    if (adjustedMaxHistorySize <= 0) return [];

    return conversation.slice(-adjustedMaxHistorySize);
  }

  /**
   * Saves a new message to the storage.
   *
   * @param userId - Identifier of the user owning the conversation.
   * @param sessionId - Identifier of the session.
   * @param agentId - Identifier of the agent taking part in the conversation.
   * @param newMessage - Message to store.
   * @param maxHistorySize - Optional cap on the number of messages.
   * @returns A promise resolving to a list of conversation messages.
   */
  abstract saveChatMessage(
    userId: string,
    sessionId: string,
    agentId: string,
    newMessage: ConversationMessage,
    maxHistorySize?: number
  ): Promise<ConversationMessage[]>;

  /**
   * Retrieves the messages of one user/session/agent conversation.
   *
   * @param userId - Identifier of the user owning the conversation.
   * @param sessionId - Identifier of the session.
   * @param agentId - Identifier of the agent taking part in the conversation.
   * @param maxHistorySize - Optional cap on the number of messages.
   * @returns A promise resolving to a list of conversation messages.
   */
  abstract fetchChat(
    userId: string,
    sessionId: string,
    agentId: string,
    maxHistorySize?: number
  ): Promise<ConversationMessage[]>;

  /**
   * Retrieves all messages for a user's session.
   *
   * @param userId - Identifier of the user.
   * @param sessionId - Identifier of the session.
   * @returns A promise resolving to a list of conversation messages.
   */
  abstract fetchAllChats(
    userId: string,
    sessionId: string
  ): Promise<ConversationMessage[]>;
}
from abc import ABC, abstractmethodfrom typing import List, Optionalfrom multi_agent_orchestrator.types import ConversationMessage
class ChatStorage(ABC):
    """Base class that every chat storage backend extends.

    Subclasses implement the three abstract persistence coroutines; the two
    helper methods are available to keep stored histories well-formed.
    """

    def is_consecutive_message(
        self,
        conversation: List[ConversationMessage],
        new_message: ConversationMessage
    ) -> bool:
        """Report whether ``new_message`` has the same role as the last stored message.

        Returns ``False`` for an empty conversation.
        """
        if not conversation:
            return False
        last_message = conversation[-1]
        return last_message.role == new_message.role

    def trim_conversation(
        self,
        conversation: List[ConversationMessage],
        max_history_size: Optional[int] = None
    ) -> List[ConversationMessage]:
        """Keep only the most recent messages of a conversation.

        Args:
            conversation: Messages to trim; the list itself is not modified.
            max_history_size: Maximum number of messages to keep. ``None``
                returns ``conversation`` untouched. An odd value is rounded
                down to the nearest even number.

        Returns:
            The trailing messages that fit the limit, or an empty list when
            the rounded limit is zero or negative.
        """
        if max_history_size is None:
            return conversation

        # Round an odd limit down so the kept history is made of complete
        # message pairs.
        adjusted_max_history_size = (
            max_history_size if max_history_size % 2 == 0 else max_history_size - 1
        )

        # ``conversation[-0:]`` is ``conversation[0:]`` and would return the
        # WHOLE history, so a limit of 0 or 1 (and any negative limit) must
        # be handled explicitly.
        if adjusted_max_history_size <= 0:
            return []

        return conversation[-adjusted_max_history_size:]

    @abstractmethod
    async def save_chat_message(
        self,
        user_id: str,
        session_id: str,
        agent_id: str,
        new_message: ConversationMessage,
        max_history_size: Optional[int] = None
    ) -> List[ConversationMessage]:
        """Save a new message to the storage and return a list of conversation messages."""
        pass

    @abstractmethod
    async def fetch_chat(
        self,
        user_id: str,
        session_id: str,
        agent_id: str,
        max_history_size: Optional[int] = None
    ) -> List[ConversationMessage]:
        """Retrieve the messages of one user/session/agent conversation."""
        pass

    @abstractmethod
    async def fetch_all_chats(
        self,
        user_id: str,
        session_id: str
    ) -> List[ConversationMessage]:
        """Retrieve all messages for a user's session."""
        pass
The ChatStorage
class now includes two helper methods:
- isConsecutiveMessage (TypeScript) / is_consecutive_message (Python): Checks whether a new message has the same role as the last message in the conversation (always false for an empty conversation).
- trimConversation (TypeScript) / trim_conversation (Python): Trims the conversation history to the most recent messages that fit the specified maximum size, rounding an odd maximum down to the nearest even number.
The three main abstract methods are:
- saveChatMessage (TypeScript) / save_chat_message (Python): Saves a new message to the storage.
- fetchChat (TypeScript) / fetch_chat (Python): Retrieves the messages of a specific conversation, identified by user, session, and agent.
- fetchAllChats (TypeScript) / fetch_all_chats (Python): Retrieves all messages for a user’s session.
Creating a Custom Storage Solution
To create a custom storage solution, follow these steps:
- Create a new class that extends ChatStorage.
- Implement all the abstract methods.
- Utilize the helper methods isConsecutiveMessage and trimConversation in your implementation.
- Add any additional methods or properties specific to your storage solution.
Important: When implementing fetchAllChats, prefix the message text with the agent ID in square brackets whenever the role is ASSISTANT, so the response looks like this:
ASSISTANT: [agent-a] Response from agent A
USER: Some user input
ASSISTANT: [agent-b] Response from agent B
Here’s an example of a simple custom storage solution using an in-memory dictionary:
import { ChatStorage, ConversationMessage, ParticipantRole } from 'multi-agent-orchestrator';
/**
 * Minimal `ChatStorage` implementation that keeps every conversation in a
 * plain in-process object. Nothing is persisted: all data is lost when the
 * instance is discarded.
 *
 * Conversations are stored under the key `userId:sessionId:agentId`.
 * Identifiers are not escaped, so user and session ids are expected not to
 * contain `:`.
 */
class SimpleInMemoryStorage extends ChatStorage {
  private storage: { [key: string]: ConversationMessage[] } = {};

  /**
   * Appends `newMessage` to the conversation unless it has the same role as
   * the last stored message, then trims the stored history.
   *
   * @returns The stored conversation after trimming.
   */
  async saveChatMessage(
    userId: string,
    sessionId: string,
    agentId: string,
    newMessage: ConversationMessage,
    maxHistorySize?: number
  ): Promise<ConversationMessage[]> {
    const key = `${userId}:${sessionId}:${agentId}`;
    if (!this.storage[key]) {
      this.storage[key] = [];
    }

    // A message repeating the previous role is dropped rather than stored.
    if (!this.isConsecutiveMessage(this.storage[key], newMessage)) {
      this.storage[key].push(newMessage);
    }

    this.storage[key] = this.trimConversation(this.storage[key], maxHistorySize);
    return this.storage[key];
  }

  /**
   * Returns the stored messages of one user/session/agent conversation,
   * trimmed to `maxHistorySize`; an unknown conversation yields `[]`.
   */
  async fetchChat(
    userId: string,
    sessionId: string,
    agentId: string,
    maxHistorySize?: number
  ): Promise<ConversationMessage[]> {
    const key = `${userId}:${sessionId}:${agentId}`;
    const conversation = this.storage[key] || [];
    return this.trimConversation(conversation, maxHistorySize);
  }

  /**
   * Collects the messages of every agent conversation in a user's session.
   * The first content item of each ASSISTANT message is returned with its
   * text prefixed by `[agentId]`; stored messages are left unmodified.
   */
  async fetchAllChats(
    userId: string,
    sessionId: string
  ): Promise<ConversationMessage[]> {
    // The trailing ':' matters: without it, session "s1" would also match
    // the keys of session "s10".
    const prefix = `${userId}:${sessionId}:`;
    const allMessages: ConversationMessage[] = [];

    for (const [key, conversation] of Object.entries(this.storage)) {
      if (!key.startsWith(prefix)) continue;

      // Everything after the prefix is the agent id, even if it contains ':'.
      const agentId = key.slice(prefix.length);

      for (const message of conversation) {
        // Copy the content array so the stored message is never altered.
        const newContent = message.content ? [...message.content] : [];
        if (newContent.length > 0 && message.role === ParticipantRole.ASSISTANT) {
          newContent[0] = { text: `[${agentId}] ${newContent[0].text}` };
        }
        allMessages.push({ ...message, content: newContent });
      }
    }

    return allMessages;
  }
}
from typing import List, Optional, Dictfrom multi_agent_orchestrator.storage import ChatStoragefrom multi_agent_orchestrator.types import ConversationMessage
class SimpleInMemoryStorage(ChatStorage):
    """Minimal ChatStorage implementation backed by an in-process dictionary.

    Nothing is persisted: all data is lost when the instance is discarded.
    Conversations are stored under the key ``user_id:session_id:agent_id``.
    Identifiers are not escaped, so user and session ids are expected not to
    contain ``:``.
    """

    def __init__(self):
        # Maps "user_id:session_id:agent_id" to that conversation's messages.
        self.storage: Dict[str, List[ConversationMessage]] = {}

    async def save_chat_message(
        self,
        user_id: str,
        session_id: str,
        agent_id: str,
        new_message: ConversationMessage,
        max_history_size: Optional[int] = None
    ) -> List[ConversationMessage]:
        """Store ``new_message`` and return the stored conversation after trimming.

        A message with the same role as the last stored message is dropped.
        """
        key = f"{user_id}:{session_id}:{agent_id}"
        if key not in self.storage:
            self.storage[key] = []

        if not self.is_consecutive_message(self.storage[key], new_message):
            self.storage[key].append(new_message)

        self.storage[key] = self.trim_conversation(self.storage[key], max_history_size)
        return self.storage[key]

    async def fetch_chat(
        self,
        user_id: str,
        session_id: str,
        agent_id: str,
        max_history_size: Optional[int] = None
    ) -> List[ConversationMessage]:
        """Return one conversation trimmed to ``max_history_size``; ``[]`` if unknown."""
        key = f"{user_id}:{session_id}:{agent_id}"
        conversation = self.storage.get(key, [])
        return self.trim_conversation(conversation, max_history_size)

    async def fetch_all_chats(
        self,
        user_id: str,
        session_id: str
    ) -> List[ConversationMessage]:
        """Collect the messages of every agent conversation in a user's session.

        The first content item of each ASSISTANT message is returned with its
        text prefixed by ``[agent_id]``; stored messages are left unmodified.
        """
        # Imported here because the module-level imports of this example do
        # not bring ParticipantRole into scope.
        from multi_agent_orchestrator.types import ParticipantRole

        # The trailing ':' matters: without it, session "s1" would also match
        # the keys of session "s10".
        prefix = f"{user_id}:{session_id}:"
        all_messages = []

        for key, messages in self.storage.items():
            if not key.startswith(prefix):
                continue

            # Everything after the prefix is the agent id, even if it contains ':'.
            agent_id = key[len(prefix):]

            for message in messages:
                # Copy the list: assigning into the stored list would prepend
                # another "[agent_id]" tag to the saved message on every call.
                new_content = list(message.content) if message.content else []
                if len(new_content) > 0 and message.role == ParticipantRole.ASSISTANT:
                    new_content[0] = {'text': f"[{agent_id}] {new_content[0]['text']}"}
                all_messages.append(
                    ConversationMessage(
                        role=message.role,
                        content=new_content
                    )
                )

        # Messages without a `timestamp` attribute sort with key 0; the sort
        # is stable, so their relative order is kept.
        return sorted(all_messages, key=lambda m: getattr(m, 'timestamp', 0))
Using Your Custom Storage
To use your custom storage with the Multi-Agent Orchestrator:
// Create the custom storage backend first...
const customStorage = new SimpleInMemoryStorage();

// ...then hand it to the orchestrator through its `storage` option.
const orchestrator = new MultiAgentOrchestrator({
  storage: customStorage,
});
from multi_agent_orchestrator.orchestrator import MultiAgentOrchestratorfrom your_custom_storage_module import SimpleInMemoryStorage
# Create the custom storage backend first...
custom_storage = SimpleInMemoryStorage()

# ...then hand it to the orchestrator through its `storage` argument.
orchestrator = MultiAgentOrchestrator(
    storage=custom_storage,
)
By extending the ChatStorage
class, you can create custom storage solutions tailored to your specific needs, whether it’s integrating with a particular database system, implementing caching mechanisms, or adapting to unique architectural requirements.
Remember to consider factors such as scalability, persistence, and error handling when implementing your custom storage solution for production use. The helper methods isConsecutiveMessage
and trimConversation
can be particularly useful for managing conversation history effectively.