diff --git a/comps/chathistory/README.md b/comps/chathistory/README.md index 56a575dcd0..84d9adb431 100644 --- a/comps/chathistory/README.md +++ b/comps/chathistory/README.md @@ -6,8 +6,6 @@ It can be integrated into application by making HTTP requests to the provided AP ![Flow Chart](./assets/img/chathistory_flow.png) ---- - ## 🛠️ Features - **Store Chat Conversations**: Save chat messages user information, and metadata associated with each conversation. @@ -15,69 +13,11 @@ It can be integrated into application by making HTTP requests to the provided AP - **Update Chat Conversations**: Modify existing chat conversations by adding new messages or updating existing ones. - **Delete Chat Conversations**: Remove chat conversations record from database. ---- - -## 🤖 MCP (Model Context Protocol) Support - -The Chat History microservice supports MCP integration, allowing AI agents to discover and utilize chat history management capabilities as tools. - -### MCP Configuration - -#### Environment Variables - -- `ENABLE_MCP`: Set to `true`, `1`, or `yes` to enable MCP support (default: `false`) - -#### Docker Compose - -```yaml -services: - chathistory-mongo: - environment: - ENABLE_MCP: true -``` - -#### Kubernetes - -```yaml -chathistory: - ENABLE_MCP: true -``` - -### MCP Tools Available - -When MCP is enabled, the following tools are available for AI agents: - -1. **create_documents** - Create or update chat conversation history -2. **get_documents** - Retrieve chat conversation history -3. **delete_documents** - Delete chat conversation history - -### Usage with AI Agents - -```python -from comps.cores.mcp import OpeaMCPToolsManager - -# Initialize MCP tools manager -tools_manager = OpeaMCPToolsManager() - -# Add chathistory service -tools_manager.add_service("http://chathistory-service:6012") - -# AI agents can now discover and use chathistory tools -tools = await tools_manager.get_available_tools() -``` - -### MCP Endpoint - -When MCP is enabled, the service exposes an additional SSE endpoint: - -- `/sse` - Server-Sent Events endpoint for MCP communication - ---- - -## ⚙️ Implementation - -The Chat History microservice able to support various database backends for storing the chat conversations. +## ⚙️ Deployment Options -### Chat History with MongoDB +To get detailed, step-by-step instructions on deploying the `chathistory` microservice, you should consult the deployment guide. This guide will walk you through all the necessary steps, from building the Docker images to configuring your environment and running the service. -For more detail, please refer to this [README](src/README.md) +| Platform | Deployment Method | Database | Link | +| -------- | ----------------- | -------- | --------------------------------------------------------- | +| CPU | Docker | MongoDB | [Deployment Guide](./deployment/docker_compose/README.md) | +| CPU | Docker Compose | MongoDB | [Deployment Guide](./deployment/docker_compose/README.md) | diff --git a/comps/chathistory/src/README.md b/comps/chathistory/deployment/docker_compose/README.md similarity index 98% rename from comps/chathistory/src/README.md rename to comps/chathistory/deployment/docker_compose/README.md index 3cdf5bf270..7e08c08aff 100644 --- a/comps/chathistory/src/README.md +++ b/comps/chathistory/deployment/docker_compose/README.md @@ -9,6 +9,7 @@ This README provides setup guides and all the necessary information about the Ch ```bash export http_proxy=${your_http_proxy} export https_proxy=${your_http_proxy} +export OPEA_STORE_NAME="mongodb" export MONGO_HOST=${MONGO_HOST} export MONGO_PORT=27017 export DB_NAME=${DB_NAME} diff --git a/comps/chathistory/deployment/docker_compose/compose.yaml b/comps/chathistory/deployment/docker_compose/compose.yaml index 4751bc233f..a512ee6c54 100644 --- a/comps/chathistory/deployment/docker_compose/compose.yaml +++ b/comps/chathistory/deployment/docker_compose/compose.yaml @@ -24,6 +24,7 @@ services: http_proxy: ${http_proxy} no_proxy: ${no_proxy} https_proxy: ${https_proxy} + OPEA_STORE_NAME: ${OPEA_STORE_NAME:-mongodb} MONGO_HOST: ${MONGO_HOST} MONGO_PORT: ${MONGO_PORT} COLLECTION_NAME: ${COLLECTION_NAME} diff --git a/comps/chathistory/src/document_store.py b/comps/chathistory/src/document_store.py deleted file mode 100644 index ae2501cfea..0000000000 --- a/comps/chathistory/src/document_store.py +++ /dev/null @@ -1,159 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - - -import bson.errors as BsonError -from bson.objectid import ObjectId - -from comps.chathistory.src.integrations.mongo.config import COLLECTION_NAME -from comps.chathistory.src.integrations.mongo.mongo_conn import MongoClient - - -class DocumentStore: - - def __init__( - self, - user: str, - ): - self.user = user - - def initialize_storage(self) -> None: - self.db_client = MongoClient.get_db_client() - self.collection = self.db_client[COLLECTION_NAME] - - async def save_document(self, document): - """Stores a new document into the storage. - - Args: - document: The document to be stored. - - Returns: - str: The ID of the inserted document. - - Raises: - Exception: If an error occurs while storing the document. - """ - try: - inserted_conv = await self.collection.insert_one( - document.model_dump(by_alias=True, mode="json", exclude={"id"}) - ) - document_id = str(inserted_conv.inserted_id) - return document_id - - except Exception as e: - print(e) - raise Exception(e) - - async def update_document(self, document_id, updated_data, first_query) -> str: - """Updates a document in the collection with the given document_id. - - Args: - document_id (str): The ID of the document to update. - updated_data (object): The updated data to be set in the document. - first_query (object): The first query to be set in the document. - - Returns: - bool: True if the document was successfully updated, False otherwise. - - Raises: - KeyError: If an invalid document_id is provided. - Exception: If an error occurs during the update process. - """ - try: - _id = ObjectId(document_id) - update_result = await self.collection.update_one( - {"_id": _id, "data.user": self.user}, - {"$set": {"data": updated_data.model_dump(by_alias=True, mode="json"), "first_query": first_query}}, - ) - if update_result.modified_count == 1: - return "Updated document : {}".format(document_id) - else: - raise Exception("Not able to Update the Document") - - except BsonError.InvalidId as e: - print(e) - raise KeyError(e) - except Exception as e: - print(e) - raise Exception(e) - - async def get_all_documents_of_user(self) -> list[dict]: - """Retrieves all documents of a specific user from the collection. - - Returns: - A list of dictionaries representing the conversation documents. - Raises: - Exception: If there is an error while retrieving the documents. - """ - conversation_list: list = [] - try: - cursor = self.collection.find({"data.user": self.user}, {"data": 0}) - - async for document in cursor: - document["id"] = str(document["_id"]) - del document["_id"] - conversation_list.append(document) - return conversation_list - - except Exception as e: - print(e) - raise Exception(e) - - async def get_user_documents_by_id(self, document_id) -> dict | None: - """Retrieves a user document from the collection based on the given document ID. - - Args: - document_id (str): The ID of the document to retrieve. - - Returns: - dict | None: The user document if found, None otherwise. - """ - try: - _id = ObjectId(document_id) - response: dict | None = await self.collection.find_one({"_id": _id, "data.user": self.user}) - if response: - del response["_id"] - return response["data"] - return None - - except BsonError.InvalidId as e: - print(e) - raise KeyError(e) - - except Exception as e: - print(e) - raise Exception(e) - - async def delete_document(self, document_id) -> str: - """Deletes a document from the collection based on the provided document ID. - - Args: - document_id (str): The ID of the document to be deleted. - - Returns: - bool: True if the document is successfully deleted, False otherwise. - - Raises: - KeyError: If the provided document ID is invalid. - Exception: If an error occurs during the deletion process. - """ - - try: - _id = ObjectId(document_id) - delete_result = await self.collection.delete_one({"_id": _id, "data.user": self.user}) - - delete_count = delete_result.deleted_count - print(f"Deleted {delete_count} documents!") - - if delete_count == 1: - return "Deleted document : {}".format(document_id) - else: - raise Exception("Not able to delete the Document") - - except BsonError.InvalidId as e: - print(e) - raise KeyError(e) - - except Exception as e: - print(e) - raise Exception(e) diff --git a/comps/chathistory/src/integrations/data_store.py b/comps/chathistory/src/integrations/data_store.py new file mode 100644 index 0000000000..4acf505730 --- /dev/null +++ b/comps/chathistory/src/integrations/data_store.py @@ -0,0 +1,136 @@ +# Copyright (C) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from typing import Optional + +from fastapi import HTTPException +from pydantic import BaseModel + +from comps.cores.proto.api_protocol import ChatCompletionRequest +from comps.cores.storages.models import ChatId, ChatMessage +from comps.cores.storages.stores import column_to_id, get_store, id_to_column + + +class ChatMessageDto(BaseModel): + data: ChatCompletionRequest + first_query: Optional[str] = None + doc_id: Optional[str] = None + user: Optional[str] = None + + +def _prepersist(document: ChatMessage) -> dict: + """Converts a ChatMessage object to a dictionary suitable for persistence. + + Args: + document (ChatMessage): The ChatMessage object to be converted. + + Returns: + dict: A dictionary representation of the ChatMessage, ready for persistence. + """ + data_dict = document.model_dump(by_alias=True, mode="json") + data_dict = column_to_id("id", data_dict) + return data_dict + + +def _post_getby_id(rs: dict) -> dict: + """Post-processes a document retrieved by ID from the store. + + Args: + rs (dict): The raw document dictionary from the store. + + Returns: + dict: The processed document data, or None if the document doesn't exist. + """ + rs = id_to_column("id", rs) + return rs.get("data") if rs else None + + +def _post_getby_user(rss: list) -> list: + """Post-processes a list of documents retrieved by user from the store. + + Args: + rss (list): A list of raw document dictionaries from the store. + + Returns: + list: A list of processed documents with the 'data' field removed. + """ + for rs in rss: + rs = id_to_column("id", rs) + rs.pop("data") + return rss + + +def _check_user_info(document: ChatMessage | ChatId): + """Checks if the user information is provided in the document. + + Args: + document (ChatMessage|ChatId): The document to be checked. + Raises: + HTTPException: If the user information is missing. + """ + user = document.data.user if isinstance(document, ChatMessage) else document.user + if user is None: + raise HTTPException(status_code=400, detail="Please provide the user information") + + +async def save_or_update(document: ChatMessage): + """Saves a new chat message or updates an existing one in the data store. + + Args: + document (ChatMessage): The ChatMessage object to be saved or updated. + If the document has an ID, it will be updated; + otherwise, a new document will be created. + + Returns: + The result of the save or update operation from the store. + """ + _check_user_info(document) + store = get_store(document.data.user) + if document.id: + return await store.aupdate_document(_prepersist(document)) + else: + return await store.asave_document(_prepersist(document)) + + +async def get(document: ChatId): + """Retrieves chat messages from the data store. + + Args: + document (ChatId): The ChatId object containing user information and + optionally a document ID. If document.id is None, + retrieves all documents for the user; otherwise, + retrieves the specific document by ID. + + Returns: + Either a list of all documents for the user (if document.id is None) or + a specific document (if document.id is provided). + """ + _check_user_info(document) + store = get_store(document.user) + if document.id is None: + rss = await store.asearch(key="data.user", value=document.user) + return _post_getby_user(rss) + else: + rs = await store.aget_document_by_id(document.id) + return _post_getby_id(rs) + + +async def delete(document: ChatId): + """Deletes a specific chat message from the data store. + + Args: + document (ChatId): The ChatId object containing user information and document ID. + The document ID must be provided for deletion. + + Returns: + The result of the delete operation from the store. + + Raises: + Exception: If the document ID is not provided. + """ + _check_user_info(document) + store = get_store(document.user) + if document.id is None: + raise Exception("Document id is required.") + else: + return await store.adelete_document(document.id) diff --git a/comps/chathistory/src/integrations/mongo/__init__.py b/comps/chathistory/src/integrations/mongo/__init__.py deleted file mode 100644 index 4057dc0163..0000000000 --- a/comps/chathistory/src/integrations/mongo/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# Copyright (C) 2025 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/chathistory/src/integrations/mongo/config.py b/comps/chathistory/src/integrations/mongo/config.py deleted file mode 100644 index b078941a80..0000000000 --- a/comps/chathistory/src/integrations/mongo/config.py +++ /dev/null @@ -1,10 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import os - -# MONGO configuration -MONGO_HOST = os.getenv("MONGO_HOST", "localhost") -MONGO_PORT = os.getenv("MONGO_PORT", 27017) -DB_NAME = os.getenv("DB_NAME", "OPEA") -COLLECTION_NAME = os.getenv("COLLECTION_NAME", "ChatHistory") diff --git a/comps/chathistory/src/integrations/mongo/mongo_conn.py b/comps/chathistory/src/integrations/mongo/mongo_conn.py deleted file mode 100644 index 952aac95c9..0000000000 --- a/comps/chathistory/src/integrations/mongo/mongo_conn.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from typing import Any - -import motor.motor_asyncio as motor - -from comps.chathistory.src.integrations.mongo.config import DB_NAME, MONGO_HOST, MONGO_PORT - - -class MongoClient: - conn_url = f"mongodb://{MONGO_HOST}:{MONGO_PORT}/" - - @staticmethod - def get_db_client() -> Any: - try: - client = motor.AsyncIOMotorClient(MongoClient.conn_url) - db = client[DB_NAME] - return db - - except Exception as e: - print(e) - raise Exception(e) diff --git a/comps/chathistory/src/opea_chathistory_microservice.py b/comps/chathistory/src/opea_chathistory_microservice.py index 5ef98e6b67..ade4b9a7fa 100644 --- a/comps/chathistory/src/opea_chathistory_microservice.py +++ b/comps/chathistory/src/opea_chathistory_microservice.py @@ -1,33 +1,21 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 import os -from typing import Optional from fastapi import HTTPException -from pydantic import BaseModel from comps import CustomLogger -from comps.chathistory.src.document_store import DocumentStore +from comps.chathistory.src.integrations.data_store import delete, get, save_or_update from comps.cores.mega.constants import MCPFuncType from comps.cores.mega.micro_service import opea_microservices, register_microservice -from comps.cores.proto.api_protocol import ChatCompletionRequest +from comps.cores.storages.models import ChatId, ChatMessage +from comps.cores.storages.stores import get_store_name -logger = CustomLogger("chathistory_mongo") +logger = CustomLogger(f"chathistory_{get_store_name()}") logflag = os.getenv("LOGFLAG", False) enable_mcp = os.getenv("ENABLE_MCP", "").strip().lower() in {"true", "1", "yes"} -class ChatMessage(BaseModel): - data: ChatCompletionRequest - first_query: Optional[str] = None - id: Optional[str] = None - - -class ChatId(BaseModel): - user: str - id: Optional[str] = None - - def get_first_string(value): if isinstance(value, str): return value @@ -63,16 +51,9 @@ async def create_documents(document: ChatMessage): if logflag: logger.info(document) try: - if document.data.user is None: - raise HTTPException(status_code=500, detail="Please provide the user information") - store = DocumentStore(document.data.user) - store.initialize_storage() if document.first_query is None: document.first_query = get_first_string(document.data.messages) - if document.id: - res = await store.update_document(document.id, document.data, document.first_query) - else: - res = await store.save_document(document) + res = await save_or_update(document) if logflag: logger.info(res) return res @@ -104,12 +85,7 @@ async def get_documents(document: ChatId): if logflag: logger.info(document) try: - store = DocumentStore(document.user) - store.initialize_storage() - if document.id is None: - res = await store.get_all_documents_of_user() - else: - res = await store.get_user_documents_by_id(document.id) + res = await get(document) if logflag: logger.info(res) return res @@ -141,12 +117,7 @@ async def delete_documents(document: ChatId): if logflag: logger.info(document) try: - store = DocumentStore(document.user) - store.initialize_storage() - if document.id is None: - raise Exception("Document id is required.") - else: - res = await store.delete_document(document.id) + res = await delete(document) if logflag: logger.info(res) return res diff --git a/comps/chathistory/src/requirements.in b/comps/chathistory/src/requirements.in index f6e39dfdd5..e69de29bb2 100644 --- a/comps/chathistory/src/requirements.in +++ b/comps/chathistory/src/requirements.in @@ -1 +0,0 @@ -motor diff --git a/comps/chathistory/src/requirements.txt b/comps/chathistory/src/requirements.txt index 8881b0dd31..909c08954e 100644 --- a/comps/chathistory/src/requirements.txt +++ b/comps/chathistory/src/requirements.txt @@ -1,8 +1,8 @@ # This file was autogenerated by uv via the following command: # uv pip compile ./comps/chathistory/src/requirements.in --universal -o ./comps/chathistory/src/requirements.txt -dnspython==2.7.0 - # via pymongo -motor==3.7.1 - # via -r ./comps/chathistory/src/requirements.in -pymongo==4.13.2 - # via motor +# dnspython==2.7.0 +# # via pymongo +# motor==3.7.1 +# # via -r ./comps/chathistory/src/requirements.in +# pymongo==4.13.2 +# # via motor diff --git a/comps/cores/common/storage.py b/comps/cores/common/storage.py index dbe2a409a6..ff4664dfd8 100644 --- a/comps/cores/common/storage.py +++ b/comps/cores/common/storage.py @@ -246,3 +246,15 @@ async def asearch(self, key: str, value: Any, search_type: str = "exact", **kwar list[dict]: A list of documents matching the search criteria. """ raise NotImplementedError("asearch_by_keyword method must be implemented by subclasses.") + + async def asearch_by_keyword(self, keyword: str, max_results: int = 5, **kwargs) -> list[dict]: + """Asynchronously search for documents in the store based on a specific keyword. + + Args: + keyword (str): The keyword to search for. + **kwargs: Additional arguments for the search query. + + Returns: + list[dict]: A list of documents matching the search criteria. + """ + raise NotImplementedError("asearch_by_keyword method must be implemented by subclasses.") diff --git a/comps/cores/storages/developer.md b/comps/cores/storages/developer.md index 3dade36756..6443deb007 100644 --- a/comps/cores/storages/developer.md +++ b/comps/cores/storages/developer.md @@ -17,9 +17,9 @@ To add a new storage backend, follow these steps: ## Example ```python -from comps.cores.storage import opea_store +from comps.cores.common.storage import OpeaStore -store = opea_store(name="arangodb") # "redis", "mongodb", etc. +store = OpeaStore(name="arangodb") # "redis", "mongodb", etc. result = store.save_document({"foo": "bar"}) diff --git a/comps/cores/storages/mongodb.py b/comps/cores/storages/mongodb.py index 652b8e8a3e..3c281c9562 100644 --- a/comps/cores/storages/mongodb.py +++ b/comps/cores/storages/mongodb.py @@ -27,6 +27,7 @@ def _initialize_db(self) -> None: MONGO_PORT = self.config.get("MONGO_PORT", 27017) DB_NAME = self.config.get("DB_NAME", "OPEA") COLLECTION_NAME = self.config.get("COLLECTION_NAME", "default") + self.collection_name = COLLECTION_NAME conn_url = f"mongodb://{MONGO_HOST}:{MONGO_PORT}/" try: @@ -46,7 +47,7 @@ def health_check(self) -> bool: bool: True if the connection is healthy, False otherwise. """ try: - self.collection.count_documents() + self.collection.count_documents({}, limit=1) logger.info("MongoDB Health check succeed!") return True except Exception as e: @@ -64,11 +65,12 @@ async def asave_document(self, doc: dict, **kwargs) -> bool | dict: bool | dict: The result of the save operation. """ try: - inserted_data = await self.collection.insert_one( - doc.model_dump(by_alias=True, mode="json", exclude={"doc_id"}) - ) + doc.pop("_id", None) + inserted_data = await self.collection.insert_one(doc) + doc_id = str(inserted_data.inserted_id) - logger.info(f"Inserted document: {doc_id}") + logger.debug(f"Inserted document: {doc_id}") + return doc_id except Exception as e: @@ -87,10 +89,12 @@ async def asave_documents(self, docs: list[dict], **kwargs) -> bool | list: """ try: inserted_data = await self.collection.insert_many( - doc.model_dump(by_alias=True, mode="json", exclude={"doc_id"}) for doc in docs + [{key: value for key, value in doc.items() if key != "_id"} for doc in docs] ) + doc_ids = str(inserted_data.inserted_ids) - logger.info(f"Inserted documents: {doc_ids}") + logger.debug(f"Inserted documents: {doc_ids}") + return doc_ids except Exception as e: @@ -108,22 +112,15 @@ async def aupdate_document(self, doc: dict, **kwargs) -> bool | dict: bool | dict: The result of the update operation. """ try: - doc_id = doc.get("doc_id", None) + doc_id = doc.pop("_id", None) _id = ObjectId(doc_id) - first_query = doc.get("first_query", None) - data = doc.get("data", None) - if first_query: - data = {"data": data.model_dump(by_alias=True, mode="json"), "first_query": first_query} - else: - data = {"data": data.model_dump(by_alias=True, mode="json")} - updated_result = await self.collection.update_one( - {"_id": _id, "data.user": self.user}, - {"$set": data}, + {"_id": _id}, + {"$set": doc}, ) if updated_result.modified_count == 1: - logger.info(f"Updated document: {doc_id}") + logger.debug(f"Updated document: {doc_id}") return True else: raise Exception("Not able to update the data.") @@ -166,11 +163,10 @@ async def aget_document_by_id(self, id: str, **kwargs) -> dict | None: """ try: _id = ObjectId(id) - response: dict | None = await self.collection.find_one({"_id": _id, "chat_data.user": self.user}) + response: dict | None = await self.collection.find_one({"_id": _id}) if response: - del response["_id"] - logger.info(f"Retrieved document: {id}") - return response["data"] + logger.debug(f"Retrieved document: {id}") + return response return None except BsonError.InvalidId as e: @@ -195,11 +191,10 @@ async def aget_documents_by_ids(self, ids: list[str], **kwargs) -> list[dict]: responses = [] for id in ids: _id = ObjectId(id) - response: dict | None = await self.collection.find_one({"_id": _id, "chat_data.user": self.user}) + response: dict | None = await self.collection.find_one({"_id": _id}) if response: - del response["_id"] - responses.append(response["data"]) - logger.info(f"Retrieved documents: {response}") + responses.append(response) + logger.debug(f"Retrieved documents: {response}") return responses except BsonError.InvalidId as e: @@ -210,36 +205,6 @@ async def aget_documents_by_ids(self, ids: list[str], **kwargs) -> list[dict]: logger.info(e) raise Exception(e) - async def aget_documents_by_user(self, user: str = None, **kwargs) -> list[dict] | None: - """Asynchronously retrieve all documents for a specific user. - - Args: - user (str): The unique identifier for the user. - **kwargs: Additional arguments for retrieving the documents. - - Returns: - list[dict] | None: List of dict of feedback data of the user, None otherwise. - - Raises: - Exception: If there is an error while retrieving data. - """ - try: - responses = [] - if user is None: - user = self.user - cursor = await self.collection.find({"user": user}, {"data": 0}) - - async for document in cursor: - document["doc_id"] = str(document["_id"]) - del document["_id"] - responses.append(document) - logger.info(f"Retrieved documents: {responses}") - return responses - - except Exception as e: - logger.info(e) - raise Exception(e) - async def adelete_document(self, id: str, **kwargs) -> bool: """Asynchronously delete a single document from the store. @@ -256,7 +221,7 @@ async def adelete_document(self, id: str, **kwargs) -> bool: """ try: _id = ObjectId(id) - result = await self.collection.delete_one({"_id": _id, "chat_data.user": self.user}) + result = await self.collection.delete_one({"_id": _id}) delete_count = result.deleted_count logger.info(f"Deleted {delete_count} documents!") @@ -286,7 +251,7 @@ async def adelete_documents(self, ids: list[str], **kwargs) -> bool: Exception: If any errors occurs during delete process. """ try: - result = await self.collection.delete_many({"_id": {"$in": ids}, "chat_data.user": self.user}) + result = await self.collection.delete_many({"_id": {"$in": ids}}) delete_count = result.deleted_count logger.info(f"Deleted {delete_count} documents!") @@ -314,24 +279,47 @@ async def asearch(self, key: str, value: Any = None, search_type: str = "exact", list[dict]: A list of matching documents. """ try: - # Create a text index if not already created - self.collection.create_index([("$**", "text")]) - # Perform text search - results = await self.collection.find({"$text": {"$search": key}}, {"score": {"$meta": "textScore"}}) - sorted_results = results.sort([("score", {"$meta": "textScore"})]) + responses = [] + if search_type == "exact": + query = {key: value} + elif search_type == "contains": + query = {key: {"$regex": value, "$options": "i"}} + else: + raise ValueError("Unsupported search type. Use 'exact' or 'contains'.") + + rss = self.collection.find(query) + if rss: + async for rs in rss: + responses.append(rs) + + logger.debug(f"Search results: {responses}") + return responses + + except Exception as e: + logger.exception("Failed to search.") + raise Exception(e) - # Return a list of top 5 most relevant data - relevant_data = await sorted_results.to_list(length=5) + async def asearch_by_keyword(self, keyword: str, max_results: int = 5, **kwargs) -> list[dict]: + """Asynchronously search for documents based on a keyword. - # Serialize data and return - serialized_data = [ - {"id": str(doc["_id"]), "data": doc["data"], "user": doc["user"], "score": doc["score"]} - for doc in relevant_data - ] + Args: + keyword (str): The keyword to search for. + **kwargs: Additional arguments for the search. - logger.info(f"Search results: {serialized_data}") - return serialized_data + Returns: + list[dict]: A list of matching documents. + """ + try: + # Create a text index if not already created + await self.collection.create_index([("$**", "text")]) + + # Perform text search + return ( + await self.collection.find({"$text": {"$search": keyword}}, {"score": {"$meta": "textScore"}}) + .sort([("score", {"$meta": "textScore"})]) + .to_list(length=max_results) + ) except Exception as e: - logger.info(e) + logger.exception("Failed to search by keyword.") raise Exception(e) diff --git a/comps/cores/storages/stores.py b/comps/cores/storages/stores.py new file mode 100644 index 0000000000..0cafa6202c --- /dev/null +++ b/comps/cores/storages/stores.py @@ -0,0 +1,260 @@ +# Copyright (C) 2025 RedisDB Inc. +# SPDX-License-Identifier: Apache-2.0 +"""OPEA Storage Factory Module. + +This module provides a factory pattern for creating and managing different storage backends +in the OPEA ecosystem. It supports multiple storage systems including MongoDB, ArangoDB, +and Redis, with configuration management through environment variables. + +The factory pattern allows for easy switching between storage backends without changing +application code, making the system more flexible and maintainable. +""" + +import os + +from comps.cores.common.storage import OpeaStore + +STORE_ID_COLS = { + "mongodb": "_id", + "arangodb": "_id", + "redis": "ID", +} + + +def get_store_name() -> str: + """Retrieves the configured storage backend name from environment variables. + + This function reads the OPEA_STORE_NAME environment variable to determine + which storage backend should be used by the application. The name is + normalized to lowercase for consistency. + + Returns: + str: The normalized storage backend name (e.g., "mongodb", "arangodb", "redis"). + + Raises: + Exception: If the OPEA_STORE_NAME environment variable is not set or is empty. + + Example: + >>> os.environ['OPEA_STORE_NAME'] = 'arangodb' + >>> get_store_name() + 'arangodb' + """ + store_name = os.getenv("OPEA_STORE_NAME") + if store_name is None: + raise Exception( + "Environment variable 'OPEA_STORE_NAME' is not set. " + "Please configure it with a supported storage backend name (mongodb, arangodb, redis)." + ) + if store_name not in STORE_ID_COLS.keys(): + raise Exception( + f"Storage backend '{store_name}' is not supported. " f"Supported backends are: mongodb, arangodb, redis" + ) + return store_name.lower() + + +def _get_store_cfg(user: str) -> dict: + """Builds and returns the configuration dictionary for the specified storage backend. + + This internal function creates a configuration dictionary containing all necessary + parameters for initializing the chosen storage backend. It reads environment + variables with sensible defaults to configure the storage connection. + + Args: + user (str): The username/identifier for whom the configuration is being generated. + This is used for user-scoped data isolation and access control. + + Returns: + dict: A dictionary containing all configuration parameters required for the + specified storage backend. The structure varies by backend type: + + - MongoDB: Contains MONGO_HOST, MONGO_PORT, DB_NAME, COLLECTION_NAME + - ArangoDB: Contains ARANGODB_HOST, ARANGODB_USERNAME, ARANGODB_PASSWORD, etc. + - Redis: Contains REDIS_URL, INDEX_NAME, DOC_PREFIX, AUTO_CREATE_INDEX + + Raises: + Exception: If the configured storage backend is not supported. + + Note: + This is an internal function and should not be called directly. + Use get_store() instead for public API access. + """ + name = get_store_name() + + # ArangoDB configuration with connection and authentication parameters + if name == "arangodb": + return { + "user": user, + "ARANGODB_HOST": os.getenv("ARANGODB_HOST", "http://localhost:8529"), + "ARANGODB_USERNAME": os.getenv("ARANGODB_USERNAME", "root"), + "ARANGODB_PASSWORD": os.getenv("ARANGODB_PASSWORD", ""), + "ARANGODB_DB_NAME": os.getenv("ARANGODB_DB_NAME", "_system"), + "ARANGODB_COLLECTION_NAME": os.getenv("ARANGODB_COLLECTION_NAME", "default"), + } + + # MongoDB configuration with host, port, and database settings + elif name == "mongodb": + return { + "user": user, + "MONGO_HOST": os.getenv("MONGO_HOST", "localhost"), + "MONGO_PORT": os.getenv("MONGO_PORT", 27017), + "DB_NAME": os.getenv("DB_NAME", "OPEA"), + "COLLECTION_NAME": os.getenv("COLLECTION_NAME", "ChatHistory"), + } + + # Redis configuration with URL and indexing parameters + elif name == "redis": + return { + "user": user, + "REDIS_URL": os.getenv("REDIS_URL", "redis://localhost:6379"), + "INDEX_NAME": os.getenv("INDEX_NAME", "opea:index"), + "DOC_PREFIX": os.getenv("DOC_PREFIX", "doc:"), + "AUTO_CREATE_INDEX": os.getenv("AUTO_CREATE_INDEX", True), + } + + # Future storage backends can be added here following the same pattern + else: + raise Exception( + f"Storage backend '{name}' is not supported. " f"Supported backends are: mongodb, arangodb, redis" + ) + + +def get_store(user: str) -> OpeaStore: + """Factory function to create and initialize a storage backend instance. + + This is the main entry point for obtaining a configured and connected storage + instance. It uses the factory pattern to abstract storage backend selection + and initialization, making it easy to switch between different storage systems. + + The function performs the following operations: + 1. Validates the user parameter + 2. Determines the storage backend from environment configuration + 3. Creates the appropriate storage instance with configuration + 4. Initializes the storage connection/database + 5. Performs a health check to ensure the connection is working + + Args: + user (str): The username/identifier for whom the store is being initialized. + This is required for user-scoped data access and isolation. + Cannot be None or empty. + + Returns: + OpeaStore: A fully initialized and health-checked instance of the requested + storage backend. The instance is ready for immediate use. + + Raises: + Exception: If any of the following conditions occur: + - User information is not provided (None or empty) + - The configured storage backend is not supported + - Storage initialization fails + - Health check fails (cannot connect to storage) + + Example: + >>> os.environ['OPEA_STORE_NAME'] = 'mongodb' + >>> store = get_store('user_foo') + """ + # Validate user parameter - required for all storage operations + if not user: + raise Exception( + "User information is required to initialize the data store. " "Please provide a valid user identifier." + ) + + name = get_store_name() + store_cfg = _get_store_cfg(user) + + store = None + + # Initialize MongoDB store with database setup + if name == "mongodb": + from comps.cores.storages.mongodb import MongoDBStore + + store = MongoDBStore(name, config=store_cfg) + store._initialize_db() + + # Initialize ArangoDB store with connection setup + elif name == "arangodb": + from comps.cores.storages.arangodb import ArangoDBStore + + store = ArangoDBStore(name, config=store_cfg) + store._initialize_connection() + + # Initialize Redis store with connection setup + elif name == "redis": + from comps.cores.storages.redisdb import RedisDBStore + + store = RedisDBStore(name, config=store_cfg) + store._initialize_connection() + + # Future storage backends can be added here. + + # Ensure we have a valid store instance + if store is None: + raise Exception( + f"Storage backend '{name}' is not supported. " f"Supported backends are: mongodb, arangodb, redis" + ) + + # Verify the store is healthy and ready for use + if store.health_check(): + return store + else: + raise Exception( + f"Failed to establish a healthy connection to {name} storage backend. " + f"Please check your configuration and ensure the storage service is running." + ) + + +def column_to_id(col_name: str, doc: dict) -> dict: + """Formats the document's ID field to match store's requirements. + + Args: + col_name (str): The name of the ID column. + doc (dict): The document to be formatted. + Returns: + dict: The formatted document with the correct ID field. + """ + store_name = get_store_name() + if col_name and col_name in doc: + doc[STORE_ID_COLS[store_name]] = doc.pop(col_name) + return doc + + +def id_to_column(col_name: str, doc: dict) -> dict: + """Formats the document's ID field from store's requirements + to the application's requirements. + + Args: + col_name (str): The name of the ID column. + doc (dict): The document to be formatted. + Returns: + dict: The formatted document with the correct ID field. + """ + store_name = get_store_name() + if col_name and STORE_ID_COLS[store_name] in doc: + doc[col_name] = str(doc.pop(STORE_ID_COLS[store_name])) + return doc + + +def get_id_col_name() -> str: + """Retrieves the ID column name for the configured storage backend. + + This function returns the appropriate ID column name based on the + currently configured storage backend. It abstracts away the differences + in ID field naming conventions across different storage systems. + + Returns: + str: The ID column name used by the configured storage backend. + + Raises: + Exception: If the configured storage backend is not supported. + + Example: + >>> os.environ['OPEA_STORE_NAME'] = 'mongodb' + >>> get_id_col_name() + '_id' + """ + store_name = get_store_name() + if store_name in STORE_ID_COLS: + return STORE_ID_COLS[store_name] + else: + raise Exception( + f"Storage backend '{store_name}' is not supported. " f"Supported backends are: mongodb, arangodb, redis" + ) diff --git a/comps/feedback_management/README.md b/comps/feedback_management/README.md index 6bca80ebad..2bd0095451 100644 --- a/comps/feedback_management/README.md +++ b/comps/feedback_management/README.md @@ -2,57 +2,18 @@ The Feedback Management microservice facilitates the storage and retrieval of users'feedback data by establishing a connection with the databases. This microservice is designed to seamlessly integrate with OPEA applications, enabling data persistence and efficient management of feedback data. ---- - ## 🛠️ Features - **Store Feedback**: Save feedback data from user into database. - **Retrieve Feedback**: Fetch feedback data from database based on user or id. - **Update Feedback**: Update feedback data info in the database based on id. - **Delete Feedback**: Remove feedback record from database. -- **MCP Support**: Enable AI agents to programmatically interact with feedback data through Model Context Protocol. - ---- - -## ⚙️ Implementation - -The Feedback Management microservice able to support various database backends for storing the feedback data. - -### Feedback Management with MongoDB - -For more detail, please refer to this [README](./src/README.md) - -## 🤖 MCP (Model Context Protocol) Support - -The Feedback Management microservice supports MCP, allowing AI agents to discover and use its functionality programmatically. - -### Enabling MCP - -To enable MCP support, set the environment variable: - -```bash -export ENABLE_MCP=true -``` - -Or in your docker-compose.yaml: - -```yaml -environment: - ENABLE_MCP: true -``` - -### MCP Tools Available - -When MCP is enabled, the following tools are exposed to AI agents: - -1. **create_feedback_data** - Create or update feedback data for AI-generated responses including ratings and comments -2. **get_feedback** - Retrieve feedback data by ID or get all feedback for a specific user -3. **delete_feedback** - Delete specific feedback data by user ID and feedback ID - -### Using with AI Agents -AI agents can connect to the service via the SSE transport endpoint at `/sse` when MCP is enabled. The service will be automatically discovered by agents using the OPEA MCP Tools Manager. +## ⚙️ Deployment Options -### Backward Compatibility +To get detailed, step-by-step instructions on deploying the `feedback_management` microservice, you should consult the deployment guide. This guide will walk you through all the necessary steps, from building the Docker images to configuring your environment and running the service. -MCP support is disabled by default to maintain backward compatibility. The service continues to work normally via HTTP endpoints regardless of the MCP setting. +| Platform | Deployment Method | Database | Link | +| -------- | ----------------- | -------- | --------------------------------------------------------- | +| CPU | Docker | MongoDB | [Deployment Guide](./deployment/docker_compose/README.md) | +| CPU | Docker Compose | MongoDB | [Deployment Guide](./deployment/docker_compose/README.md) | diff --git a/comps/feedback_management/src/README.md b/comps/feedback_management/deployment/docker_compose/README.md similarity index 99% rename from comps/feedback_management/src/README.md rename to comps/feedback_management/deployment/docker_compose/README.md index 0634a39bcb..7ccfae108e 100644 --- a/comps/feedback_management/src/README.md +++ b/comps/feedback_management/deployment/docker_compose/README.md @@ -9,6 +9,7 @@ This README provides setup guides and all the necessary information about the Fe ```bash export http_proxy=${your_http_proxy} export https_proxy=${your_http_proxy} +export OPEA_STORE_NAME="mongodb" export MONGO_HOST=${MONGO_HOST} export MONGO_HOST=27017 export DB_NAME=${DB_NAME} diff --git a/comps/feedback_management/deployment/docker_compose/compose.yaml b/comps/feedback_management/deployment/docker_compose/compose.yaml index 06057fe350..5b602a13bb 100644 --- a/comps/feedback_management/deployment/docker_compose/compose.yaml +++ b/comps/feedback_management/deployment/docker_compose/compose.yaml @@ -24,6 +24,7 @@ services: http_proxy: ${http_proxy} https_proxy: ${https_proxy} no_proxy: ${no_proxy} + OPEA_STORE_NAME: ${OPEA_STORE_NAME:-mongodb} MONGO_HOST: ${MONGO_HOST} MONGO_PORT: ${MONGO_PORT} DB_NAME: ${DB_NAME} diff --git a/comps/feedback_management/src/feedback_store.py b/comps/feedback_management/src/feedback_store.py deleted file mode 100644 index 92b33b4eff..0000000000 --- a/comps/feedback_management/src/feedback_store.py +++ /dev/null @@ -1,156 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import bson.errors as BsonError -from bson.objectid import ObjectId -from integrations.mongo.config import COLLECTION_NAME -from integrations.mongo.mongo_conn import MongoClient - - -class FeedbackStore: - - def __init__( - self, - user: str, - ): - self.user = user - - def initialize_storage(self, db_type="mongo") -> None: - if db_type == "mongo": - self.db_client = MongoClient.get_db_client() - self.collection = self.db_client[COLLECTION_NAME] - - async def save_feedback(self, feedback_data) -> str: - """Stores a new feedback data into the storage. - - Args: - feedback_data (object): The document to be stored. - - Returns: - str: The ID of the inserted feedback data. - - Raises: - Exception: If an error occurs while storing the feedback_data. - """ - try: - inserted_feedback_data = await self.collection.insert_one( - feedback_data.model_dump(by_alias=True, mode="json", exclude={"feedback_id"}) - ) - feedback_id = str(inserted_feedback_data.inserted_id) - return feedback_id - - except Exception as e: - print(e) - raise Exception(e) - - async def update_feedback(self, feedback_data) -> bool: - """Update a feedback data in the collection with given id. - - Args: - feedback_id (str): The ID of the data to be updated. - updated_data (object): The data to be updated in the entry. - - Returns: - bool: True if the data updated successfully, False otherwise. - """ - try: - _id = ObjectId(feedback_data.feedback_id) - updated_result = await self.collection.update_one( - {"_id": _id, "chat_data.user": self.user}, - {"$set": {"feedback_data": feedback_data.feedback_data.model_dump(by_alias=True, mode="json")}}, - ) - - if updated_result.modified_count == 1: - print(f"Updated document: {feedback_data.feedback_id} !") - return True - else: - raise Exception("Not able to update the data.") - - except BsonError.InvalidId as e: - print(e) - raise KeyError(e) - - except Exception as e: - print(e) - raise Exception(e) - - async def get_all_feedback_of_user(self) -> list[dict]: - """Retrieves all feedback data of a user from the collection. - - Returns: - list[dict] | None: List of dict of feedback data of the user, None otherwise. - - Raises: - Exception: If there is an error while retrieving data. - """ - try: - feedback_data_list: list = [] - cursor = self.collection.find({"chat_data.user": self.user}, {"feedback_data": 0}) - - async for document in cursor: - document["feedback_id"] = str(document["_id"]) - del document["_id"] - feedback_data_list.append(document) - return feedback_data_list - - except Exception as e: - print(e) - raise Exception(e) - - async def get_feedback_by_id(self, feedback_id) -> dict | None: - """Retrieves a user feedback data from the collection based on the given feedback ID. - - Args: - feedback_id (str): The ID of the feedback data to retrieve. - - Returns: - dict | None: The user's feedback data if found, None otherwise. - - Raises: - Exception: If there is an error while retrieving data. - """ - try: - _id = ObjectId(feedback_id) - response: dict | None = await self.collection.find_one({"_id": _id, "chat_data.user": self.user}) - if response: - del response["_id"] - return response - return None - - except BsonError.InvalidId as e: - print(e) - raise KeyError(e) - - except Exception as e: - print(e) - raise Exception(e) - - async def delete_feedback(self, feedback_id) -> bool: - """Delete a feedback data from collection by given feedback_id. - - Args: - feedback_id(str): The ID of the feedback data to be deleted. - - Returns: - bool: True if feedback is successfully deleted, False otherwise. - - Raises: - KeyError: If the provided feedback_id is invalid: - Exception: If any errors occurs during delete process. - """ - try: - _id = ObjectId(feedback_id) - result = await self.collection.delete_one({"_id": _id, "chat_data.user": self.user}) - - delete_count = result.deleted_count - print(f"Deleted {delete_count} documents!") - - return True if delete_count == 1 else False - - except BsonError.InvalidId as e: - print(e) - raise KeyError(e) - - except Exception as e: - print(e) - raise Exception(e) diff --git a/comps/feedback_management/src/integrations/data_store.py b/comps/feedback_management/src/integrations/data_store.py new file mode 100644 index 0000000000..69905930d3 --- /dev/null +++ b/comps/feedback_management/src/integrations/data_store.py @@ -0,0 +1,153 @@ +# Copyright (C) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from typing import Optional + +from fastapi import HTTPException +from pydantic import BaseModel + +from comps.cores.proto.api_protocol import ChatCompletionRequest +from comps.cores.storages.models import ChatFeedback, FeedbackData, FeedbackId +from comps.cores.storages.stores import column_to_id, get_store, id_to_column + + +class ChatFeedbackDto(BaseModel): + chat_data: ChatCompletionRequest + feedback_data: FeedbackData + chat_id: Optional[str] = None + feedback_id: Optional[str] = None + user: str + + +def _prepersist(feedback: ChatFeedback) -> dict: + """Converts a ChatFeedback object to a dictionary suitable for persistence. + + Args: + feedback (ChatFeedback): The ChatFeedback object to be converted. + + Returns: + dict: A dictionary representation of the ChatFeedback, ready for persistence. + """ + data_dict = feedback.model_dump(by_alias=True, mode="json") + data_dict = column_to_id("feedback_id", data_dict) + return data_dict + + +def _post_getby_id(rs: dict) -> dict: + """Processes a single feedback record after retrieval by ID. + + Converts the internal 'id' field back to 'feedback_id' for external use. + + Args: + rs (dict): The raw feedback record dictionary from the store. + + Returns: + dict: The processed feedback record with proper field naming. + """ + rs = id_to_column("feedback_id", rs) + return rs + + +def _post_getby_user(rss: list) -> list: + """Processes multiple feedback records after retrieval by user. + + Converts the internal 'id' field back to 'feedback_id' for each record in the list. + + Args: + rss (list): A list of raw feedback record dictionaries from the store. + + Returns: + list: The list of processed feedback records with proper field naming. + """ + for rs in rss: + rs = id_to_column("feedback_id", rs) + return rss + + +def _check_user_info(feedback: ChatFeedback | FeedbackId): + """Checks if the user information is provided in the document. + + Args: + feedback (ChatFeedback|FeedbackId): The feedback to be checked. + + Raises: + HTTPException: If the user information is missing. + """ + user = feedback.chat_data.user if isinstance(feedback, ChatFeedback) else feedback.user + if user is None or (isinstance(user, str) and user.strip() == ""): + raise HTTPException(status_code=400, detail="User information is required but not provided") + + +async def save_or_update(feedback: ChatFeedback): + """Saves a new feedback record or updates an existing one. + + This function determines whether to create a new feedback record or update + an existing one based on the presence of feedback_id. If feedback_id is None, + a new record is created; otherwise, the existing record is updated. + + Args: + feedback (ChatFeedback): The ChatFeedback object to be saved or updated. + + Returns: + The result from the store operation (save or update). + + Raises: + HTTPException: If user information is missing. + """ + _check_user_info(feedback) + store = get_store(feedback.chat_data.user) + if feedback.feedback_id is None: + return await store.asave_document(_prepersist(feedback)) + else: + return await store.aupdate_document(_prepersist(feedback)) + + +async def get(feedback: FeedbackId): + """Retrieves feedback record(s) based on the provided FeedbackId. + + This function can retrieve either a specific feedback record by its ID + or all feedback records for a user. If feedback_id is provided, it returns + the specific record; otherwise, it returns all records for the user. + + Args: + feedback (FeedbackId): The FeedbackId object containing user and optional feedback_id. + + Returns: + Either a specific feedback document (if feedback_id provided) or a list of + all feedback documents for the user. + + Raises: + HTTPException: If user information is missing. + """ + _check_user_info(feedback) + store = get_store(feedback.user) + if feedback.feedback_id: + rs = await store.aget_document_by_id(feedback.feedback_id) + return _post_getby_id(rs) + else: + rss = await store.asearch(key="chat_data.user", value=feedback.user) + return _post_getby_user(rss) + + +async def delete(feedback: FeedbackId): + """Deletes a specific feedback record from the store. + + This function removes a feedback record identified by the feedback_id. + The feedback_id must be provided and cannot be None. + + Args: + feedback (FeedbackId): The FeedbackId object containing user and feedback_id. + + Returns: + The result from the store delete operation. + + Raises: + HTTPException: If user information is missing. + Exception: If feedback_id is None or not provided. + """ + _check_user_info(feedback) + store = get_store(feedback.user) + if feedback.feedback_id is None: + raise Exception("feedback_id is required.") + else: + return await store.adelete_document(feedback.feedback_id) diff --git a/comps/feedback_management/src/integrations/mongo/__init__.py b/comps/feedback_management/src/integrations/mongo/__init__.py deleted file mode 100644 index 916f3a44b2..0000000000 --- a/comps/feedback_management/src/integrations/mongo/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/feedback_management/src/integrations/mongo/config.py b/comps/feedback_management/src/integrations/mongo/config.py deleted file mode 100644 index ce8f5390cc..0000000000 --- a/comps/feedback_management/src/integrations/mongo/config.py +++ /dev/null @@ -1,10 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import os - -# MONGO configuration -MONGO_HOST = os.getenv("MONGO_HOST", "localhost") -MONGO_PORT = os.getenv("MONGO_PORT", 27017) -DB_NAME = os.getenv("DB_NAME", "OPEA") -COLLECTION_NAME = os.getenv("COLLECTION_NAME", "Feedback") diff --git a/comps/feedback_management/src/integrations/mongo/mongo_conn.py b/comps/feedback_management/src/integrations/mongo/mongo_conn.py deleted file mode 100644 index 48cd19a1af..0000000000 --- a/comps/feedback_management/src/integrations/mongo/mongo_conn.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from typing import Any - -import motor.motor_asyncio as motor - -from .config import DB_NAME, MONGO_HOST, MONGO_PORT - - -class MongoClient: - conn_url = f"mongodb://{MONGO_HOST}:{MONGO_PORT}/" - - @staticmethod - def get_db_client() -> Any: - try: - client = motor.AsyncIOMotorClient(MongoClient.conn_url) - db = client[DB_NAME] - return db - - except Exception as e: - print(e) - raise Exception() diff --git a/comps/feedback_management/src/opea_feedback_microservice.py b/comps/feedback_management/src/opea_feedback_microservice.py index a786e2f354..368972a156 100644 --- a/comps/feedback_management/src/opea_feedback_microservice.py +++ b/comps/feedback_management/src/opea_feedback_microservice.py @@ -1,66 +1,23 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 import os -from typing import Annotated, Optional from fastapi import HTTPException -from feedback_store import FeedbackStore -from pydantic import BaseModel, Field from comps import CustomLogger from comps.cores.mega.constants import MCPFuncType from comps.cores.mega.micro_service import opea_microservices, register_microservice -from comps.cores.proto.api_protocol import ChatCompletionRequest +from comps.cores.storages.models import ChatFeedback, FeedbackData, FeedbackId +from comps.cores.storages.stores import get_store, get_store_name +from comps.feedback_management.src.integrations.data_store import delete, get, save_or_update -logger = CustomLogger("feedback_mongo") +logger = CustomLogger(f"feedback_{get_store_name()}") logflag = os.getenv("LOGFLAG", False) # Enable MCP support based on environment variable enable_mcp = os.getenv("ENABLE_MCP", "").strip().lower() in {"true", "1", "yes"} -class FeedbackData(BaseModel): - """This class represents the data model of FeedbackData collected to store in database.". - - Attributes: - is_thumbs_up (bool): True if the response is satisfy, False otherwise. - rating: (int)[Optional]: Score rating. Range from 0 (bad rating) to 5(good rating). - comment (str)[Optional]: Comment given for response. - """ - - is_thumbs_up: bool - rating: Annotated[Optional[int], Field(ge=0, le=5)] = None - comment: Optional[str] = None - - -class ChatFeedback(BaseModel): - """This class represents the model for chat to collect FeedbackData together with ChatCompletionRequest data to store in database. - - Attributes: - chat_data (ChatCompletionRequest): ChatCompletionRequest object containing chat data to be stored. - feedback_data (FeedbackData): FeedbackData object containing feedback data for chat to be stored. - chat_id (str)[Optional]: The chat_id associated to the chat to be store together with feedback data. - feedback_id (str)[Optional]: The feedback_id of feedback data to be retrieved from database. - """ - - chat_data: ChatCompletionRequest - feedback_data: FeedbackData - chat_id: Optional[str] = None - feedback_id: Optional[str] = None - - -class FeedbackId(BaseModel): - """This class represent the data model for retrieve feedback data stored in database. - - Attributes: - user (str): The user of the requested feedback data. - feedback_id (str): The feedback_id of feedback data to be retrieved from database. - """ - - user: str - feedback_id: Optional[str] = None - - @register_microservice( name="opea_service@feedback_mongo", endpoint="/v1/feedback/create", @@ -84,12 +41,7 @@ async def create_feedback_data(feedback: ChatFeedback): logger.info(feedback) try: - feedback_store = FeedbackStore(feedback.chat_data.user) - feedback_store.initialize_storage() - if feedback.feedback_id is None: - response = await feedback_store.save_feedback(feedback) - else: - response = await feedback_store.update_feedback(feedback) + response = await save_or_update(feedback) if logflag: logger.info(response) @@ -123,12 +75,7 @@ async def get_feedback(feedback: FeedbackId): logger.info(feedback) try: - feedback_store = FeedbackStore(feedback.user) - feedback_store.initialize_storage() - if feedback.feedback_id: - response = await feedback_store.get_feedback_by_id(feedback.feedback_id) - else: - response = await feedback_store.get_all_feedback_of_user() + response = await get(feedback) if logflag: logger.info(response) @@ -163,12 +110,7 @@ async def delete_feedback(feedback: FeedbackId): logger.info(feedback) try: - feedback_store = FeedbackStore(feedback.user) - feedback_store.initialize_storage() - if feedback.feedback_id is None: - raise Exception("feedback_id is required.") - else: - response = await feedback_store.delete_feedback(feedback.feedback_id) + response = await delete(feedback) if logflag: logger.info(response) diff --git a/comps/feedback_management/src/requirements.in b/comps/feedback_management/src/requirements.in index 9fea481eba..fd4697afed 100644 --- a/comps/feedback_management/src/requirements.in +++ b/comps/feedback_management/src/requirements.in @@ -1,2 +1 @@ -motor mcp \ No newline at end of file diff --git a/comps/prompt_registry/README.md b/comps/prompt_registry/README.md index c6be5ffde9..ae4989d155 100644 --- a/comps/prompt_registry/README.md +++ b/comps/prompt_registry/README.md @@ -2,56 +2,17 @@ The Prompt Registry microservice facilitates the storage and retrieval of users' preferred prompts by establishing a connection with the databases. This microservice is designed to seamlessly integrate with OPEA applications, enabling data persistence and efficient management of user's preferred prompts. ---- - ## 🛠️ Features - **Store Prompt**: Save user's preferred prompt into database. - **Retrieve Prompt**: Fetch prompt from database based on user, id or even a keyword search. - **Delete Prompt**: Remove prompt from database. -- **MCP Support**: Enable AI agents to discover and use prompt management capabilities via Model Context Protocol. - ---- - -## ⚙️ Implementation - -The Prompt Registry microservice able to support various database backends for storing the prompts. - -### Prompt Registry with MongoDB - -For more detail, please refer to this [README](./src/README.md) - -### MCP (Model Context Protocol) Support - -The Prompt Registry microservice supports MCP, allowing AI agents to discover and utilize its prompt management capabilities. When MCP is enabled, the service exposes three tools: - -- `create_prompt`: Store a user's preferred prompt in the database -- `get_prompt`: Retrieve prompts by user, ID, or keyword search -- `delete_prompt`: Delete a prompt by ID from the database - -To enable MCP support, set the `ENABLE_MCP` environment variable: - -```bash -export ENABLE_MCP=true -``` - -Or in Docker Compose: - -```yaml -environment: - ENABLE_MCP: true -``` - -**Important Note**: When MCP is enabled (`ENABLE_MCP=true`), the service operates in MCP-only mode: -- Regular HTTP endpoints (`/v1/prompt/create`, `/v1/prompt/get`, `/v1/prompt/delete`) are not available -- The service only exposes the SSE endpoint (`/sse`) for MCP protocol communication -- AI agents interact with the service through MCP tools, not HTTP APIs -- To use both HTTP endpoints and MCP, you would need to run two instances of the service (one with MCP enabled, one without) +## ⚙️ Deployment Options -When MCP is enabled, AI agents can: +To get detailed, step-by-step instructions on deploying the `prompt_registry` microservice, you should consult the deployment guide. This guide will walk you through all the necessary steps, from building the Docker images to configuring your environment and running the service. -- Build and manage prompt libraries dynamically -- Reuse prompts across conversations -- Create personalized prompt repositories -- Share and discover prompts programmatically +| Platform | Deployment Method | Database | Link | +| -------- | ----------------- | -------- | --------------------------------------------------------- | +| CPU | Docker | MongoDB | [Deployment Guide](./deployment/docker_compose/README.md) | +| CPU | Docker Compose | MongoDB | [Deployment Guide](./deployment/docker_compose/README.md) | diff --git a/comps/prompt_registry/src/README.md b/comps/prompt_registry/deployment/docker_compose/README.md similarity index 99% rename from comps/prompt_registry/src/README.md rename to comps/prompt_registry/deployment/docker_compose/README.md index 5961fbe7fa..d40a2825d5 100644 --- a/comps/prompt_registry/src/README.md +++ b/comps/prompt_registry/deployment/docker_compose/README.md @@ -9,6 +9,7 @@ This README provides setup guides and all the necessary information about the Pr ```bash export http_proxy=${your_http_proxy} export https_proxy=${your_http_proxy} +export OPEA_STORE_NAME="mongodb" export MONGO_HOST=${MONGO_HOST} export MONGO_HOST=27017 export DB_NAME=${DB_NAME} diff --git a/comps/prompt_registry/deployment/docker_compose/compose.yaml b/comps/prompt_registry/deployment/docker_compose/compose.yaml index af9198fb13..70e057a160 100644 --- a/comps/prompt_registry/deployment/docker_compose/compose.yaml +++ b/comps/prompt_registry/deployment/docker_compose/compose.yaml @@ -24,6 +24,7 @@ services: http_proxy: ${http_proxy} https_proxy: ${https_proxy} no_proxy: ${no_proxy} + OPEA_STORE_NAME: ${OPEA_STORE_NAME:-mongodb} MONGO_HOST: ${MONGO_HOST} MONGO_PORT: ${MONGO_PORT} DB_NAME: ${DB_NAME} diff --git a/comps/prompt_registry/src/integrations/data_store.py b/comps/prompt_registry/src/integrations/data_store.py new file mode 100644 index 0000000000..1bebfc1162 --- /dev/null +++ b/comps/prompt_registry/src/integrations/data_store.py @@ -0,0 +1,127 @@ +# Copyright (C) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from fastapi import HTTPException + +from comps.cores.storages.models import PromptCreate, PromptId +from comps.cores.storages.stores import get_id_col_name, get_store, id_to_column + + +def check_user_info(prompt: PromptCreate | PromptId): + """Checks if the user information is provided in the document. + + Args: + document (ChatFeedback|FeedbackId): The document to be checked. + + Raises: + HTTPException: If the user information is missing. + """ + user = prompt.user + if user is None or (isinstance(user, str) and user.strip() == ""): + raise HTTPException(status_code=400, detail="User information is required but not provided") + + +def _prepersist(prompt: PromptCreate) -> dict: + """Converts a PromptCreate object to a dictionary suitable for storage. + + Args: + prompt (PromptCreate): The PromptCreate object to be converted. + + Returns: + dict: A dictionary representation of the PromptCreate, ready for storage. + """ + return {"prompt_text": prompt.prompt_text, "user": prompt.user} + + +def _post_getby_id(rs: dict) -> str: + """Post-processes a single document retrieved by ID. + + Args: + rs (dict): The document dictionary retrieved from storage. + + Returns: + str: prompt_text. + """ + return rs.get("prompt_text", None) + + +def _postget(rss: list) -> list: + """Post-processes a list of documents by removing the ID column from each document. + + Args: + rss (list): List of document dictionaries retrieved from storage. + + Returns: + list: List of document dictionaries with ID columns removed. + """ + for rs in rss: + rs.pop(get_id_col_name(), None) + return rss + + +def _postsearch(rss: list) -> list: + return [id_to_column("id", doc) for doc in rss] + + +async def save(prompt: PromptCreate): + """Saves a prompt to the data store after validating user information. + + Args: + prompt (PromptCreate): The prompt object to be saved. + + Returns: + The result of the save operation from the underlying storage. + + Raises: + HTTPException: If user information validation fails. + """ + check_user_info(prompt) + store = get_store(prompt.user) + return await store.asave_document(_prepersist(prompt)) + + +async def get(prompt: PromptId): + """Retrieves prompt(s) from the data store based on the provided criteria. + + Args: + prompt (PromptId): The prompt identifier object containing search criteria. + + Returns: + dict or list: A single prompt dictionary if searching by ID, + or a list of prompt dictionaries if searching by text or user. + + Raises: + HTTPException: If user information validation fails. + """ + check_user_info(prompt) + store = get_store(prompt.user) + if prompt.prompt_id is not None: + rs = await store.aget_document_by_id(prompt.prompt_id) + return _post_getby_id(rs) + elif prompt.prompt_text: + rss = await store.asearch_by_keyword(keyword=prompt.prompt_text, max_results=5) + return _postsearch(rss) + else: + rss = await store.asearch(key="user", value=prompt.user) + return _postget(rss) + + +async def delete(prompt: PromptId): + """Deletes a prompt from the data store by its ID. + + Args: + prompt (PromptId): The prompt identifier object containing the prompt ID to delete. + + Returns: + The result of the delete operation from the underlying storage. + + Raises: + HTTPException: If user information validation fails. + Exception: If prompt_id is not provided. + """ + check_user_info(prompt) + store = get_store(prompt.user) + if prompt.prompt_id is None: + raise Exception("Prompt id is required.") + else: + return await store.adelete_document(prompt.prompt_id) diff --git a/comps/prompt_registry/src/integrations/mongo/__init__.py b/comps/prompt_registry/src/integrations/mongo/__init__.py deleted file mode 100644 index 916f3a44b2..0000000000 --- a/comps/prompt_registry/src/integrations/mongo/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/prompt_registry/src/integrations/mongo/config.py b/comps/prompt_registry/src/integrations/mongo/config.py deleted file mode 100644 index 17d8d772f6..0000000000 --- a/comps/prompt_registry/src/integrations/mongo/config.py +++ /dev/null @@ -1,10 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import os - -# MONGO configuration -MONGO_HOST = os.getenv("MONGO_HOST", "localhost") -MONGO_PORT = os.getenv("MONGO_PORT", 27017) -DB_NAME = os.getenv("DB_NAME", "OPEA") -COLLECTION_NAME = os.getenv("COLLECTION_NAME", "Prompt") diff --git a/comps/prompt_registry/src/integrations/mongo/mongo_conn.py b/comps/prompt_registry/src/integrations/mongo/mongo_conn.py deleted file mode 100644 index 48cd19a1af..0000000000 --- a/comps/prompt_registry/src/integrations/mongo/mongo_conn.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from typing import Any - -import motor.motor_asyncio as motor - -from .config import DB_NAME, MONGO_HOST, MONGO_PORT - - -class MongoClient: - conn_url = f"mongodb://{MONGO_HOST}:{MONGO_PORT}/" - - @staticmethod - def get_db_client() -> Any: - try: - client = motor.AsyncIOMotorClient(MongoClient.conn_url) - db = client[DB_NAME] - return db - - except Exception as e: - print(e) - raise Exception() diff --git a/comps/prompt_registry/src/opea_prompt_microservice.py b/comps/prompt_registry/src/opea_prompt_microservice.py index 8019229024..610268c3c9 100644 --- a/comps/prompt_registry/src/opea_prompt_microservice.py +++ b/comps/prompt_registry/src/opea_prompt_microservice.py @@ -1,45 +1,18 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 import os -from typing import Optional - -from prompt_store import PromptStore -from pydantic import BaseModel from comps import CustomLogger, ServiceType from comps.cores.mega.constants import MCPFuncType from comps.cores.mega.micro_service import opea_microservices, register_microservice +from comps.cores.storages.models import PromptCreate, PromptId +from comps.prompt_registry.src.integrations.data_store import delete, get, save logger = CustomLogger("prompt_registry") logflag = os.getenv("LOGFLAG", False) enable_mcp = os.getenv("ENABLE_MCP", "").strip().lower() in {"true", "1", "yes"} -class PromptCreate(BaseModel): - """This class represents the data model for creating and storing a new prompt in the database. - - Attributes: - prompt_text (str): The text content of the prompt. - user (str): The user or creator of the prompt. - """ - - prompt_text: str - user: str - - -class PromptId(BaseModel): - """This class represent the data model for retrieve prompt stored in database. - - Attributes: - user (str): The user of the requested prompt. - prompt_id (str): The prompt_id of prompt to be retrieved from database. - """ - - user: str - prompt_id: Optional[str] = None - prompt_text: Optional[str] = None - - @register_microservice( name="opea_service@prompt_registry", service_type=ServiceType.PROMPT_REGISTRY, @@ -63,9 +36,7 @@ async def create_prompt(prompt: PromptCreate): if logflag: logger.info(prompt) try: - prompt_store = PromptStore(prompt.user) - prompt_store.initialize_storage() - response = await prompt_store.save_prompt(prompt) + response = await save(prompt) if logflag: logger.info(response) return response @@ -98,14 +69,7 @@ async def get_prompt(prompt: PromptId): if logflag: logger.info(prompt) try: - prompt_store = PromptStore(prompt.user) - prompt_store.initialize_storage() - if prompt.prompt_id is not None: - response = await prompt_store.get_user_prompt_by_id(prompt.prompt_id) - elif prompt.prompt_text: - response = await prompt_store.prompt_search(prompt.prompt_text) - else: - response = await prompt_store.get_all_prompt_of_user() + response = await get(prompt) if logflag: logger.info(response) return response @@ -138,12 +102,7 @@ async def delete_prompt(prompt: PromptId): if logflag: logger.info(prompt) try: - prompt_store = PromptStore(prompt.user) - prompt_store.initialize_storage() - if prompt.prompt_id is None: - raise Exception("Prompt id is required.") - else: - response = await prompt_store.delete_prompt(prompt.prompt_id) + response = await delete(prompt) if logflag: logger.info(response) return response diff --git a/comps/prompt_registry/src/prompt_store.py b/comps/prompt_registry/src/prompt_store.py deleted file mode 100644 index 7f363d9a68..0000000000 --- a/comps/prompt_registry/src/prompt_store.py +++ /dev/null @@ -1,160 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - - -import bson.errors as BsonError -from bson.objectid import ObjectId -from integrations.mongo.config import COLLECTION_NAME -from integrations.mongo.mongo_conn import MongoClient - - -class PromptStore: - - def __init__( - self, - user: str, - ): - self.user = user - - def initialize_storage(self, db_type="mongo") -> None: - if db_type == "mongo": - self.db_client = MongoClient.get_db_client() - self.collection = self.db_client[COLLECTION_NAME] - - async def save_prompt(self, prompt) -> str: - """Stores a new prompt into the storage. - - Args: - prompt: The document to be stored. - - Returns: - str: The ID of the inserted prompt. - - Raises: - Exception: If an error occurs while storing the prompt. - """ - try: - inserted_prompt = await self.collection.insert_one( - prompt.model_dump(by_alias=True, mode="json", exclude={"id"}) - ) - prompt_id = str(inserted_prompt.inserted_id) - return prompt_id - - except Exception as e: - print(e) - raise Exception(e) - - async def get_all_prompt_of_user(self) -> list[dict]: - """Retrieves all prompts of a user from the collection. - - Returns: - list[dict] | None: List of dict of prompts of the user, None otherwise. - - Raises: - Exception: If there is an error while retrieving data. - """ - try: - prompt_list: list = [] - cursor = self.collection.find({"user": self.user}, {"data": 0}) - - async for document in cursor: - document["id"] = str(document["_id"]) - del document["_id"] - prompt_list.append(document) - return prompt_list - - except Exception as e: - print(e) - raise Exception(e) - - async def get_user_prompt_by_id(self, prompt_id) -> dict | None: - """Retrieves a user prompt from the collection based on the given prompt ID. - - Args: - prompt_id (str): The ID of the prompt to retrieve. - - Returns: - dict | None: The user prompt if found, None otherwise. - - Raises: - Exception: If there is an error while retrieving data. - """ - try: - _id = ObjectId(prompt_id) - response: dict | None = await self.collection.find_one({"_id": _id, "user": self.user}) - if response: - del response["_id"] - return response["prompt_text"] - return None - - except BsonError.InvalidId as e: - print(e) - raise KeyError(e) - - except Exception as e: - print(e) - raise Exception(e) - - async def prompt_search(self, keyword) -> list | None: - """Retrieves prompt from the collection based on keyword provided. - - Args: - keyword (str): The keyword of prompt to search for. - - Returns: - list | None: The list of relevant prompt if found, None otherwise. - - Raises: - Exception: If there is an error while searching data. - """ - try: - # Create a text index if not already created - self.collection.create_index([("$**", "text")]) - # Perform text search - results = self.collection.find({"$text": {"$search": keyword}}, {"score": {"$meta": "textScore"}}) - sorted_results = results.sort([("score", {"$meta": "textScore"})]) - - # Return a list of top 5 most relevant data - relevant_data = await sorted_results.to_list(length=5) - - # Serialize data and return - serialized_data = [ - {"id": str(doc["_id"]), "prompt_text": doc["prompt_text"], "user": doc["user"], "score": doc["score"]} - for doc in relevant_data - ] - - return serialized_data - - except Exception as e: - print(e) - raise Exception(e) - - async def delete_prompt(self, prompt_id) -> bool: - """Delete a prompt from collection by given prompt_id. - - Args: - prompt_id(str): The ID of the prompt to be deleted. - - Returns: - bool: True if prompt is successfully deleted, False otherwise. - - Raises: - KeyError: If the provided prompt_id is invalid: - Exception: If any errors occurs during delete process. - """ - try: - _id = ObjectId(prompt_id) - result = await self.collection.delete_one({"_id": _id, "user": self.user}) - - delete_count = result.deleted_count - print(f"Deleted {delete_count} documents!") - - return True if delete_count == 1 else False - - except BsonError.InvalidId as e: - print(e) - raise KeyError(e) - - except Exception as e: - print(e) - raise Exception(e) diff --git a/comps/prompt_registry/src/requirements.in b/comps/prompt_registry/src/requirements.in index 9fea481eba..fd4697afed 100644 --- a/comps/prompt_registry/src/requirements.in +++ b/comps/prompt_registry/src/requirements.in @@ -1,2 +1 @@ -motor mcp \ No newline at end of file diff --git a/tests/chathistory/test_chathistory_mcp.sh b/tests/chathistory/test_chathistory_mcp.sh index 449ccb2bcf..f67798778d 100755 --- a/tests/chathistory/test_chathistory_mcp.sh +++ b/tests/chathistory/test_chathistory_mcp.sh @@ -9,6 +9,7 @@ ip_address=$(hostname -I | awk '{print $1}') export MONGO_HOST=${ip_address} export MONGO_PORT=27017 +export OPEA_STORE_NAME="mongodb" export DB_NAME=${DB_NAME:-"Conversations"} export COLLECTION_NAME=${COLLECTION_NAME:-"test"} export ENABLE_MCP=True diff --git a/tests/chathistory/test_chathistory_mongo.sh b/tests/chathistory/test_chathistory_mongo.sh index 3fa5bd443c..6b5c527931 100644 --- a/tests/chathistory/test_chathistory_mongo.sh +++ b/tests/chathistory/test_chathistory_mongo.sh @@ -11,6 +11,7 @@ export MONGO_HOST=${ip_address} export MONGO_PORT=27017 export DB_NAME=${DB_NAME:-"Conversations"} export COLLECTION_NAME=${COLLECTION_NAME:-"test"} +export OPEA_STORE_NAME="mongodb" function build_docker_images() { cd $WORKPATH @@ -35,6 +36,7 @@ function start_service() { } function validate_microservice() { + # Test create API result=$(curl -X 'POST' \ http://${ip_address}:${CHATHISTORY_PORT}/v1/chathistory/create \ -H 'accept: application/json' \ @@ -45,19 +47,99 @@ function validate_microservice() { } }') echo $result + id="" if [[ ${#result} -eq 26 ]]; then echo "Result correct." + id="${result//\"/}" else echo "Result wrong." docker logs chathistory-mongo-server exit 1 fi + # Test get_by_id API + result=$(curl -X 'POST' \ + http://${ip_address}:${CHATHISTORY_PORT}/v1/chathistory/get \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{"user": "test", "id": "'${id}'"}') + echo $result + if [[ $result == *'{"messages":"test Messages"'* ]]; then + echo "Result correct." + else + echo "Result wrong." + docker logs chathistory-mongo-server + exit 1 + fi + + # Test get_by_user API + result=$(curl -X 'POST' \ + http://${ip_address}:${CHATHISTORY_PORT}/v1/chathistory/get \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{"user": "test"}') + echo $result + if [[ $result == '[{"first_query":"test Messages"'* ]]; then + echo "Result correct." + else + echo "Result wrong." + docker logs chathistory-mongo-server + exit 1 + fi + + # Test update API + result=$(curl -X 'POST' \ + http://${ip_address}:${CHATHISTORY_PORT}/v1/chathistory/create \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "data": { + "messages": "test Messages update", "user": "test" + }, + "id": "'${id}'" +}') + echo $result + if [[ $result == *'true'* ]]; then + echo "Result correct." + else + echo "Result wrong." + docker logs chathistory-mongo-server + exit 1 + fi + + # Test get_by_id API + result=$(curl -X 'POST' \ + http://${ip_address}:${CHATHISTORY_PORT}/v1/chathistory/get \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{"user": "test", "id": "'${id}'"}') + echo $result + if [[ $result == *'{"messages":"test Messages update"'* ]]; then + echo "Result correct." + else + echo "Result wrong." + docker logs chathistory-mongo-server + exit 1 + fi + + # Test delete API + result=$(curl -X 'POST' \ + http://${ip_address}:${CHATHISTORY_PORT}/v1/chathistory/delete \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{"user": "test", "id": "'${id}'"}') + echo $result + if [[ $result == *'true'* ]]; then + echo "Result correct." + else + echo "Result wrong." + docker logs chathistory-mongo-server + exit 1 + fi } function stop_docker() { - cid=$(docker ps -aq --filter "name=chathistory-mongo-*") - if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi + docker ps -a --filter "name=feedbackmanagement-mongo-server" --filter "name=mongodb" --format "{{.Names}}" | xargs -r docker stop } function main() { diff --git a/tests/cores/storages/test_mongodb.py b/tests/cores/storages/test_mongodb.py index 9eec040213..809334ae94 100644 --- a/tests/cores/storages/test_mongodb.py +++ b/tests/cores/storages/test_mongodb.py @@ -9,10 +9,7 @@ from comps.cores.storages import opea_store - -class DummyDoc: - def model_dump(self, **kwargs): - return {"text": "mock data"} +dummy_doc = {"text": "mock data"} class MockAsyncCursor: @@ -48,6 +45,7 @@ def setUp(self): self.config = { "MONGO_HOST": "localhost", "MONGO_PORT": 27017, + "OPEA_STORE_NAME": "mongodb", "DB_NAME": "test_db", "COLLECTION_NAME": "test_collection", "user": "test_user", @@ -81,46 +79,38 @@ def test_health_check_failure(self): async def test_asave_document(self): mock_id = ObjectId("60dbf3a1fc13ae1a3b000000") self.store.collection.insert_one.return_value.inserted_id = mock_id - result = await self.store.asave_document(DummyDoc()) + result = await self.store.asave_document(dummy_doc) self.assertEqual(result, str(mock_id)) async def test_asave_documents(self): self.store.collection.insert_many.return_value.inserted_ids = [ObjectId()] - docs = [DummyDoc()] + docs = [dummy_doc] result = await self.store.asave_documents(docs) self.assertTrue(isinstance(result, str)) async def test_aupdate_document(self): self.store.collection.update_one.return_value.modified_count = 1 - doc = {"doc_id": str(ObjectId()), "data": DummyDoc()} + doc = {"doc_id": str(ObjectId()), "data": dummy_doc} result = await self.store.aupdate_document(doc) self.assertTrue(result) async def test_aupdate_documents(self): self.store.collection.update_one.return_value.modified_count = 1 - docs = [{"doc_id": str(ObjectId()), "data": DummyDoc()}] + docs = [{"doc_id": str(ObjectId()), "data": dummy_doc}] result = await self.store.aupdate_documents(docs) self.assertTrue(result) async def test_aget_document_by_id(self): - self.store.collection.find_one.return_value = {"_id": ObjectId(), "data": {"text": "mock"}} - result = await self.store.aget_document_by_id(str(ObjectId())) - self.assertEqual(result, {"text": "mock"}) + id = ObjectId() + self.store.collection.find_one.return_value = {"_id": id, "data": {"text": "mock"}} + result = await self.store.aget_document_by_id(str(id)) + self.assertEqual(result, {"_id": id, "data": {"text": "mock"}}) async def test_aget_documents_by_ids(self): mock_id = ObjectId("60dbf3a1fc13ae1a3b000000") self.store.collection.find_one.return_value = {"_id": mock_id, "data": {"text": "mock"}} result = await self.store.aget_documents_by_ids([str(mock_id)]) - self.assertEqual(result, [{"text": "mock"}]) - - async def test_aget_documents_by_user(self): - mock_docs = [{"_id": ObjectId("60dbf3a1fc13ae1a3b000000"), "user": "test_user"}] - self.store.collection.find.return_value = MockAsyncCursor(mock_docs) - - result = await self.store.aget_documents_by_user("test_user") - - self.assertIsInstance(result, list) - self.assertEqual(result[0]["doc_id"], "60dbf3a1fc13ae1a3b000000") + self.assertEqual(result, [{"_id": mock_id, "data": {"text": "mock"}}]) async def test_adelete_document(self): self.store.collection.delete_one.return_value.deleted_count = 1 @@ -132,22 +122,6 @@ async def test_adelete_documents(self): result = await self.store.adelete_documents([str(ObjectId())]) self.assertTrue(result) - async def test_asearch(self): - self.store.collection.create_index = MagicMock() - - mock_docs = [ - {"_id": ObjectId("60dbf3a1fc13ae1a3b000000"), "data": "mock data", "user": "test_user", "score": 0.9} - ] - mock_cursor = MockSortCursor(mock_docs) - - self.store.collection.find.return_value = mock_cursor - - result = await self.store.asearch("prompt", "value") - - self.assertEqual(len(result), 1) - self.assertEqual(result[0]["user"], "test_user") - self.assertIn("score", result[0]) - if __name__ == "__main__": unittest.main() diff --git a/tests/feedback_management/test_feedback_management_mcp.sh b/tests/feedback_management/test_feedback_management_mcp.sh index e24e6e474d..e9cdcc36a0 100755 --- a/tests/feedback_management/test_feedback_management_mcp.sh +++ b/tests/feedback_management/test_feedback_management_mcp.sh @@ -9,6 +9,7 @@ ip_address=$(hostname -I | awk '{print $1}') export MONGO_HOST=${ip_address} export MONGO_PORT=27017 +export OPEA_STORE_NAME="mongodb" export DB_NAME=${DB_NAME:-"Feedback"} export COLLECTION_NAME=${COLLECTION_NAME:-"test_mcp"} diff --git a/tests/feedback_management/test_feedback_management_mongo.sh b/tests/feedback_management/test_feedback_management_mongo.sh index 7bd57ae374..13e30c5f0d 100644 --- a/tests/feedback_management/test_feedback_management_mongo.sh +++ b/tests/feedback_management/test_feedback_management_mongo.sh @@ -9,6 +9,7 @@ ip_address=$(hostname -I | awk '{print $1}') export MONGO_HOST=${ip_address} export MONGO_PORT=27017 +export OPEA_STORE_NAME="mongodb" export DB_NAME=${DB_NAME:-"Feedback"} export COLLECTION_NAME=${COLLECTION_NAME:-"test"} @@ -35,6 +36,7 @@ function start_service() { } function validate_microservice() { + # Test create API result=$(curl -X 'POST' \ http://$ip_address:${FEEDBACK_MANAGEMENT_PORT}/v1/feedback/create \ -H 'accept: application/json' \ @@ -67,19 +69,107 @@ function validate_microservice() { } }') echo $result + id="" if [[ ${#result} -eq 26 ]]; then echo "Correct result." + id="${result//\"/}" else echo "Incorrect result." docker logs feedbackmanagement-mongo-server exit 1 fi + # Test update API + result=$(curl -X 'POST' \ + http://$ip_address:${FEEDBACK_MANAGEMENT_PORT}/v1/feedback/create \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{ + "chat_id": "66445d4f71c7eff23d44f78d", + "chat_data": { + "user": "test", + "messages": [ + { + "role": "system", + "content": "You are helpful assistant" + }, + { + "role": "user", + "content": "hi", + "time": "1724915247" + }, + { + "role": "assistant", + "content": "Hi, may I help you?", + "time": "1724915249" + } + ] + }, + "feedback_data": { + "comment": "Fair and Moderate answer", + "rating": 2, + "is_thumbs_up": true + }, + "feedback_id": "'${id}'" +}') + echo $result + if [[ $result == "true" ]]; then + echo "Correct result." + else + echo "Incorrect result." + docker logs feedbackmanagement-mongo-server + exit 1 + fi + + # Test get_by_user API + result=$(curl -X 'POST' \ + http://$ip_address:${FEEDBACK_MANAGEMENT_PORT}/v1/feedback/get \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{"user": "test"}') + echo $result + if [[ $result == '[{"chat_data":{"messages":'* ]]; then + echo "Result correct." + else + echo "Result wrong." + docker logs feedbackmanagement-mongo-server + exit 1 + fi + + # Test get_by_id API + result=$(curl -X 'POST' \ + http://$ip_address:${FEEDBACK_MANAGEMENT_PORT}/v1/feedback/get \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{"user": "test", "feedback_id": "'${id}'"}') + echo $result + if [[ $result == '{"chat_data":{"messages":'*'"rating":2'* ]]; then + echo "Result correct." + else + echo "Result wrong." + docker logs feedbackmanagement-mongo-server + exit 1 + fi + + # Test delete API + result=$(curl -X 'POST' \ + http://$ip_address:${FEEDBACK_MANAGEMENT_PORT}/v1/feedback/delete \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{"user": "test", "feedback_id": "'${id}'"}') + echo $result + if [[ $result == 'true' ]]; then + echo "Result correct." + else + echo "Result wrong." + docker logs feedbackmanagement-mongo-server + exit 1 + fi + } function stop_docker() { - cid=$(docker ps -aq --filter "name=feedbackmanagement-mongo-*") - if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi + docker ps -a --filter "name=feedbackmanagement-mongo-server" --filter "name=mongodb" --format "{{.Names}}" | xargs -r docker stop } function main() { diff --git a/tests/prompt_registry/test_prompt_registry_mcp.sh b/tests/prompt_registry/test_prompt_registry_mcp.sh index 227bfe1838..00b0bffdf5 100755 --- a/tests/prompt_registry/test_prompt_registry_mcp.sh +++ b/tests/prompt_registry/test_prompt_registry_mcp.sh @@ -9,6 +9,7 @@ ip_address=$(hostname -I | awk '{print $1}') export MONGO_HOST=${ip_address} export MONGO_PORT=27018 +export OPEA_STORE_NAME="mongodb" export DB_NAME=${DB_NAME:-"Prompts"} export COLLECTION_NAME=${COLLECTION_NAME:-"test_mcp"} export PROMPT_REGISTRY_PORT=10602 @@ -44,6 +45,7 @@ function start_service() { -e no_proxy=$no_proxy \ -e MONGO_HOST=${MONGO_HOST} \ -e MONGO_PORT=${MONGO_PORT} \ + -e OPEA_STORE_NAME=${OPEA_STORE_NAME} \ -e DB_NAME=${DB_NAME} \ -e COLLECTION_NAME=${COLLECTION_NAME} \ -e ENABLE_MCP=${ENABLE_MCP} \ diff --git a/tests/prompt_registry/test_prompt_registry_mongo.sh b/tests/prompt_registry/test_prompt_registry_mongo.sh index b63dab81b4..f1521ccaf8 100644 --- a/tests/prompt_registry/test_prompt_registry_mongo.sh +++ b/tests/prompt_registry/test_prompt_registry_mongo.sh @@ -9,6 +9,7 @@ ip_address=$(hostname -I | awk '{print $1}') export MONGO_HOST=${ip_address} export MONGO_PORT=27017 +export OPEA_STORE_NAME="mongodb" export DB_NAME=${DB_NAME:-"Prompts"} export COLLECTION_NAME=${COLLECTION_NAME:-"test"} @@ -35,6 +36,7 @@ function start_service() { } function validate_microservice() { + # Test create API result=$(curl -X 'POST' \ http://$ip_address:${PROMPT_REGISTRY_PORT}/v1/prompt/create \ -H 'accept: application/json' \ @@ -43,19 +45,79 @@ function validate_microservice() { "prompt_text": "test prompt", "user": "test" }') echo $result + id="" if [[ ${#result} -eq 26 ]]; then echo "Correct result." + id="${result//\"/}" else echo "Incorrect result." docker logs promptregistry-mongo-server exit 1 fi + # Test get_by_id API + result=$(curl -X 'POST' \ + http://$ip_address:${PROMPT_REGISTRY_PORT}/v1/prompt/get \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{"user": "test", "prompt_id": "'${id}'"}') + echo $result + if [[ "${result//\"/}" == "test prompt" ]]; then + echo "Correct result." + else + echo "Incorrect result." + docker logs promptregistry-mongo-server + exit 1 + fi + + # Test get_by_user API + result=$(curl -X 'POST' \ + http://$ip_address:${PROMPT_REGISTRY_PORT}/v1/prompt/get \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{"user": "test"}') + echo $result + if [[ $result == '[{"prompt_text":"'* ]]; then + echo "Correct result." + else + echo "Incorrect result." + docker logs promptregistry-mongo-server + exit 1 + fi + + # Test search API + result=$(curl -X 'POST' \ + http://$ip_address:${PROMPT_REGISTRY_PORT}/v1/prompt/get \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{"user": "test", "prompt_text": "test prompt"}') + echo $result + if [[ $result == '[{"prompt_text":"'* ]]; then + echo "Correct result." + else + echo "Incorrect result." + docker logs promptregistry-mongo-server + exit 1 + fi + + # Test delete API + result=$(curl -X 'POST' \ + http://$ip_address:${PROMPT_REGISTRY_PORT}/v1/prompt/delete \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{"user": "test", "prompt_id": "'${id}'"}') + echo $result + if [[ $result == "true" ]]; then + echo "Correct result." + else + echo "Incorrect result." + docker logs promptregistry-mongo-server + exit 1 + fi } function stop_docker() { - cid=$(docker ps -aq --filter "name=promptregistry-mongo-*") - if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi + docker ps -a --filter "name=feedbackmanagement-mongo-server" --filter "name=mongodb" --format "{{.Names}}" | xargs -r docker stop } function main() {