Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3424317
CodeGen initial
MSCetin37 Mar 20, 2025
682a90f
code update
MSCetin37 Mar 20, 2025
b4f64c4
code update
MSCetin37 Mar 22, 2025
d3a7d4a
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 22, 2025
e4feb18
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 22, 2025
82542fc
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 22, 2025
6eaeb32
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 24, 2025
07b3b83
adding index_name variable
MSCetin37 Mar 24, 2025
925f4a2
update unit tests
MSCetin37 Mar 24, 2025
37ea46d
update the tests
MSCetin37 Mar 24, 2025
0106c90
code update & add ingest_with_index test
MSCetin37 Mar 25, 2025
39b023e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 25, 2025
2730c02
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 25, 2025
6784cdf
resolve merge conflicts
MSCetin37 Mar 26, 2025
9141a6e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 26, 2025
9b0ffa0
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 26, 2025
52178e5
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 27, 2025
c7207aa
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 27, 2025
7640f33
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 28, 2025
54069a3
Merge branch 'main' into codegen_rag_agent_v2
ashahba Mar 30, 2025
1e21883
Merge branch 'main' into codegen_rag_agent_v2
ashahba Mar 31, 2025
3051502
Merge branch 'main' into codegen_rag_agent_v2
ashahba Mar 31, 2025
badba01
Merge branch 'main' into codegen_rag_agent_v2
ashahba Apr 1, 2025
685628b
Merge branch 'main' into codegen_rag_agent_v2
ashahba Apr 1, 2025
40ec53a
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Apr 2, 2025
aa2acd7
Merge branch 'main' into codegen_rag_agent_v2
ashahba Apr 2, 2025
142fb76
update redis test
MSCetin37 Apr 2, 2025
f3eadce
update retrievers
MSCetin37 Apr 3, 2025
1a78b96
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Apr 3, 2025
3fb6af9
update redis-delete_files
MSCetin37 Apr 3, 2025
84aa8db
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 3, 2025
bc151fe
Merge branch 'main' into codegen_rag_agent_v2
ashahba Apr 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions comps/cores/proto/api_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ class ChatCompletionRequest(BaseModel):
lambda_mult: float = 0.5
score_threshold: float = 0.2
retrieved_docs: Union[List[RetrievalResponseData], List[Dict[str, Any]]] = Field(default_factory=list)
index_name: Optional[str] = None

# reranking
top_n: int = 1
Expand Down
2 changes: 2 additions & 0 deletions comps/cores/proto/docarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class EmbedDoc(BaseDoc):
lambda_mult: float = 0.5
score_threshold: float = 0.2
constraints: Optional[Union[Dict[str, Any], List[Dict[str, Any]], None]] = None
index_name: Optional[str] = None


class EmbedMultimodalDoc(EmbedDoc):
Expand Down Expand Up @@ -225,6 +226,7 @@ class LLMParams(BaseDoc):
repetition_penalty: float = 1.03
stream: bool = True
language: str = "auto" # can be "en", "zh"
index_name: Optional[str] = None

chat_template: Optional[str] = Field(
default=None,
Expand Down
76 changes: 52 additions & 24 deletions comps/dataprep/src/integrations/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,17 @@ async def delete_by_id(client, id):
return True


async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder, index_name: str):
if logflag:
logger.info(f"[ redis ingest chunks ] file name: {file_name}")

# Batch size
batch_size = 32
num_chunks = len(chunks)

# if data will be saved to a different index name than the default one
ingest_index_name = index_name if index_name else INDEX_NAME

file_ids = []
for i in range(0, num_chunks, batch_size):
if logflag:
Expand All @@ -206,7 +209,7 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
Redis.from_texts_return_keys,
texts=batch_texts,
embedding=embedder,
index_name=INDEX_NAME,
index_name=ingest_index_name,
redis_url=REDIS_URL,
)
if logflag:
Expand All @@ -222,15 +225,15 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
await create_index(client)

