class DataprepRequest:
    """Form-encoded request model shared by all dataprep integrations.

    Declared as a plain class (not a pydantic BaseModel) so FastAPI can use it
    as a dependency whose fields are read from multipart/form-data.

    Args:
        files: A file or a list of files to be ingested. Defaults to File(None).
        link_list: A list of links to be ingested. Defaults to Form(None).
        chunk_size: The size of the chunks to be split. Defaults to Form(1500).
        chunk_overlap: The overlap between chunks. Defaults to Form(100).
        process_table: Whether to process tables in PDFs. Defaults to Form(False).
        table_strategy: The strategy to process tables in PDFs. Defaults to Form("fast").
    """

    def __init__(
        self,
        files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
        link_list: Optional[str] = Form(None),
        chunk_size: Optional[int] = Form(1500),
        chunk_overlap: Optional[int] = Form(100),
        process_table: Optional[bool] = Form(False),
        table_strategy: Optional[str] = Form("fast"),
    ):
        self.files = files
        self.link_list = link_list
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.process_table = process_table
        self.table_strategy = table_strategy


class Neo4jDataprepRequest(DataprepRequest):
    """DataprepRequest extended with the Neo4j-specific ``ingest_from_graphDB`` flag.

    Args:
        ingest_from_graphDB: Whether to skip generating a graph from files and
            instead load the index from the existing graph store.
            Defaults to Form(False).
    """

    def __init__(
        self,
        files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
        link_list: Optional[str] = Form(None),
        chunk_size: Optional[int] = Form(1500),
        chunk_overlap: Optional[int] = Form(100),
        process_table: Optional[bool] = Form(False),
        table_strategy: Optional[str] = Form("fast"),
        ingest_from_graphDB: bool = Form(False),
    ):
        super().__init__(
            files=files,
            link_list=link_list,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            process_table=process_table,
            table_strategy=table_strategy,
        )

        self.ingest_from_graphDB = ingest_from_graphDB


class RedisDataprepRequest(DataprepRequest):
    """DataprepRequest extended with the Redis-specific ``index_name`` field.

    Args:
        index_name: The name of the index where data will be ingested.
            Defaults to Form(None).
    """

    def __init__(
        self,
        files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
        link_list: Optional[str] = Form(None),
        chunk_size: Optional[int] = Form(1500),
        chunk_overlap: Optional[int] = Form(100),
        process_table: Optional[bool] = Form(False),
        table_strategy: Optional[str] = Form("fast"),
        # Fixed annotation: the default is Form(None), so a bare `str`
        # annotation was wrong — the field is genuinely optional.
        index_name: Optional[str] = Form(None),
    ):
        super().__init__(
            files=files,
            link_list=link_list,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            process_table=process_table,
            table_strategy=table_strategy,
        )

        self.index_name = index_name
