diff --git a/comps/cores/proto/api_protocol.py b/comps/cores/proto/api_protocol.py
index 14d5bea6c1..dc86746c19 100644
--- a/comps/cores/proto/api_protocol.py
+++ b/comps/cores/proto/api_protocol.py
@@ -262,6 +262,7 @@ class ChatCompletionRequest(BaseModel):
     lambda_mult: float = 0.5
     score_threshold: float = 0.2
     retrieved_docs: Union[List[RetrievalResponseData], List[Dict[str, Any]]] = Field(default_factory=list)
+    index_name: Optional[str] = None
 
     # reranking
     top_n: int = 1
diff --git a/comps/cores/proto/docarray.py b/comps/cores/proto/docarray.py
index 64b0b19fd2..335212b689 100644
--- a/comps/cores/proto/docarray.py
+++ b/comps/cores/proto/docarray.py
@@ -102,6 +102,7 @@ class EmbedDoc(BaseDoc):
     lambda_mult: float = 0.5
     score_threshold: float = 0.2
     constraints: Optional[Union[Dict[str, Any], List[Dict[str, Any]], None]] = None
+    index_name: Optional[str] = None
 
 
 class EmbedMultimodalDoc(EmbedDoc):
@@ -225,6 +226,7 @@ class LLMParams(BaseDoc):
     repetition_penalty: float = 1.03
     stream: bool = True
     language: str = "auto"  # can be "en", "zh"
+    index_name: Optional[str] = None
 
     chat_template: Optional[str] = Field(
         default=None,
diff --git a/comps/dataprep/src/integrations/redis.py b/comps/dataprep/src/integrations/redis.py
index d67931d95c..07485ebcef 100644
--- a/comps/dataprep/src/integrations/redis.py
+++ b/comps/dataprep/src/integrations/redis.py
@@ -187,7 +187,7 @@ async def delete_by_id(client, id):
     return True
 
 
-async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
+async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder, index_name: str):
     if logflag:
         logger.info(f"[ redis ingest chunks ] file name: {file_name}")
 
@@ -195,6 +195,9 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
     batch_size = 32
     num_chunks = len(chunks)
 
+    # save to the caller-provided index name if given, otherwise fall back to the default INDEX_NAME
+    ingest_index_name = index_name if index_name else INDEX_NAME
+
     file_ids = []
     for i in range(0, num_chunks, batch_size):
         if logflag:
@@ -206,7 +209,7 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
             Redis.from_texts_return_keys,
             texts=batch_texts,
             embedding=embedder,
-            index_name=INDEX_NAME,
+            index_name=ingest_index_name,
             redis_url=REDIS_URL,
         )
         if logflag:
@@ -222,7 +225,7 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
         await create_index(client)
 
     try:
-        await store_by_id(client, key=file_name, value="#".join(file_ids))
+        await store_by_id(client, key=encode_filename(ingest_index_name) + "_" + file_name, value="#".join(file_ids))
     except Exception as e:
         if logflag:
             logger.info(f"[ redis ingest chunks ] {e}. Fail to store chunks of file {file_name}.")
@@ -230,7 +233,7 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
     return True
 
 
-async def ingest_data_to_redis(doc_path: DocPath, embedder):
+async def ingest_data_to_redis(doc_path: DocPath, embedder, index_name):
    """Ingest document to Redis."""
    path = doc_path.path
    if logflag:
@@ -271,7 +274,7 @@ async def ingest_data_to_redis(doc_path: DocPath, embedder):
        logger.info(f"[ redis ingest data ] Done preprocessing. Created {len(chunks)} chunks of the given file.")
 
    file_name = doc_path.path.split("/")[-1]
-    return await ingest_chunks_to_redis(file_name, chunks, embedder)
+    return await ingest_chunks_to_redis(file_name, chunks, embedder, index_name)
 
 
 @OpeaComponentRegistry.register("OPEA_DATAPREP_REDIS")
@@ -360,6 +363,7 @@ async def ingest_files(
         process_table: bool = Form(False),
         table_strategy: str = Form("fast"),
         ingest_from_graphDB: bool = Form(False),
+        index_name: str = Form(None),
     ):
         """Ingest files/links content into redis database.
 
@@ -372,6 +376,7 @@ async def ingest_files(
             chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
             process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
             table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
+            index_name (str, optional): The name of the index where data will be ingested.
         """
         if logflag:
             logger.info(f"[ redis ingest ] files:{files}")
@@ -384,7 +389,9 @@ async def ingest_files(
 
         for file in files:
             encode_file = encode_filename(file.filename)
-            doc_id = "file:" + encode_file
+            index_name_id = encode_filename(INDEX_NAME if index_name is None else index_name)
+            doc_id = "file:" + index_name_id + "_" + encode_file
+
             if logflag:
                 logger.info(f"[ redis ingest ] processing file {doc_id}")
 
@@ -400,7 +407,7 @@ async def ingest_files(
             if key_ids:
                 raise HTTPException(
                     status_code=400,
-                    detail=f"Uploaded file {file.filename} already exists. Please change file name.",
+                    detail=f"Uploaded file {file.filename} already exists. Please change the file name or index name.",
                 )
 
             save_path = upload_folder + encode_file
@@ -414,6 +421,7 @@ async def ingest_files(
                     table_strategy=table_strategy,
                 ),
                 self.embedder,
+                index_name,
             )
             uploaded_files.append(save_path)
             if logflag:
@@ -430,7 +438,8 @@ async def ingest_files(
             raise HTTPException(status_code=400, detail=f"Link_list {link_list} should be a list.")
         for link in link_list:
             encoded_link = encode_filename(link)
-            doc_id = "file:" + encoded_link + ".txt"
+            index_name_id = encode_filename(INDEX_NAME if index_name is None else index_name)
+            doc_id = "file:" + index_name_id + "_" + encoded_link + ".txt"
             if logflag:
                 logger.info(f"[ redis ingest] processing link {doc_id}")
 
@@ -445,7 +454,8 @@ async def ingest_files(
                     logger.info(f"[ redis ingest] Link {link} does not exist. Keep storing.")
             if key_ids:
                 raise HTTPException(
-                    status_code=400, detail=f"Uploaded link {link} already exists. Please change another link."
+                    status_code=400,
+                    detail=f"Uploaded link {link} already exists. Please use another link or index_name.",
                 )
 
             save_path = upload_folder + encoded_link + ".txt"
@@ -460,6 +470,7 @@ async def ingest_files(
                     table_strategy=table_strategy,
                 ),
                 self.embedder,
+                index_name,
             )
         if logflag:
             logger.info(f"[ redis ingest] Successfully saved link list {link_list}")
@@ -505,7 +516,7 @@ async def get_files(self):
             logger.info(f"[get] final file_list: {file_list}")
         return file_list
 
-    async def delete_files(self, file_path: str = Body(..., embed=True)):
+    async def delete_files(self, file_path: str = Body(..., embed=True), index_name: str = Body(None, embed=True)):
        """Delete file according to `file_path`.
 
        `file_path`:
@@ -531,17 +542,19 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
             else:
                 logger.info(f"[ redis delete ] Index {KEY_INDEX_NAME} does not exits.")
 
-            # drop index INDEX_NAME
-            if await check_index_existance(self.data_index_client):
-                try:
-                    assert drop_index(index_name=INDEX_NAME)
-                except Exception as e:
-                    if logflag:
-                        logger.info(f"[ redis delete ] {e}. Fail to drop index {INDEX_NAME}.")
-                    raise HTTPException(status_code=500, detail=f"Fail to drop index {INDEX_NAME}.")
+            if len(self.get_list_of_indices()) > 0:
+                for i in self.get_list_of_indices():
+                    try:
+                        # drop this index
+                        assert drop_index(index_name=i)
+                        logger.info(f"[ redis delete ] Index {i} is deleted.")
+                    except Exception as e:
+                        if logflag:
+                            logger.info(f"[ redis delete ] {e}. Fail to drop index {i}.")
+                        raise HTTPException(status_code=500, detail=f"Fail to drop index {i}.")
             else:
                 if logflag:
-                    logger.info(f"[ redis delete ] Index {INDEX_NAME} does not exits.")
+                    logger.info("[ redis delete ] There is no index registered in the Redis db.")
 
             # delete files on local disk
             try:
@@ -563,7 +576,10 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
                 logger.info(f"[ redis delete ] delete_path: {delete_path}")
 
         # partially delete files
-        doc_id = "file:" + encode_filename(file_path)
+        encode_file = encode_filename(file_path)
+        index_name = INDEX_NAME if index_name is None else index_name
+        index_name_id = encode_filename(index_name)
+        doc_id = "file:" + index_name_id + "_" + encode_file
         logger.info(f"[ redis delete ] doc id: {doc_id}")
 
         # determine whether this file exists in db KEY_INDEX_NAME
@@ -587,16 +603,16 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
                 logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.")
             raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for key index.")
 
-        # delete file content in db INDEX_NAME
+        # delete file content in db index_name
         for file_id in file_ids:
-            # determine whether this file exists in db INDEX_NAME
+            # determine whether this file exists in db index_name
             try:
                 await search_by_id(self.data_index_client, file_id)
             except Exception as e:
                 if logflag:
                     logger.info(f"[ redis delete ] {e}. File {file_path} does not exists.")
                 raise HTTPException(
-                    status_code=404, detail=f"File not found in db {INDEX_NAME}. Please check file_path."
+                    status_code=404, detail=f"File not found in db {index_name}. Please check file_path."
                 )
 
             # delete file content
@@ -605,7 +621,7 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
                 assert res
             except Exception as e:
                 if logflag:
-                    logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {INDEX_NAME}")
+                    logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {index_name}")
                 raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for index.")
 
         # local file does not exist (restarted docker container)
@@ -627,3 +643,15 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
         if logflag:
             logger.info(f"[ redis delete ] Delete folder {file_path} is not supported for now.")
         raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")
+
+    def get_list_of_indices(self):
+        """Retrieves a list of all indices from the Redis client.
+
+        Returns:
+            A list of index names as strings.
+        """
+        # Execute the command to list all indices
+        indices = self.client.execute_command("FT._LIST")
+        # Decode index names from bytes to str (entries may already be str if the client decodes responses)
+        indices_list = [item.decode("utf-8") if isinstance(item, bytes) else item for item in indices]
+        return indices_list
diff --git a/comps/dataprep/src/opea_dataprep_loader.py b/comps/dataprep/src/opea_dataprep_loader.py
index 8ec1042f8d..be4a15d9d7 100644
--- a/comps/dataprep/src/opea_dataprep_loader.py
+++ b/comps/dataprep/src/opea_dataprep_loader.py
@@ -32,6 +32,11 @@ async def delete_files(self, *args, **kwargs):
             logger.info("[ dataprep loader ] delete files")
         return await self.component.delete_files(*args, **kwargs)
 
+    async def get_list_of_indices(self, *args, **kwargs):
+        if logflag:
+            logger.info("[ dataprep loader ] get indices")
+        return self.component.get_list_of_indices(*args, **kwargs)
+
 
 class OpeaDataprepMultiModalLoader(OpeaComponentLoader):
     def __init__(self, component_name, **kwargs):
diff --git a/comps/dataprep/src/opea_dataprep_microservice.py b/comps/dataprep/src/opea_dataprep_microservice.py
index ff0f5d8335..ac5c5443ad 100644
--- a/comps/dataprep/src/opea_dataprep_microservice.py
+++ b/comps/dataprep/src/opea_dataprep_microservice.py
@@ -57,6 +57,7 @@ async def ingest_files(
     process_table: bool = Form(False),
     table_strategy: str = Form("fast"),
     ingest_from_graphDB: bool = Form(False),
+    index_name: Optional[str] = Form(None),
 ):
     start = time.time()
 
@@ -66,9 +67,28 @@ async def ingest_files(
 
     try:
         # Use the loader to invoke the component
-        response = await loader.ingest_files(
-            files, link_list, chunk_size, chunk_overlap, process_table, table_strategy, ingest_from_graphDB
-        )
+        if dataprep_component_name == "OPEA_DATAPREP_REDIS":
+            response = await loader.ingest_files(
+                files,
+                link_list,
+                chunk_size,
+                chunk_overlap,
+                process_table,
+                table_strategy,
+                ingest_from_graphDB,
+                index_name,
+            )
+        else:
+            if index_name:
+                logger.error(
+                    'Error during dataprep ingest invocation: the "index_name" option is only supported when the "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS", e.g. export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
+                )
+                raise ValueError('"index_name" is only supported by the OPEA_DATAPREP_REDIS dataprep component.')
+
+            response = await loader.ingest_files(
+                files, link_list, chunk_size, chunk_overlap, process_table, table_strategy, ingest_from_graphDB
+            )
+
        # Log the result if logging is enabled
        if logflag:
            logger.info(f"[ ingest ] Output generated: {response}")
@@ -116,7 +136,7 @@ async def get_files():
     port=5000,
 )
 @register_statistics(names=["opea_service@dataprep"])
-async def delete_files(file_path: str = Body(..., embed=True)):
+async def delete_files(file_path: str = Body(..., embed=True), index_name: str = Body(None, embed=True)):
     start = time.time()
 
     if logflag:
@@ -124,7 +144,17 @@ async def delete_files(file_path: str = Body(..., embed=True)):
 
     try:
         # Use the loader to invoke the component
-        response = await loader.delete_files(file_path)
+        if dataprep_component_name == "OPEA_DATAPREP_REDIS":
+            response = await loader.delete_files(file_path, index_name)
+        else:
+            if index_name:
+                logger.error(
+                    'Error during dataprep delete files: the "index_name" option is only supported when the "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS", e.g. export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
+                )
+                raise ValueError('"index_name" is only supported by the OPEA_DATAPREP_REDIS dataprep component.')
+            # Use the loader to invoke the component
+            response = await loader.delete_files(file_path)
+
        # Log the result if logging is enabled
        if logflag:
            logger.info(f"[ delete ] deleted result: {response}")
@@ -136,6 +166,42 @@ async def delete_files(file_path: str = Body(..., embed=True)):
         raise
 
 
+@register_microservice(
+    name="opea_service@dataprep",
+    service_type=ServiceType.DATAPREP,
+    endpoint="/v1/dataprep/indices",
+    host="0.0.0.0",
+    port=5000,
+)
+@register_statistics(names=["opea_service@dataprep"])
+async def get_list_of_indices():
+    start = time.time()
+    if logflag:
+        logger.info("[ get ] start to get list of indices.")
+
+    if dataprep_component_name != "OPEA_DATAPREP_REDIS":
+        logger.error(
+            'Error during dataprep get list of indices: the "/v1/dataprep/indices" endpoint is only supported when the "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS", e.g. export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
+        )
+        raise ValueError('"/v1/dataprep/indices" is only supported by the OPEA_DATAPREP_REDIS dataprep component.')
+
+    try:
+        # Use the loader to invoke the component
+        response = await loader.get_list_of_indices()
+
+        # Log the result if logging is enabled
+        if logflag:
+            logger.info(f"[ get ] list of indices: {response}")
+
+        # Record statistics
+        statistics_dict["opea_service@dataprep"].append_latency(time.time() - start, None)
+
+        return response
+    except Exception as e:
+        logger.error(f"Error during dataprep get list of indices: {e}")
+        raise
+
+
 if __name__ == "__main__":
     logger.info("OPEA Dataprep Microservice is starting...")
     create_upload_folder(upload_folder)
diff --git a/comps/retrievers/src/integrations/redis.py b/comps/retrievers/src/integrations/redis.py
index 7d208e8488..86dce78338 100644
--- a/comps/retrievers/src/integrations/redis.py
+++ b/comps/retrievers/src/integrations/redis.py
@@ -92,16 +92,16 @@ async def _initialize_embedder(self):
             embedder = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
         return embedder
 
-    async def _initialize_client(self) -> Redis:
+    async def _initialize_client(self, index_name=INDEX_NAME) -> Redis:
         """Initializes the redis client."""
         try:
             if BRIDGE_TOWER_EMBEDDING:
                 logger.info(f"generate multimodal redis instance with {BRIDGE_TOWER_EMBEDDING}")
                 client = Redis(
-                    embedding=self.embeddings, index_name=INDEX_NAME, index_schema=INDEX_SCHEMA, redis_url=REDIS_URL
+                    embedding=self.embeddings, index_name=index_name, index_schema=INDEX_SCHEMA, redis_url=REDIS_URL
                 )
             else:
-                client = Redis(embedding=self.embeddings, index_name=INDEX_NAME, redis_url=REDIS_URL)
+                client = Redis(embedding=self.embeddings, index_name=index_name, redis_url=REDIS_URL)
             return client
         except Exception as e:
             logger.error(f"fail to initialize redis client: {e}")
@@ -138,9 +138,14 @@ async def invoke(
         if logflag:
             logger.info(input)
 
+        client = self.client
+        if isinstance(input, EmbedDoc) and input.index_name and input.index_name != INDEX_NAME:
+            client = await self._initialize_client(index_name=input.index_name)
+
         # check if the Redis index has data
         try:
-            keys_exist = self.client.client.keys()
+            keys_exist = client.client.keys()
+
         except Exception as e:
             logger.error(f"Redis key check failed: {e}")
             keys_exist = []
@@ -163,7 +168,7 @@ async def invoke(
         # if the Redis index has data, perform the search
         if input.search_type == "similarity":
             search_res = await run_in_thread(
-                self.client.similarity_search_by_vector, embedding=embedding_data_input, k=input.k
+                client.similarity_search_by_vector, embedding=embedding_data_input, k=input.k
             )
         elif input.search_type == "similarity_distance_threshold":
             if input.distance_threshold is None:
@@ -171,14 +176,14 @@ async def invoke(
                     "distance_threshold must be provided for " + "similarity_distance_threshold retriever"
                 )
             search_res = await run_in_thread(
-                self.client.similarity_search_by_vector,
+                client.similarity_search_by_vector,
                 embedding=input.embedding,
                 k=input.k,
                 distance_threshold=input.distance_threshold,
             )
         elif input.search_type == "similarity_score_threshold":
             docs_and_similarities = await run_in_thread(
-                self.client.similarity_search_with_relevance_scores,
+                client.similarity_search_with_relevance_scores,
                 query=input.text,
                 k=input.k,
                 score_threshold=input.score_threshold,
@@ -186,7 +191,7 @@ async def invoke(
             search_res = [doc for doc, _ in docs_and_similarities]
         elif input.search_type == "mmr":
             search_res = await run_in_thread(
-                self.client.max_marginal_relevance_search,
+                client.max_marginal_relevance_search,
                 query=input.text,
                 k=input.k,
                 fetch_k=input.fetch_k,
diff --git a/tests/dataprep/dataprep_utils.sh b/tests/dataprep/dataprep_utils.sh
index 404de6c34b..6416c37c65 100644
--- a/tests/dataprep/dataprep_utils.sh
+++ b/tests/dataprep/dataprep_utils.sh
@@ -25,7 +25,7 @@ function _invoke_curl() {
         ingest)
             header='Content-Type: multipart/form-data'
             ;;
-        delete|get)
+        delete|get|indices)
             header='Content-Type: application/json'
             ;;
         *)
@@ -117,6 +117,21 @@ function get_all() {
     _invoke_curl $fqdn $port get $@
 }
 
+function ingest_txt_with_index_name() {
+    local fqdn=$1
+    local port=$2
+    local index_name=$3
+    shift 3
+    _invoke_curl $fqdn $port ingest -F "files=@${SCRIPT_DIR}/ingest_dataprep.txt" -F "index_name=${index_name}" $@
+}
+
+function indices() {
+    local fqdn=$1
+    local port=$2
+    shift 2
+    _invoke_curl $fqdn $port indices $@
+}
+
 function check_result() {
     local service_name=$1
     local expected_response=$2
diff --git a/tests/dataprep/test_dataprep_redis.sh b/tests/dataprep/test_dataprep_redis.sh
index 34c492a002..a17ee6e412 100644
--- a/tests/dataprep/test_dataprep_redis.sh
+++ b/tests/dataprep/test_dataprep_redis.sh
@@ -72,10 +72,17 @@ function validate_microservice() {
     ingest_xlsx ${ip_address} ${DATAPREP_PORT}
     check_result "dataprep - upload - xlsx" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log
 
-    # test /v1/dataprep/ingest upload link
+    # test /v1/dataprep/ingest upload link
     ingest_external_link ${ip_address} ${DATAPREP_PORT}
     check_result "dataprep - upload - link" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log
 
+    ingest_txt_with_index_name ${ip_address} ${DATAPREP_PORT} rag_redis_test
+    check_result "dataprep - upload with index - txt" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log
+
+    # test /v1/dataprep/indices
+    indices ${ip_address} ${DATAPREP_PORT}
+    check_result "dataprep - indices" "['rag_redis_test']" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log
+
     # test /v1/dataprep/get
     get_all ${ip_address} ${DATAPREP_PORT}
     check_result "dataprep - get" '{"name":' dataprep-redis-server ${LOG_PATH}/dataprep_file.log
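
A quick usage sketch of the new index_name flow, for reviewers trying the change locally. The endpoint paths and field names come from the diff above; the host, port, sample file name, and index name are illustrative assumptions, and the examples assume the microservice's default POST method on port 5000:

    # Ingest a file into a custom Redis index instead of the default INDEX_NAME
    curl -X POST http://localhost:5000/v1/dataprep/ingest \
        -H "Content-Type: multipart/form-data" \
        -F "files=@./sample.txt" \
        -F "index_name=rag_redis_test"

    # List the indices currently registered in Redis (backed by FT._LIST)
    curl -X POST http://localhost:5000/v1/dataprep/indices \
        -H "Content-Type: application/json"

    # Delete the file's chunks from that index
    curl -X POST http://localhost:5000/v1/dataprep/delete \
        -H "Content-Type: application/json" \
        -d '{"file_path": "sample.txt", "index_name": "rag_redis_test"}'

Retrieval then targets the same index by setting index_name on the retriever request (EmbedDoc.index_name), which the Redis retriever uses to build a per-request client instead of the default one.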