try:
await store_by_id(client, key=file_name, value="#".join(file_ids))
await store_by_id(client, key=encode_filename(ingest_index_name) + "_" + file_name, value="#".join(file_ids))
except Exception as e:
if logflag:
logger.info(f"[ redis ingest chunks ] {e}. Fail to store chunks of file {file_name}.")
raise HTTPException(status_code=500, detail=f"Fail to store chunks of file {file_name}.")
return True


async def ingest_data_to_redis(doc_path: DocPath, embedder):
async def ingest_data_to_redis(doc_path: DocPath, embedder, index_name):
"""Ingest document to Redis."""
path = doc_path.path
if logflag:
Expand Down Expand Up @@ -271,7 +274,7 @@ async def ingest_data_to_redis(doc_path: DocPath, embedder):
logger.info(f"[ redis ingest data ] Done preprocessing. Created {len(chunks)} chunks of the given file.")

file_name = doc_path.path.split("/")[-1]
return await ingest_chunks_to_redis(file_name, chunks, embedder)
return await ingest_chunks_to_redis(file_name, chunks, embedder, index_name)


@OpeaComponentRegistry.register("OPEA_DATAPREP_REDIS")
Expand Down Expand Up @@ -360,6 +363,7 @@ async def ingest_files(
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
ingest_from_graphDB: bool = Form(False),
index_name: str = Form(None),
):
"""Ingest files/links content into redis database.

Expand All @@ -372,6 +376,7 @@ async def ingest_files(
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
index_name (str, optional): The name of the index where data will be ingested.
"""
if logflag:
logger.info(f"[ redis ingest ] files:{files}")
Expand All @@ -384,7 +389,9 @@ async def ingest_files(

for file in files:
encode_file = encode_filename(file.filename)
doc_id = "file:" + encode_file
index_name_id = encode_filename(INDEX_NAME if index_name is None else index_name)
doc_id = "file:" + index_name_id + "_" + encode_file

if logflag:
logger.info(f"[ redis ingest ] processing file {doc_id}")

Expand All @@ -400,7 +407,7 @@ async def ingest_files(
if key_ids:
raise HTTPException(
status_code=400,
detail=f"Uploaded file {file.filename} already exists. Please change file name.",
detail=f"Uploaded file {file.filename} already exists. Please change file name or index name.",
)

save_path = upload_folder + encode_file
Expand All @@ -414,6 +421,7 @@ async def ingest_files(
table_strategy=table_strategy,
),
self.embedder,
index_name,
)
uploaded_files.append(save_path)
if logflag:
Expand All @@ -430,7 +438,8 @@ async def ingest_files(
raise HTTPException(status_code=400, detail=f"Link_list {link_list} should be a list.")
for link in link_list:
encoded_link = encode_filename(link)
doc_id = "file:" + encoded_link + ".txt"
index_name_id = encode_filename(INDEX_NAME if index_name is None else index_name)
doc_id = "file:" + index_name_id + "_" + encoded_link + ".txt"
if logflag:
logger.info(f"[ redis ingest] processing link {doc_id}")

Expand All @@ -445,7 +454,8 @@ async def ingest_files(
logger.info(f"[ redis ingest] Link {link} does not exist. Keep storing.")
if key_ids:
raise HTTPException(
status_code=400, detail=f"Uploaded link {link} already exists. Please change another link."
status_code=400,
detail=f"Uploaded link {link} already exists. Please change another link or index_name.",
)

save_path = upload_folder + encoded_link + ".txt"
Expand All @@ -460,6 +470,7 @@ async def ingest_files(
table_strategy=table_strategy,
),
self.embedder,
index_name,
)
if logflag:
logger.info(f"[ redis ingest] Successfully saved link list {link_list}")
Expand Down Expand Up @@ -505,7 +516,7 @@ async def get_files(self):
logger.info(f"[get] final file_list: {file_list}")
return file_list

async def delete_files(self, file_path: str = Body(..., embed=True)):
async def delete_files(self, file_path: str = Body(..., embed=True), index_name: str = Body(None, embed=True)):
"""Delete file according to `file_path`.

`file_path`:
Expand All @@ -531,17 +542,19 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
else:
logger.info(f"[ redis delete ] Index {KEY_INDEX_NAME} does not exits.")

# drop index INDEX_NAME
if await check_index_existance(self.data_index_client):
try:
assert drop_index(index_name=INDEX_NAME)
except Exception as e:
if logflag:
logger.info(f"[ redis delete ] {e}. Fail to drop index {INDEX_NAME}.")
raise HTTPException(status_code=500, detail=f"Fail to drop index {INDEX_NAME}.")
if len(self.get_list_of_indices()) > 0:
for i in self.get_list_of_indices():
try:
# drop index INDEX_NAME
assert drop_index(index_name=i)
logger.info(f"[ redis delete ] Index_name: {i} is deleted.")
except Exception as e:
if logflag:
logger.info(f"[ redis delete ] {e}. Fail to drop index {i}.")
raise HTTPException(status_code=500, detail=f"Fail to drop index {i}.")
else:
if logflag:
logger.info(f"[ redis delete ] Index {INDEX_NAME} does not exits.")
logger.info("[ redis delete ] There is no index_name registered to redis db.")

# delete files on local disk
try:
Expand All @@ -563,7 +576,10 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
logger.info(f"[ redis delete ] delete_path: {delete_path}")

# partially delete files
doc_id = "file:" + encode_filename(file_path)
encode_file = encode_filename(file_path)
index_name = INDEX_NAME if index_name is None else index_name
index_name_id = encode_filename(index_name)
doc_id = "file:" + index_name_id + "_" + encode_file
logger.info(f"[ redis delete ] doc id: {doc_id}")

# determine whether this file exists in db KEY_INDEX_NAME
Expand All @@ -587,16 +603,16 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.")
raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for key index.")

# delete file content in db INDEX_NAME
# delete file content in db index_name
for file_id in file_ids:
# determine whether this file exists in db INDEX_NAME
# determine whether this file exists in db index_name
try:
await search_by_id(self.data_index_client, file_id)
except Exception as e:
if logflag:
logger.info(f"[ redis delete ] {e}. File {file_path} does not exists.")
raise HTTPException(
status_code=404, detail=f"File not found in db {INDEX_NAME}. Please check file_path."
status_code=404, detail=f"File not found in db {index_name}. Please check file_path."
)

# delete file content
Expand All @@ -605,7 +621,7 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
assert res
except Exception as e:
if logflag:
logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {INDEX_NAME}")
logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {index_name}")
raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for index.")

# local file does not exist (restarted docker container)
Expand All @@ -627,3 +643,15 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
if logflag:
logger.info(f"[ redis delete ] Delete folder {file_path} is not supported for now.")
raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")

def get_list_of_indices(self):
    """Retrieve the names of all search indices registered in the Redis database.

    Returns:
        list[str]: Index names reported by the RediSearch ``FT._LIST`` command.
    """
    # Execute the command to list all indices known to the server.
    indices = self.client.execute_command("FT._LIST")
    # The client returns bytes by default, but str when constructed with
    # decode_responses=True; normalize both so callers always get strings.
    # (The original unconditional .decode() crashed on str items.)
    return [item.decode("utf-8") if isinstance(item, bytes) else item for item in indices]
5 changes: 5 additions & 0 deletions comps/dataprep/src/opea_dataprep_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ async def delete_files(self, *args, **kwargs):
logger.info("[ dataprep loader ] delete files")
return await self.component.delete_files(*args, **kwargs)

async def get_list_of_indices(self, *args, **kwargs):
    """Return the list of index names known to the wrapped dataprep component.

    All positional and keyword arguments are forwarded unchanged to the
    component implementation.
    """
    if logflag:
        logger.info("[ dataprep loader ] get indices")
    # NOTE(review): the component call is invoked synchronously (no await) —
    # presumably the underlying implementation is a plain method; confirm
    # against the registered component.
    return self.component.get_list_of_indices(*args, **kwargs)


class OpeaDataprepMultiModalLoader(OpeaComponentLoader):
def __init__(self, component_name, **kwargs):
Expand Down
76 changes: 71 additions & 5 deletions comps/dataprep/src/opea_dataprep_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ async def ingest_files(
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
ingest_from_graphDB: bool = Form(False),
index_name: Optional[str] = Form(None),
):
start = time.time()

Expand All @@ -66,9 +67,28 @@ async def ingest_files(

try:
# Use the loader to invoke the component
response = await loader.ingest_files(
files, link_list, chunk_size, chunk_overlap, process_table, table_strategy, ingest_from_graphDB
)
if dataprep_component_name == "OPEA_DATAPREP_REDIS":
response = await loader.ingest_files(
files,
link_list,
chunk_size,
chunk_overlap,
process_table,
table_strategy,
ingest_from_graphDB,
index_name,
)
else:
if index_name:
logger.error(
'Error during dataprep ingest invocation: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
)
raise

response = await loader.ingest_files(
files, link_list, chunk_size, chunk_overlap, process_table, table_strategy, ingest_from_graphDB
)

# Log the result if logging is enabled
if logflag:
logger.info(f"[ ingest ] Output generated: {response}")
Expand Down Expand Up @@ -116,15 +136,25 @@ async def get_files():
port=5000,
)
@register_statistics(names=["opea_service@dataprep"])
async def delete_files(file_path: str = Body(..., embed=True)):
async def delete_files(file_path: str = Body(..., embed=True), index_name: str = Body(None, embed=True)):
start = time.time()

if logflag:
logger.info("[ delete ] start to delete ingested files")

try:
# Use the loader to invoke the component
response = await loader.delete_files(file_path)
if dataprep_component_name == "OPEA_DATAPREP_REDIS":
response = await loader.delete_files(file_path, index_name)
else:
if index_name:
logger.error(
'Error during dataprep delete files: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
)
raise
# Use the loader to invoke the component
response = await loader.delete_files(file_path)

# Log the result if logging is enabled
if logflag:
logger.info(f"[ delete ] deleted result: {response}")
Expand All @@ -136,6 +166,42 @@ async def delete_files(file_path: str = Body(..., embed=True)):
raise


@register_microservice(
    name="opea_service@dataprep",
    service_type=ServiceType.DATAPREP,
    endpoint="/v1/dataprep/indices",
    host="0.0.0.0",
    port=5000,
)
@register_statistics(names=["opea_service@dataprep"])
async def get_list_of_indices():
    """List all index names managed by the dataprep service.

    Only supported when the active component is OPEA_DATAPREP_REDIS; any other
    component logs an error and raises.

    Returns:
        The component's list of index names.
    """
    start = time.time()
    if logflag:
        logger.info("[ get ] start to get list of indices.")

    if dataprep_component_name != "OPEA_DATAPREP_REDIS":
        logger.error(
            'Error during dataprep - get list of indices: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
        )
        # A bare `raise` here has no active exception and would itself fail with
        # "RuntimeError: No active exception to re-raise"; raise explicitly instead.
        raise RuntimeError(
            "The /v1/dataprep/indices endpoint is only supported by the OPEA_DATAPREP_REDIS component."
        )

    try:
        # Use the loader to invoke the component
        response = await loader.get_list_of_indices()

        # Log the result if logging is enabled
        if logflag:
            logger.info(f"[ get ] list of indices: {response}")

        # Record statistics
        statistics_dict["opea_service@dataprep"].append_latency(time.time() - start, None)

        return response
    except Exception as e:
        logger.error(f"Error during dataprep get list of indices: {e}")
        raise


if __name__ == "__main__":
logger.info("OPEA Dataprep Microservice is starting...")
create_upload_folder(upload_folder)
Expand Down
Loading
Loading