- link_list (str, optional): A list of links to be ingested. Defaults to Form(None). - chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). - chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). - process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). - table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). + input (DataprepRequest): Model containing the following parameters: + files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). + link_list (str, optional): A list of links to be ingested. Defaults to Form(None). + chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). + chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). + process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). + table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). 
""" + files = input.files + link_list = input.link_list + chunk_size = input.chunk_size + chunk_overlap = input.chunk_overlap + process_table = input.process_table + table_strategy = input.table_strategy + if logflag: logger.info(f"files:{files}") logger.info(f"link_list:{link_list}") diff --git a/comps/dataprep/src/integrations/milvus.py b/comps/dataprep/src/integrations/milvus.py index f5957d320b..a78576a4f1 100644 --- a/comps/dataprep/src/integrations/milvus.py +++ b/comps/dataprep/src/integrations/milvus.py @@ -17,6 +17,7 @@ from langchain_text_splitters import HTMLHeaderTextSplitter from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType +from comps.cores.proto.api_protocol import DataprepRequest from comps.dataprep.src.utils import ( create_upload_folder, document_loader, @@ -244,28 +245,27 @@ def check_health(self) -> bool: def invoke(self, *args, **kwargs): pass - async def ingest_files( - self, - files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), - link_list: Optional[str] = Form(None), - chunk_size: int = Form(1500), - chunk_overlap: int = Form(100), - process_table: bool = Form(False), - table_strategy: str = Form("fast"), - ingest_from_graphDB: bool = Form(False), - ): + async def ingest_files(self, input: DataprepRequest): """Ingest files/links content into milvus database. Save in the format of vector[], the vector length depends on the emedding model type. Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful. Args: - files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). - link_list (str, optional): A list of links to be ingested. Defaults to Form(None). - chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). - chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). - process_table (bool, optional): Whether to process tables in PDFs. 
Defaults to Form(False). - table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). + input (DataprepRequest): Model containing the following parameters: + files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). + link_list (str, optional): A list of links to be ingested. Defaults to Form(None). + chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). + chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). + process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). + table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). """ + files = input.files + link_list = input.link_list + chunk_size = input.chunk_size + chunk_overlap = input.chunk_overlap + process_table = input.process_table + table_strategy = input.table_strategy + if logflag: logger.info(f"[ milvus ingest ] files:{files}") logger.info(f"[ milvus ingest ] link_list:{link_list}") diff --git a/comps/dataprep/src/integrations/neo4j_langchain.py b/comps/dataprep/src/integrations/neo4j_langchain.py index c51dac7996..9f3b460c3c 100644 --- a/comps/dataprep/src/integrations/neo4j_langchain.py +++ b/comps/dataprep/src/integrations/neo4j_langchain.py @@ -16,6 +16,7 @@ from langchain_text_splitters import HTMLHeaderTextSplitter from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType +from comps.cores.proto.api_protocol import DataprepRequest from comps.dataprep.src.utils import ( document_loader, encode_filename, @@ -137,28 +138,27 @@ async def ingest_data_to_neo4j(self, doc_path: DocPath): return True - async def ingest_files( - self, - files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), - link_list: Optional[str] = Form(None), - chunk_size: int = Form(1500), - chunk_overlap: int = Form(100), - process_table: bool = Form(False), 
- table_strategy: str = Form("fast"), - ingest_from_graphDB: bool = Form(False), - ): + async def ingest_files(self, input: DataprepRequest): """Ingest files/links content into Neo4j database. Save in the format of vector[768]. Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful. Args: - files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). - link_list (str, optional): A list of links to be ingested. Defaults to Form(None). - chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). - chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). - process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). - table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). + input (DataprepRequest): Model containing the following parameters: + files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). + link_list (str, optional): A list of links to be ingested. Defaults to Form(None). + chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). + chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). + process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). + table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). 
""" + files = input.files + link_list = input.link_list + chunk_size = input.chunk_size + chunk_overlap = input.chunk_overlap + process_table = input.process_table + table_strategy = input.table_strategy + if logflag: logger.info(f"files:{files}") logger.info(f"link_list:{link_list}") diff --git a/comps/dataprep/src/integrations/neo4j_llamaindex.py b/comps/dataprep/src/integrations/neo4j_llamaindex.py index 68b7539038..a12c930597 100644 --- a/comps/dataprep/src/integrations/neo4j_llamaindex.py +++ b/comps/dataprep/src/integrations/neo4j_llamaindex.py @@ -36,6 +36,7 @@ from transformers import AutoTokenizer from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType +from comps.cores.proto.api_protocol import DataprepRequest, Neo4jDataprepRequest from comps.dataprep.src.utils import ( document_loader, encode_filename, @@ -664,29 +665,32 @@ async def build_communities(self, index: PropertyGraphIndex): logger.error(f"Error building communities: {e}\n{error_trace}") return False - async def ingest_files( - self, - files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), - link_list: Optional[str] = Form(None), - chunk_size: int = Form(1500), - chunk_overlap: int = Form(100), - process_table: bool = Form(False), - table_strategy: str = Form("fast"), - ingest_from_graphDB: bool = Form(False), - ): + async def ingest_files(self, input: Union[DataprepRequest, Neo4jDataprepRequest]): """Ingest files/links content into Neo4j database. Save in the format of vector[768]. Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful. Args: - files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). - link_list (str, optional): A list of links to be ingested. Defaults to Form(None). - chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). - chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). 
- process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). - table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). - ingest_from_graphDB (bool, optional): Whether to skip generating graph from files and instead loading index from existing graph store. + input (Neo4jDataprepRequest): Model containing the following parameters: + files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). + link_list (str, optional): A list of links to be ingested. Defaults to Form(None). + chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). + chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). + process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). + table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). + ingest_from_graphDB (bool, optional): Whether to skip generating graph from files and instead loading index from existing graph store. 
""" + files = input.files + link_list = input.link_list + chunk_size = input.chunk_size + chunk_overlap = input.chunk_overlap + process_table = input.process_table + table_strategy = input.table_strategy + + ingest_from_graphDB = False + if isinstance(input, Neo4jDataprepRequest): + ingest_from_graphDB = input.ingest_from_graphDB + if logflag: logger.info(f"files:{files}") logger.info(f"link_list:{link_list}") diff --git a/comps/dataprep/src/integrations/opensearch.py b/comps/dataprep/src/integrations/opensearch.py index b06ced2877..2b51a5001c 100644 --- a/comps/dataprep/src/integrations/opensearch.py +++ b/comps/dataprep/src/integrations/opensearch.py @@ -14,6 +14,7 @@ from opensearchpy import OpenSearch from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType +from comps.cores.proto.api_protocol import DataprepRequest from comps.dataprep.src.utils import ( create_upload_folder, document_loader, @@ -322,26 +323,28 @@ def search_all_documents(self, index_name, offset, search_batch_size): async def ingest_files( self, - files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), - link_list: Optional[str] = Form(None), - chunk_size: int = Form(1500), - chunk_overlap: int = Form(100), - process_table: bool = Form(False), - table_strategy: str = Form("fast"), - ingest_from_graphDB: bool = Form(False), + input: DataprepRequest, ): """Ingest files/links content into opensearch database. Save in the format of vector[768]. Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful. Args: - files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). - link_list (str, optional): A list of links to be ingested. Defaults to Form(None). - chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). - chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). 
- process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). - table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). + input (DataprepRequest): Model containing the following parameters: + files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). + link_list (str, optional): A list of links to be ingested. Defaults to Form(None). + chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). + chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). + process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). + table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). """ + files = input.files + link_list = input.link_list + chunk_size = input.chunk_size + chunk_overlap = input.chunk_overlap + process_table = input.process_table + table_strategy = input.table_strategy + if logflag: logger.info(f"[ upload ] files:{files}") logger.info(f"[ upload ] link_list:{link_list}") diff --git a/comps/dataprep/src/integrations/pgvect.py b/comps/dataprep/src/integrations/pgvect.py index 61965427a8..e1aae31375 100644 --- a/comps/dataprep/src/integrations/pgvect.py +++ b/comps/dataprep/src/integrations/pgvect.py @@ -15,6 +15,7 @@ from langchain_huggingface import HuggingFaceEmbeddings from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType +from comps.cores.proto.api_protocol import DataprepRequest from comps.dataprep.src.utils import ( create_upload_folder, document_loader, @@ -245,26 +246,24 @@ async def ingest_link_to_pgvector(self, link_list: List[str]): async def ingest_files( self, - files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), - link_list: Optional[str] = Form(None), - chunk_size: int = Form(1500), - chunk_overlap: int = Form(100), - process_table: bool = 
Form(False), - table_strategy: str = Form("fast"), - ingest_from_graphDB: bool = Form(False), + input: DataprepRequest, ): """Ingest files/links content into pgvector database. Save in the format of vector[768]. Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful. Args: - files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). - link_list (str, optional): A list of links to be ingested. Defaults to Form(None). - chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). - chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). - process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). - table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). + input (DataprepRequest): Model containing the following parameters: + files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). + link_list (str, optional): A list of links to be ingested. Defaults to Form(None). + chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). + chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). + process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). + table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). 
""" + files = input.files + link_list = input.link_list + if logflag: logger.info(f"files:{files}") logger.info(f"link_list:{link_list}") diff --git a/comps/dataprep/src/integrations/pipecone.py b/comps/dataprep/src/integrations/pipecone.py index dcdd16e95e..b142adbc81 100644 --- a/comps/dataprep/src/integrations/pipecone.py +++ b/comps/dataprep/src/integrations/pipecone.py @@ -15,6 +15,7 @@ from pinecone import Pinecone, ServerlessSpec from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType +from comps.cores.proto.api_protocol import DataprepRequest from comps.dataprep.src.utils import ( create_upload_folder, document_loader, @@ -229,28 +230,27 @@ async def ingest_link_to_pinecone(self, link_list: List[str], chunk_size, chunk_ return True - async def ingest_files( - self, - files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), - link_list: Optional[str] = Form(None), - chunk_size: int = Form(1500), - chunk_overlap: int = Form(100), - process_table: bool = Form(False), - table_strategy: str = Form("fast"), - ingest_from_graphDB: bool = Form(False), - ): + async def ingest_files(self, input: DataprepRequest): """Ingest files/links content into pipecone database. Save in the format of vector[768]. Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful. Args: - files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). - link_list (str, optional): A list of links to be ingested. Defaults to Form(None). - chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). - chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). - process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). - table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). 
+ input (DataprepRequest): Model containing the following parameters: + files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). + link_list (str, optional): A list of links to be ingested. Defaults to Form(None). + chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). + chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). + process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). + table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). """ + files = input.files + link_list = input.link_list + chunk_size = input.chunk_size + chunk_overlap = input.chunk_overlap + process_table = input.process_table + table_strategy = input.table_strategy + if logflag: logger.info(f"files:{files}") logger.info(f"link_list:{link_list}") diff --git a/comps/dataprep/src/integrations/qdrant.py b/comps/dataprep/src/integrations/qdrant.py index af7c572649..386b718b89 100644 --- a/comps/dataprep/src/integrations/qdrant.py +++ b/comps/dataprep/src/integrations/qdrant.py @@ -14,6 +14,7 @@ from qdrant_client import QdrantClient from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType +from comps.cores.proto.api_protocol import DataprepRequest from comps.dataprep.src.utils import ( document_loader, encode_filename, @@ -153,26 +154,28 @@ async def ingest_data_to_qdrant(self, doc_path: DocPath): async def ingest_files( self, - files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), - link_list: Optional[str] = Form(None), - chunk_size: int = Form(1500), - chunk_overlap: int = Form(100), - process_table: bool = Form(False), - table_strategy: str = Form("fast"), - ingest_from_graphDB: bool = Form(False), + input: DataprepRequest, ): """Ingest files/links content into qdrant database. Save in the format of vector[768]. 
Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful. Args: - files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). - link_list (str, optional): A list of links to be ingested. Defaults to Form(None). - chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). - chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). - process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). - table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). + input (DataprepRequest): Model containing the following parameters: + files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). + link_list (str, optional): A list of links to be ingested. Defaults to Form(None). + chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). + chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). + process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). + table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). 
""" + files = input.files + link_list = input.link_list + chunk_size = input.chunk_size + chunk_overlap = input.chunk_overlap + process_table = input.process_table + table_strategy = input.table_strategy + if logflag: logger.info(f"files:{files}") logger.info(f"link_list:{link_list}") diff --git a/comps/dataprep/src/integrations/redis.py b/comps/dataprep/src/integrations/redis.py index 91c58e1188..8e1a498a52 100644 --- a/comps/dataprep/src/integrations/redis.py +++ b/comps/dataprep/src/integrations/redis.py @@ -21,6 +21,7 @@ from redis.commands.search.indexDefinition import IndexDefinition, IndexType from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType +from comps.cores.proto.api_protocol import DataprepRequest, RedisDataprepRequest from comps.dataprep.src.utils import ( create_upload_folder, document_loader, @@ -361,30 +362,32 @@ async def check_health(self) -> bool: def invoke(self, *args, **kwargs): pass - async def ingest_files( - self, - files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), - link_list: Optional[str] = Form(None), - chunk_size: int = Form(1500), - chunk_overlap: int = Form(100), - process_table: bool = Form(False), - table_strategy: str = Form("fast"), - ingest_from_graphDB: bool = Form(False), - index_name: str = Form(None), - ): + async def ingest_files(self, input: Union[DataprepRequest, RedisDataprepRequest]): """Ingest files/links content into redis database. Save in the format of vector[768]. Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful. Args: - files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). - link_list (str, optional): A list of links to be ingested. Defaults to Form(None). - chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). - chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). 
- process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). - table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). - index_name (str, optional): The name of the index where data will be ingested. + input (RedisDataprepRequest): Model containing the following parameters: + files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). + link_list (str, optional): A list of links to be ingested. Defaults to Form(None). + chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). + chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). + process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). + table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). + index_name (str, optional): The name of the index where data will be ingested. 
""" + files = input.files + link_list = input.link_list + chunk_size = input.chunk_size + chunk_overlap = input.chunk_overlap + process_table = input.process_table + table_strategy = input.table_strategy + + index_name = None + if isinstance(input, RedisDataprepRequest): + index_name = input.index_name + if logflag: logger.info(f"[ redis ingest ] files:{files}") logger.info(f"[ redis ingest ] link_list:{link_list}") diff --git a/comps/dataprep/src/integrations/redis_finance.py b/comps/dataprep/src/integrations/redis_finance.py index d3fc50c9b1..b4770531ff 100644 --- a/comps/dataprep/src/integrations/redis_finance.py +++ b/comps/dataprep/src/integrations/redis_finance.py @@ -13,6 +13,7 @@ from langchain_community.vectorstores import Redis from comps import OpeaComponent, OpeaComponentRegistry, ServiceType +from comps.cores.proto.api_protocol import DataprepRequest from comps.dataprep.src.integrations.utils.redis_finance_utils import * from comps.dataprep.src.integrations.utils.redis_kv import RedisKVStore from comps.dataprep.src.utils import encode_filename, save_content_to_local_disk @@ -222,26 +223,24 @@ def invoke(self, *args, **kwargs): async def ingest_files( self, - files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), - link_list: Optional[str] = Form(None), - chunk_size: int = Form(1500), - chunk_overlap: int = Form(100), - process_table: bool = Form(False), - table_strategy: str = Form("fast"), - ingest_from_graphDB: bool = Form(False), + input: DataprepRequest, ): """Ingest files/links content into redis database. Save in the format of vector[768]. Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful. Args: - files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). - link_list (str, optional): A list of links to be ingested. Defaults to Form(None). - chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). 
- chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). - process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). - table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). + input (DataprepRequest): Model containing the following parameters: + files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). + link_list (str, optional): A list of links to be ingested. Defaults to Form(None). + chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). + chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). + process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). + table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). """ + files = input.files + link_list = input.link_list + if logflag: logger.info(f"[ redis ingest ] files:{files}") logger.info(f"[ redis ingest ] link_list:{link_list}") diff --git a/comps/dataprep/src/integrations/vdms.py b/comps/dataprep/src/integrations/vdms.py index 5f389438af..fc530bec97 100644 --- a/comps/dataprep/src/integrations/vdms.py +++ b/comps/dataprep/src/integrations/vdms.py @@ -13,6 +13,7 @@ from langchain_vdms.vectorstores import VDMS, VDMS_Client from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType +from comps.cores.proto.api_protocol import DataprepRequest from comps.dataprep.src.utils import ( create_upload_folder, document_loader, @@ -146,26 +147,28 @@ async def ingest_data_to_vdms(self, doc_path: DocPath): async def ingest_files( self, - files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), - link_list: Optional[str] = Form(None), - chunk_size: int = Form(1500), - chunk_overlap: int = Form(100), - process_table: bool = Form(False), - table_strategy: str = Form("fast"), - 
ingest_from_graphDB: bool = Form(False), + input: DataprepRequest, ): """Ingest files/links content into VDMS database. Save in the format of vector[768]. Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful. Args: - files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). - link_list (str, optional): A list of links to be ingested. Defaults to Form(None). - chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). - chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). - process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). - table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). + input (DataprepRequest): Model containing the following parameters: + files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None). + link_list (str, optional): A list of links to be ingested. Defaults to Form(None). + chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500). + chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100). + process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False). + table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast"). 
""" + files = input.files + link_list = input.link_list + chunk_size = input.chunk_size + chunk_overlap = input.chunk_overlap + process_table = input.process_table + table_strategy = input.table_strategy + if logflag: logger.info(f"[ upload ] files:{files}") logger.info(f"[ upload ] link_list:{link_list}") diff --git a/comps/dataprep/src/opea_dataprep_microservice.py b/comps/dataprep/src/opea_dataprep_microservice.py index 397cd19cd9..adce6b4ad7 100644 --- a/comps/dataprep/src/opea_dataprep_microservice.py +++ b/comps/dataprep/src/opea_dataprep_microservice.py @@ -4,9 +4,9 @@ import os import time -from typing import List, Optional, Union +from typing import Annotated, List, Optional, Union -from fastapi import Body, File, Form, UploadFile +from fastapi import Body, Depends, File, Form, HTTPException, Request, UploadFile from integrations.elasticsearch import OpeaElasticSearchDataprep from integrations.milvus import OpeaMilvusDataprep from integrations.neo4j_llamaindex import OpeaNeo4jLlamaIndexDataprep @@ -27,6 +27,7 @@ register_statistics, statistics_dict, ) +from comps.cores.proto.api_protocol import DataprepRequest, Neo4jDataprepRequest, RedisDataprepRequest from comps.dataprep.src.utils import create_upload_folder logger = CustomLogger("opea_dataprep_microservice") @@ -41,6 +42,33 @@ ) +async def resolve_dataprep_request(request: Request): + form = await request.form() + + common_args = { + "files": form.get("files", None), + "link_list": form.get("link_list", None), + "chunk_size": form.get("chunk_size", 1500), + "chunk_overlap": form.get("chunk_overlap", 100), + "process_table": form.get("process_table", False), + "table_strategy": form.get("table_strategy", "fast"), + } + + if "index_name" in form: + return RedisDataprepRequest( + **common_args, + index_name=form.get("index_name"), + ) + + if "ingest_from_graphDB" in form: + return Neo4jDataprepRequest( + **common_args, + ingest_from_graphDB=form.get("ingest_from_graphDB"), + ) + + return 
DataprepRequest(**common_args) + + @register_microservice( name="opea_service@dataprep", service_type=ServiceType.DATAPREP, @@ -50,44 +78,26 @@ ) @register_statistics(names=["opea_service@dataprep"]) async def ingest_files( - files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), - link_list: Optional[str] = Form(None), - chunk_size: int = Form(1500), - chunk_overlap: int = Form(100), - process_table: bool = Form(False), - table_strategy: str = Form("fast"), - ingest_from_graphDB: bool = Form(False), - index_name: Optional[str] = Form(None), + input: Union[DataprepRequest, RedisDataprepRequest, Neo4jDataprepRequest] = Depends(resolve_dataprep_request), ): + if isinstance(input, RedisDataprepRequest): + logger.info(f"[ ingest ] Redis mode: index_name={input.index_name}") + elif isinstance(input, Neo4jDataprepRequest): + logger.info(f"[ ingest ] Neo4j mode: ingest_from_graphDB={input.ingest_from_graphDB}") + else: + logger.info("[ ingest ] Base mode") + start = time.time() + files = input.files + link_list = input.link_list + if logflag: logger.info(f"[ ingest ] files:{files}") logger.info(f"[ ingest ] link_list:{link_list}") try: - # Use the loader to invoke the component - if dataprep_component_name == "OPEA_DATAPREP_REDIS": - response = await loader.ingest_files( - files, - link_list, - chunk_size, - chunk_overlap, - process_table, - table_strategy, - ingest_from_graphDB, - index_name, - ) - else: - if index_name: - logger.error( - 'Error during dataprep ingest invocation: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". 
i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"' - ) - raise - - response = await loader.ingest_files( - files, link_list, chunk_size, chunk_overlap, process_table, table_strategy, ingest_from_graphDB - ) + response = await loader.ingest_files(input) # Log the result if logging is enabled if logflag: diff --git a/tests/dataprep/dataprep_utils.sh b/tests/dataprep/dataprep_utils.sh index ec5e2a7893..c3d86e1feb 100644 --- a/tests/dataprep/dataprep_utils.sh +++ b/tests/dataprep/dataprep_utils.sh @@ -39,61 +39,67 @@ function _invoke_curl() { RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') } -# validate_ingest +# +function _add_db_params() { + local db=$1 + if [[ "$db" == "redis" ]]; then + echo '-F index_name=rag_redis' + fi +} + +function ingest_file() { + local fqdn="$1" + local port="$2" + local db_or_filename="$3" + local filename="$4" + + if [[ "$filename" == "" ]]; then + filename="$db_or_filename" + db="" + shift 3 + else + db="$db_or_filename" + shift 4 + fi + + local extra_args=$(_add_db_params "$db") + _invoke_curl "$fqdn" "$port" ingest -F "files=@${SCRIPT_DIR}/$filename" $extra_args "$@" +} + function ingest_doc() { - local fqdn=$1 - local port=$2 - shift 2 - _invoke_curl $fqdn $port ingest -F "files=@${SCRIPT_DIR}/ingest_dataprep.doc" $@ + ingest_file "$1" "$2" "$3" "ingest_dataprep.doc" "${@:5}" } function ingest_docx() { - local fqdn=$1 - local port=$2 - shift 2 - _invoke_curl $fqdn $port ingest -F "files=@${SCRIPT_DIR}/ingest_dataprep.docx" $@ + ingest_file "$1" "$2" "$3" "ingest_dataprep.docx" "${@:5}" } function ingest_pdf() { - local fqdn=$1 - local port=$2 - shift 2 - _invoke_curl $fqdn $port ingest -F "files=@${SCRIPT_DIR}/ingest_dataprep.pdf" $@ + ingest_file "$1" "$2" "$3" "ingest_dataprep.pdf" "${@:5}" } function ingest_ppt() { - local fqdn=$1 - local port=$2 - shift 2 - _invoke_curl $fqdn $port ingest -F "files=@${SCRIPT_DIR}/ingest_dataprep.ppt" $@ + ingest_file "$1" "$2" "$3" "ingest_dataprep.ppt" "${@:5}" }
function ingest_pptx() { - local fqdn=$1 - local port=$2 - shift 2 - _invoke_curl $fqdn $port ingest -F "files=@${SCRIPT_DIR}/ingest_dataprep.pptx" $@ + ingest_file "$1" "$2" "$3" "ingest_dataprep.pptx" "${@:5}" } function ingest_txt() { - local fqdn=$1 - local port=$2 - shift 2 - _invoke_curl $fqdn $port ingest -F "files=@${SCRIPT_DIR}/ingest_dataprep.txt" $@ + ingest_file "$1" "$2" "$3" "ingest_dataprep.txt" "${@:5}" } function ingest_xlsx() { - local fqdn=$1 - local port=$2 - shift 2 - _invoke_curl $fqdn $port ingest -F "files=@${SCRIPT_DIR}/ingest_dataprep.xlsx" $@ + ingest_file "$1" "$2" "$3" "ingest_dataprep.xlsx" "${@:5}" } function ingest_external_link() { local fqdn=$1 local port=$2 shift 2 - _invoke_curl $fqdn $port ingest -F 'link_list=["https://www.ces.tech/"]' $@ + local extra_args=$(_add_db_params "$db") + _invoke_curl $fqdn $port ingest -F 'link_list=["https://www.ces.tech/"]' $extra_args $@ } function delete_all() { @@ -123,7 +129,7 @@ function delete_item_in_index() { function delete_single() { local fqdn=$1 local port=$2 - shift 3 + shift 2 _invoke_curl $fqdn $port delete -d '{"file_path":"ingest_dataprep.txt"}' $@ } diff --git a/tests/dataprep/test_dataprep_milvus.sh b/tests/dataprep/test_dataprep_milvus.sh index 809506e766..958e44ddf6 100644 --- a/tests/dataprep/test_dataprep_milvus.sh +++ b/tests/dataprep/test_dataprep_milvus.sh @@ -42,6 +42,10 @@ function start_service() { } function validate_microservice() { + # test /v1/dataprep/delete + delete_all ${ip_address} ${DATAPREP_PORT} + check_result "dataprep - del" '{"status":true}' dataprep-milvus-server ${LOG_PATH}/dataprep_milvus.log + # test /v1/dataprep/ingest upload file ingest_doc ${ip_address} ${DATAPREP_PORT} check_result "dataprep - upload - doc" "Data preparation succeeded" dataprep-milvus-server ${LOG_PATH}/dataprep_milvus.log diff --git a/tests/dataprep/test_dataprep_redis.sh b/tests/dataprep/test_dataprep_redis.sh index 6d9f6dd89e..e07961d7eb 100644 --- 
a/tests/dataprep/test_dataprep_redis.sh +++ b/tests/dataprep/test_dataprep_redis.sh @@ -52,25 +52,25 @@ function validate_microservice() { check_result "dataprep - del" '{"status":true}' dataprep-redis-server ${LOG_PATH}/dataprep_del.log # test /v1/dataprep/ingest upload file - ingest_doc ${ip_address} ${DATAPREP_PORT} + ingest_doc ${ip_address} ${DATAPREP_PORT} "redis" check_result "dataprep - upload - doc" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log - ingest_docx ${ip_address} ${DATAPREP_PORT} + ingest_docx ${ip_address} ${DATAPREP_PORT} "redis" check_result "dataprep - upload - docx" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log - ingest_pdf ${ip_address} ${DATAPREP_PORT} + ingest_pdf ${ip_address} ${DATAPREP_PORT} "redis" check_result "dataprep - upload - pdf" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log - ingest_ppt ${ip_address} ${DATAPREP_PORT} + ingest_ppt ${ip_address} ${DATAPREP_PORT} "redis" check_result "dataprep - upload - ppt" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log - ingest_pptx ${ip_address} ${DATAPREP_PORT} + ingest_pptx ${ip_address} ${DATAPREP_PORT} "redis" check_result "dataprep - upload - pptx" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log - ingest_txt ${ip_address} ${DATAPREP_PORT} + ingest_txt ${ip_address} ${DATAPREP_PORT} "redis" check_result "dataprep - upload - txt" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log - ingest_xlsx ${ip_address} ${DATAPREP_PORT} + ingest_xlsx ${ip_address} ${DATAPREP_PORT} "redis" check_result "dataprep - upload - xlsx" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log # test /v1/dataprep/ingest upload link