Merged
32 commits
3424317
CodeGen initial
MSCetin37 Mar 20, 2025
682a90f
code update
MSCetin37 Mar 20, 2025
b4f64c4
code update
MSCetin37 Mar 22, 2025
d3a7d4a
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 22, 2025
e4feb18
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 22, 2025
82542fc
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 22, 2025
6eaeb32
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 24, 2025
07b3b83
adding index_name variable
MSCetin37 Mar 24, 2025
925f4a2
update unit tests
MSCetin37 Mar 24, 2025
37ea46d
update the tests
MSCetin37 Mar 24, 2025
0106c90
code update & add ingest_with_index test
MSCetin37 Mar 25, 2025
39b023e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 25, 2025
2730c02
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 25, 2025
6784cdf
resolve merge conflicts
MSCetin37 Mar 26, 2025
9141a6e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 26, 2025
9b0ffa0
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 26, 2025
52178e5
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 27, 2025
c7207aa
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 27, 2025
7640f33
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Mar 28, 2025
54069a3
Merge branch 'main' into codegen_rag_agent_v2
ashahba Mar 30, 2025
1e21883
Merge branch 'main' into codegen_rag_agent_v2
ashahba Mar 31, 2025
3051502
Merge branch 'main' into codegen_rag_agent_v2
ashahba Mar 31, 2025
badba01
Merge branch 'main' into codegen_rag_agent_v2
ashahba Apr 1, 2025
685628b
Merge branch 'main' into codegen_rag_agent_v2
ashahba Apr 1, 2025
40ec53a
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Apr 2, 2025
aa2acd7
Merge branch 'main' into codegen_rag_agent_v2
ashahba Apr 2, 2025
142fb76
update redis test
MSCetin37 Apr 2, 2025
f3eadce
update retrievers
MSCetin37 Apr 3, 2025
1a78b96
Merge branch 'main' into codegen_rag_agent_v2
MSCetin37 Apr 3, 2025
3fb6af9
update redis-delete_files
MSCetin37 Apr 3, 2025
84aa8db
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 3, 2025
bc151fe
Merge branch 'main' into codegen_rag_agent_v2
ashahba Apr 4, 2025
1 change: 1 addition & 0 deletions comps/cores/proto/api_protocol.py
@@ -262,6 +262,7 @@ class ChatCompletionRequest(BaseModel):
lambda_mult: float = 0.5
score_threshold: float = 0.2
retrieved_docs: Union[List[RetrievalResponseData], List[Dict[str, Any]]] = Field(default_factory=list)
index_name: Optional[str] = None

# reranking
top_n: int = 1
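For context, the new field is a plain optional string on the chat request model. A minimal usage sketch follows, assuming ChatCompletionRequest is importable from comps.cores.proto.api_protocol as in the diff above and that messages is its only required field; the index name is an illustrative placeholder.

```python
# Minimal sketch: targeting a non-default Redis index from a chat completion request.
# Assumes `messages` is the only required field; "rag_redis_test" is a placeholder name.
from comps.cores.proto.api_protocol import ChatCompletionRequest

req = ChatCompletionRequest(
    messages=[{"role": "user", "content": "What is OPEA?"}],
    index_name="rag_redis_test",  # new optional field; omit it to keep the default index (None)
)
```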
2 changes: 2 additions & 0 deletions comps/cores/proto/docarray.py
@@ -102,6 +102,7 @@ class EmbedDoc(BaseDoc):
lambda_mult: float = 0.5
score_threshold: float = 0.2
constraints: Optional[Union[Dict[str, Any], List[Dict[str, Any]], None]] = None
index_name: Optional[str] = None


class EmbedMultimodalDoc(EmbedDoc):
@@ -225,6 +226,7 @@ class LLMParams(BaseDoc):
repetition_penalty: float = 1.03
stream: bool = True
language: str = "auto" # can be "en", "zh"
index_name: Optional[str] = None

chat_template: Optional[str] = Field(
default=None,
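docarray.py gains the same optional field on both EmbedDoc (the retrieval request) and LLMParams, so the chosen index can ride along the whole pipeline. A minimal sketch, assuming text and embedding are EmbedDoc's required fields and the remaining fields keep their defaults; values are illustrative.

```python
# Minimal sketch of the new index_name field on the docarray protos
# (assumes text and embedding are EmbedDoc's required fields; values are placeholders).
from comps.cores.proto.docarray import EmbedDoc, LLMParams

emb = [0.0] * 768  # placeholder query embedding; length must match the configured embedder

default_doc = EmbedDoc(text="What is OPEA?", embedding=emb)
print(default_doc.index_name)  # None -> services fall back to their configured INDEX_NAME

custom_doc = EmbedDoc(text="What is OPEA?", embedding=emb, index_name="rag_redis_test")

params = LLMParams(index_name="rag_redis_test")  # assumes the remaining LLMParams fields all have defaults
```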
27 changes: 23 additions & 4 deletions comps/dataprep/src/integrations/redis.py
@@ -187,14 +187,17 @@ async def delete_by_id(client, id):
return True


async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder, index_name: str):
if logflag:
logger.info(f"[ redis ingest chunks ] file name: {file_name}")

# Batch size
batch_size = 32
num_chunks = len(chunks)

# if data will be saved to a different index name than the default one
ingest_index_name = index_name if index_name else INDEX_NAME

file_ids = []
for i in range(0, num_chunks, batch_size):
if logflag:
@@ -206,7 +209,7 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
Redis.from_texts_return_keys,
texts=batch_texts,
embedding=embedder,
index_name=INDEX_NAME,
index_name=ingest_index_name,
redis_url=REDIS_URL,
)
if logflag:
@@ -230,7 +233,7 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
return True


async def ingest_data_to_redis(doc_path: DocPath, embedder):
async def ingest_data_to_redis(doc_path: DocPath, embedder, index_name):
"""Ingest document to Redis."""
path = doc_path.path
if logflag:
@@ -271,7 +274,7 @@ async def ingest_data_to_redis(doc_path: DocPath, embedder):
logger.info(f"[ redis ingest data ] Done preprocessing. Created {len(chunks)} chunks of the given file.")

file_name = doc_path.path.split("/")[-1]
return await ingest_chunks_to_redis(file_name, chunks, embedder)
return await ingest_chunks_to_redis(file_name, chunks, embedder, index_name)


@OpeaComponentRegistry.register("OPEA_DATAPREP_REDIS")
@@ -360,6 +363,7 @@ async def ingest_files(
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
ingest_from_graphDB: bool = Form(False),
index_name: str = Form(None),
):
"""Ingest files/links content into redis database.

@@ -372,6 +376,7 @@
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
index_name (str, optional): The name of the index where data will be ingested.
"""
if logflag:
logger.info(f"[ redis ingest ] files:{files}")
@@ -414,6 +419,7 @@
table_strategy=table_strategy,
),
self.embedder,
index_name,
)
uploaded_files.append(save_path)
if logflag:
@@ -460,6 +466,7 @@
table_strategy=table_strategy,
),
self.embedder,
index_name,
)
if logflag:
logger.info(f"[ redis ingest] Successfully saved link list {link_list}")
@@ -627,3 +634,15 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
if logflag:
logger.info(f"[ redis delete ] Delete folder {file_path} is not supported for now.")
raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")

def get_list_of_indices(self):
"""Retrieves a list of all indices from the Redis client.

Returns:
A list of index names as strings.
"""
# Execute the command to list all indices
indices = self.client.execute_command("FT._LIST")
# Decode each index name from bytes to string
indices_list = [item.decode("utf-8") for item in indices]
return indices_list
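For reference, the new get_list_of_indices helper is a thin wrapper around RediSearch's FT._LIST command. A standalone sketch of the same call follows, assuming a redis-py client and a RediSearch-enabled Redis (e.g. Redis Stack) reachable locally; connection details are placeholders.

```python
# Standalone sketch of what get_list_of_indices does (assumes redis-py and a
# RediSearch-enabled Redis reachable at localhost:6379; values are placeholders).
import redis

client = redis.Redis(host="localhost", port=6379)

# FT._LIST returns index names as bytes (e.g. [b"rag-redis", b"rag_redis_test"]),
# so each entry is decoded to a plain string before being returned to callers.
indices = [item.decode("utf-8") for item in client.execute_command("FT._LIST")]
print(indices)
```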
5 changes: 5 additions & 0 deletions comps/dataprep/src/opea_dataprep_loader.py
@@ -32,6 +32,11 @@ async def delete_files(self, *args, **kwargs):
logger.info("[ dataprep loader ] delete files")
return await self.component.delete_files(*args, **kwargs)

async def get_list_of_indices(self, *args, **kwargs):
if logflag:
logger.info("[ dataprep loader ] get indices")
return self.component.get_list_of_indices(*args, **kwargs)


class OpeaDataprepMultiModalLoader(OpeaComponentLoader):
def __init__(self, component_name, **kwargs):
62 changes: 59 additions & 3 deletions comps/dataprep/src/opea_dataprep_microservice.py
@@ -57,6 +57,7 @@ async def ingest_files(
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
ingest_from_graphDB: bool = Form(False),
index_name: Optional[str] = Form(None),
):
start = time.time()

@@ -66,9 +67,28 @@

try:
# Use the loader to invoke the component
response = await loader.ingest_files(
files, link_list, chunk_size, chunk_overlap, process_table, table_strategy, ingest_from_graphDB
)
if dataprep_component_name == "OPEA_DATAPREP_REDIS":
response = await loader.ingest_files(
files,
link_list,
chunk_size,
chunk_overlap,
process_table,
table_strategy,
ingest_from_graphDB,
index_name,
)
else:
if index_name:
logger.error(
'Error during dataprep ingest invocation: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
)
raise

response = await loader.ingest_files(
files, link_list, chunk_size, chunk_overlap, process_table, table_strategy, ingest_from_graphDB
)

# Log the result if logging is enabled
if logflag:
logger.info(f"[ ingest ] Output generated: {response}")
@@ -136,6 +156,42 @@ async def delete_files(file_path: str = Body(..., embed=True)):
raise


@register_microservice(
name="opea_service@dataprep",
service_type=ServiceType.DATAPREP,
endpoint="/v1/dataprep/indices",
host="0.0.0.0",
port=5000,
)
@register_statistics(names=["opea_service@dataprep"])
async def get_list_of_indices():
start = time.time()
if logflag:
logger.info("[ get ] start to get list of indices.")

if dataprep_component_name != "OPEA_DATAPREP_REDIS":
logger.error(
'Error during dataprep - get list of indices: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
)
raise

try:
# Use the loader to invoke the component
response = await loader.get_list_of_indices()

# Log the result if logging is enabled
if logflag:
logger.info(f"[ get ] list of indices: {response}")

# Record statistics
statistics_dict["opea_service@dataprep"].append_latency(time.time() - start, None)

return response
except Exception as e:
logger.error(f"Error during dataprep get list of indices: {e}")
raise


if __name__ == "__main__":
logger.info("OPEA Dataprep Microservice is starting...")
create_upload_folder(upload_folder)
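The two endpoints touched above can be exercised end to end once the service is running. A hedged usage sketch with requests: the port and paths come from the registrations in this file, the HTTP method for /v1/dataprep/indices is assumed to be POST like the other dataprep endpoints, and the host, file, and index names are placeholders.

```python
# Usage sketch for the dataprep microservice (assumes it listens on localhost:5000 with
# DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"; the POST method for /indices is assumed).
import requests

base = "http://localhost:5000/v1/dataprep"

# Ingest a file into a non-default Redis index via the new index_name form field.
with open("ingest_dataprep.txt", "rb") as f:
    resp = requests.post(
        f"{base}/ingest",
        files={"files": f},
        data={"index_name": "rag_redis_test"},
    )
print(resp.json())

# List the indices currently present in Redis through the new endpoint.
print(requests.post(f"{base}/indices").json())
```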
21 changes: 13 additions & 8 deletions comps/retrievers/src/integrations/redis.py
@@ -92,16 +92,16 @@ async def _initialize_embedder(self):
embedder = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
return embedder

async def _initialize_client(self) -> Redis:
async def _initialize_client(self, index_name=INDEX_NAME) -> Redis:
"""Initializes the redis client."""
try:
if BRIDGE_TOWER_EMBEDDING:
logger.info(f"generate multimodal redis instance with {BRIDGE_TOWER_EMBEDDING}")
client = Redis(
embedding=self.embeddings, index_name=INDEX_NAME, index_schema=INDEX_SCHEMA, redis_url=REDIS_URL
embedding=self.embeddings, index_name=index_name, index_schema=INDEX_SCHEMA, redis_url=REDIS_URL
)
else:
client = Redis(embedding=self.embeddings, index_name=INDEX_NAME, redis_url=REDIS_URL)
client = Redis(embedding=self.embeddings, index_name=index_name, redis_url=REDIS_URL)
return client
except Exception as e:
logger.error(f"fail to initialize redis client: {e}")
@@ -138,9 +138,14 @@ async def invoke(
if logflag:
logger.info(input)

client = self.client
if isinstance(input, EmbedDoc) and input.index_name and input.index_name != INDEX_NAME:
client = asyncio.run(self._initialize_client(index_name=input.index_name))

# check if the Redis index has data
try:
keys_exist = self.client.client.keys()
keys_exist = client.client.keys()

except Exception as e:
logger.error(f"Redis key check failed: {e}")
keys_exist = []
@@ -163,30 +168,30 @@
# if the Redis index has data, perform the search
if input.search_type == "similarity":
search_res = await run_in_thread(
self.client.similarity_search_by_vector, embedding=embedding_data_input, k=input.k
client.similarity_search_by_vector, embedding=embedding_data_input, k=input.k
)
elif input.search_type == "similarity_distance_threshold":
if input.distance_threshold is None:
raise ValueError(
"distance_threshold must be provided for " + "similarity_distance_threshold retriever"
)
search_res = await run_in_thread(
self.client.similarity_search_by_vector,
client.similarity_search_by_vector,
embedding=input.embedding,
k=input.k,
distance_threshold=input.distance_threshold,
)
elif input.search_type == "similarity_score_threshold":
docs_and_similarities = await run_in_thread(
self.client.similarity_search_with_relevance_scores,
client.similarity_search_with_relevance_scores,
query=input.text,
k=input.k,
score_threshold=input.score_threshold,
)
search_res = [doc for doc, _ in docs_and_similarities]
elif input.search_type == "mmr":
search_res = await run_in_thread(
self.client.max_marginal_relevance_search,
client.max_marginal_relevance_search,
query=input.text,
k=input.k,
fetch_k=input.fetch_k,
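On the retriever side, both the target index and the search behavior are driven by fields of the incoming EmbedDoc: when input.index_name is set and differs from the configured INDEX_NAME, a client bound to that index is created for the request, and the search_type branches above read the fields sketched below. Field names come from this diff; concrete values and the embedding dimensionality are illustrative.

```python
# Sketch of the EmbedDoc fields consumed by the search_type branches above
# (illustrative values; the embedding length must match the configured embedder).
from comps.cores.proto.docarray import EmbedDoc

emb = [0.0] * 768  # placeholder query embedding

similarity = EmbedDoc(text="query", embedding=emb, search_type="similarity", k=4)
by_distance = EmbedDoc(text="query", embedding=emb, search_type="similarity_distance_threshold",
                       k=4, distance_threshold=0.3)
by_score = EmbedDoc(text="query", embedding=emb, search_type="similarity_score_threshold",
                    k=4, score_threshold=0.2)
mmr = EmbedDoc(text="query", embedding=emb, search_type="mmr", k=4, fetch_k=20, lambda_mult=0.5,
               index_name="rag_redis_test")  # any of these can also target a custom index
```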
17 changes: 16 additions & 1 deletion tests/dataprep/dataprep_utils.sh
@@ -25,7 +25,7 @@ function _invoke_curl() {
ingest)
header='Content-Type: multipart/form-data'
;;
delete|get)
delete|get|indices)
header='Content-Type: application/json'
;;
*)
@@ -117,6 +117,21 @@ function get_all() {
_invoke_curl $fqdn $port get $@
}

function ingest_txt_with_index_name() {
local fqdn=$1
local port=$2
local index_name=$3
shift 3
_invoke_curl $fqdn $port ingest -F "files=@${SCRIPT_DIR}/ingest_dataprep.txt" -F "index_name=${index_name}" $@
}

function indices() {
local fqdn=$1
local port=$2
shift 2
_invoke_curl $fqdn $port indices $@
}

function check_result() {
local service_name=$1
local expected_response=$2
9 changes: 8 additions & 1 deletion tests/dataprep/test_dataprep_redis.sh
@@ -72,10 +72,17 @@ function validate_microservice() {
ingest_xlsx ${ip_address} ${DATAPREP_PORT}
check_result "dataprep - upload - xlsx" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log

# test /v1/dataprep/ingest upload link
# test /v1/dataprep/ingest upload link
ingest_external_link ${ip_address} ${DATAPREP_PORT}
check_result "dataprep - upload - link" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log

ingest_txt_with_index_name ${ip_address} ${DATAPREP_PORT} rag_redis_test
check_result "dataprep - upload with index - txt" "Data preparation succeeded" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log

# test /v1/dataprep/indices
indices ${ip_address} ${DATAPREP_PORT}
check_result "dataprep - indices" "['rag_redis_test']" dataprep-redis-server ${LOG_PATH}/dataprep_upload_file.log

# test /v1/dataprep/get
get_all ${ip_address} ${DATAPREP_PORT}
check_result "dataprep - get" '{"name":' dataprep-redis-server ${LOG_PATH}/dataprep_file.log