Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
7aa97a1
new: `DataprepRequest`
aMahanna Apr 4, 2025
75f4ecd
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 4, 2025
c10a013
fix: docstrings
aMahanna Apr 4, 2025
6212d84
Merge branch 'dataprep-model' of https://github.com/aMahanna/GenAICom…
aMahanna Apr 4, 2025
f08b6c0
rem: `ingest_from_graphDB`
aMahanna Apr 4, 2025
7eabcfa
new: dep injection
aMahanna Apr 7, 2025
eb44dfd
Merge branch 'main' into dataprep-model
aMahanna Apr 7, 2025
0c994dc
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 7, 2025
31cd463
fix: verbose `input` processing
aMahanna Apr 7, 2025
c9eb793
Merge branch 'dataprep-model' of https://github.com/aMahanna/GenAICom…
aMahanna Apr 7, 2025
b22573e
attempt: replace `kwargs` with params
aMahanna Apr 8, 2025
fedb099
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 8, 2025
e86a8b1
Merge branch 'main' into dataprep-model
lvliang-intel Apr 8, 2025
1b74b90
rem: `db_type`
aMahanna Apr 8, 2025
3af7bcd
Merge branch 'main' into dataprep-model
aMahanna Apr 8, 2025
620ca6b
attempt: require `base`
aMahanna Apr 8, 2025
97842bc
Revert "attempt: require `base`"
aMahanna Apr 8, 2025
40d90f3
new: `DataprepRequest`
aMahanna Apr 4, 2025
fee0afc
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 4, 2025
f6978de
fix: docstrings
aMahanna Apr 4, 2025
9e90414
rem: `ingest_from_graphDB`
aMahanna Apr 4, 2025
c43ffb2
new: dep injection
aMahanna Apr 7, 2025
367fed0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 7, 2025
8b089f6
fix: verbose `input` processing
aMahanna Apr 7, 2025
9998f21
attempt: replace `kwargs` with params
aMahanna Apr 8, 2025
c2c4cb0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 8, 2025
02cd3e9
rem: `db_type`
aMahanna Apr 8, 2025
1192ae7
attempt: require `base`
aMahanna Apr 8, 2025
49090e3
Revert "attempt: require `base`"
aMahanna Apr 8, 2025
390b316
Fix dataprep request class issue of Redis (#1)
letonghan Apr 11, 2025
34e0edd
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 11, 2025
9059e6b
Merge branch 'dataprep-model' of https://github.com/aMahanna/GenAICom…
aMahanna Apr 11, 2025
ae842f1
revert: `DataprepRequest` for multimodal
aMahanna Apr 11, 2025
e473dfd
Merge branch 'dataprep-model' of https://github.com/aMahanna/GenAICom…
aMahanna Apr 11, 2025
42a4a3d
Merge branch 'main' into dataprep-model
aMahanna Apr 11, 2025
a0e8f16
revert: `DataprepRequest` for multimodal (PT2)
aMahanna Apr 11, 2025
2092513
Merge branch 'dataprep-model' of https://github.com/aMahanna/GenAICom…
aMahanna Apr 11, 2025
4a912d8
fix: conditionally fetch unique `DataprepRequest` attributes
aMahanna Apr 11, 2025
06bbc57
Merge branch 'main' into dataprep-model
aMahanna Apr 14, 2025
7fe5c01
Merge branch 'main' into dataprep-model
letonghan Apr 17, 2025
a9028c3
fix bugs in dataprep util script
letonghan Apr 17, 2025
9f7c293
Merge branch 'main' into dataprep-model
letonghan Apr 17, 2025
e87c14e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 17, 2025
4961a09
revert change of pgvector
letonghan Apr 17, 2025
0dbd8c0
fix indices bug for redis
letonghan Apr 17, 2025
d364988
minor fix for redis
letonghan Apr 17, 2025
7104c1b
ingest file into rag_redis_test
letonghan Apr 17, 2025
9122937
update indice name
letonghan Apr 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion comps/cores/proto/api_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any, Dict, List, Literal, Optional, Union

import shortuuid
from fastapi import File, UploadFile
from fastapi import File, Form, UploadFile
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

Expand Down Expand Up @@ -80,6 +80,70 @@
prompts: List[TokenCheckResponseItem]


class DataprepRequest:
    """Common multipart/form-data request model for dataprep microservices.

    Deliberately a plain class rather than a pydantic ``BaseModel``: the
    ``File(...)``/``Form(...)`` defaults make FastAPI treat each constructor
    parameter as a multipart form field when the class is used as a
    dependency (``input: DataprepRequest = Depends()``).
    """

    def __init__(
        self,
        # File(s) to ingest; FastAPI binds these from the multipart upload.
        files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
        # NOTE(review): presumably a serialized (e.g. JSON) list of URLs —
        # parsing happens in the consuming service; confirm against callers.
        link_list: Optional[str] = Form(None),
        # Character length of each text chunk produced by the splitter.
        chunk_size: Optional[int] = Form(1500),
        # Character overlap between consecutive chunks.
        chunk_overlap: Optional[int] = Form(100),
        # Whether to extract tables from PDFs during ingestion.
        process_table: Optional[bool] = Form(False),
        # Table-extraction strategy (e.g. "fast"); semantics defined downstream.
        table_strategy: Optional[str] = Form("fast"),
    ):
        self.files = files
        self.link_list = link_list
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.process_table = process_table
        self.table_strategy = table_strategy

Check warning on line 98 in comps/cores/proto/api_protocol.py

View check run for this annotation

Codecov / codecov/patch

comps/cores/proto/api_protocol.py#L93-L98

Added lines #L93 - L98 were not covered by tests


class Neo4jDataprepRequest(DataprepRequest):
    """Dataprep request for the Neo4j (llamaindex) integration.

    Extends :class:`DataprepRequest` with ``ingest_from_graphDB``, which
    lets the service skip generating a graph from the uploaded files and
    instead load the index from the existing graph store.
    """

    def __init__(
        self,
        files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
        link_list: Optional[str] = Form(None),
        chunk_size: Optional[int] = Form(1500),
        chunk_overlap: Optional[int] = Form(100),
        process_table: Optional[bool] = Form(False),
        table_strategy: Optional[str] = Form("fast"),
        # When True, load the index from the existing graph store instead
        # of ingesting the supplied files/links.
        ingest_from_graphDB: bool = Form(False),
    ):
        super().__init__(
            files=files,
            link_list=link_list,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            process_table=process_table,
            table_strategy=table_strategy,
        )
        self.ingest_from_graphDB = ingest_from_graphDB


class RedisDataprepRequest(DataprepRequest):
    """Dataprep request for the Redis integration.

    Extends :class:`DataprepRequest` with ``index_name``, the Redis index
    the ingested documents are written to.
    """

    def __init__(
        self,
        files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
        link_list: Optional[str] = Form(None),
        chunk_size: Optional[int] = Form(1500),
        chunk_overlap: Optional[int] = Form(100),
        process_table: Optional[bool] = Form(False),
        table_strategy: Optional[str] = Form("fast"),
        # Fixed annotation: the default is None, so the type is Optional[str]
        # (the original declared `str` while defaulting to Form(None)).
        # None means "use the service's default index".
        index_name: Optional[str] = Form(None),
    ):
        super().__init__(
            files=files,
            link_list=link_list,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            process_table=process_table,
            table_strategy=table_strategy,
        )
        self.index_name = index_name


class EmbeddingRequest(BaseModel):
# Ordered by official OpenAI API documentation
# https://platform.openai.com/docs/api-reference/embeddings
Expand Down
32 changes: 16 additions & 16 deletions comps/dataprep/src/integrations/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from langchain_huggingface import HuggingFaceEmbeddings

from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
from comps.cores.proto.api_protocol import DataprepRequest
from comps.dataprep.src.utils import (
create_upload_folder,
document_loader,
Expand Down Expand Up @@ -237,28 +238,27 @@ async def ingest_link_to_elastic(self, link_list: List[str]) -> None:
if logflag:
logger.info(f"Processed batch {i // batch_size + 1}/{(num_chunks - 1) // batch_size + 1}")

async def ingest_files(
self,
files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
link_list: Optional[str] = Form(None),
chunk_size: int = Form(1500),
chunk_overlap: int = Form(100),
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
ingest_from_graphDB: bool = Form(False),
):
async def ingest_files(self, input: DataprepRequest):
"""Ingest files/links content into ElasticSearch database.

Save in the format of vector[768].
Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful.
Args:
files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None).
link_list (str, optional): A list of links to be ingested. Defaults to Form(None).
chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500).
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
input (DataprepRequest): Model containing the following parameters:
files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None).
link_list (str, optional): A list of links to be ingested. Defaults to Form(None).
chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500).
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
"""
files = input.files
link_list = input.link_list
chunk_size = input.chunk_size
chunk_overlap = input.chunk_overlap
process_table = input.process_table
table_strategy = input.table_strategy

if logflag:
logger.info(f"files:{files}")
logger.info(f"link_list:{link_list}")
Expand Down
32 changes: 16 additions & 16 deletions comps/dataprep/src/integrations/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from langchain_text_splitters import HTMLHeaderTextSplitter

from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
from comps.cores.proto.api_protocol import DataprepRequest
from comps.dataprep.src.utils import (
create_upload_folder,
document_loader,
Expand Down Expand Up @@ -244,28 +245,27 @@ def check_health(self) -> bool:
def invoke(self, *args, **kwargs):
pass

async def ingest_files(
self,
files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
link_list: Optional[str] = Form(None),
chunk_size: int = Form(1500),
chunk_overlap: int = Form(100),
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
ingest_from_graphDB: bool = Form(False),
):
async def ingest_files(self, input: DataprepRequest):
"""Ingest files/links content into milvus database.

Save in the format of vector[], the vector length depends on the emedding model type.
Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful.
Args:
files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None).
link_list (str, optional): A list of links to be ingested. Defaults to Form(None).
chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500).
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
input (DataprepRequest): Model containing the following parameters:
files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None).
link_list (str, optional): A list of links to be ingested. Defaults to Form(None).
chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500).
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
"""
files = input.files
link_list = input.link_list
chunk_size = input.chunk_size
chunk_overlap = input.chunk_overlap
process_table = input.process_table
table_strategy = input.table_strategy

if logflag:
logger.info(f"[ milvus ingest ] files:{files}")
logger.info(f"[ milvus ingest ] link_list:{link_list}")
Expand Down
32 changes: 16 additions & 16 deletions comps/dataprep/src/integrations/neo4j_langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from langchain_text_splitters import HTMLHeaderTextSplitter

from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
from comps.cores.proto.api_protocol import DataprepRequest
from comps.dataprep.src.utils import (
document_loader,
encode_filename,
Expand Down Expand Up @@ -137,28 +138,27 @@ async def ingest_data_to_neo4j(self, doc_path: DocPath):

return True

async def ingest_files(
self,
files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
link_list: Optional[str] = Form(None),
chunk_size: int = Form(1500),
chunk_overlap: int = Form(100),
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
ingest_from_graphDB: bool = Form(False),
):
async def ingest_files(self, input: DataprepRequest):
"""Ingest files/links content into Neo4j database.

Save in the format of vector[768].
Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful.
Args:
files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None).
link_list (str, optional): A list of links to be ingested. Defaults to Form(None).
chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500).
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
input (DataprepRequest): Model containing the following parameters:
files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None).
link_list (str, optional): A list of links to be ingested. Defaults to Form(None).
chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500).
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
"""
files = input.files
link_list = input.link_list
chunk_size = input.chunk_size
chunk_overlap = input.chunk_overlap
process_table = input.process_table
table_strategy = input.table_strategy

if logflag:
logger.info(f"files:{files}")
logger.info(f"link_list:{link_list}")
Expand Down
38 changes: 21 additions & 17 deletions comps/dataprep/src/integrations/neo4j_llamaindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from transformers import AutoTokenizer

from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
from comps.cores.proto.api_protocol import DataprepRequest, Neo4jDataprepRequest
from comps.dataprep.src.utils import (
document_loader,
encode_filename,
Expand Down Expand Up @@ -664,29 +665,32 @@ async def build_communities(self, index: PropertyGraphIndex):
logger.error(f"Error building communities: {e}\n{error_trace}")
return False

async def ingest_files(
self,
files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
link_list: Optional[str] = Form(None),
chunk_size: int = Form(1500),
chunk_overlap: int = Form(100),
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
ingest_from_graphDB: bool = Form(False),
):
async def ingest_files(self, input: Union[DataprepRequest, Neo4jDataprepRequest]):
"""Ingest files/links content into Neo4j database.

Save in the format of vector[768].
Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful.
Args:
files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None).
link_list (str, optional): A list of links to be ingested. Defaults to Form(None).
chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500).
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
ingest_from_graphDB (bool, optional): Whether to skip generating graph from files and instead loading index from existing graph store.
input (Neo4jDataprepRequest): Model containing the following parameters:
files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None).
link_list (str, optional): A list of links to be ingested. Defaults to Form(None).
chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500).
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
ingest_from_graphDB (bool, optional): Whether to skip generating graph from files and instead loading index from existing graph store.
"""
files = input.files
link_list = input.link_list
chunk_size = input.chunk_size
chunk_overlap = input.chunk_overlap
process_table = input.process_table
table_strategy = input.table_strategy

ingest_from_graphDB = False
if isinstance(input, Neo4jDataprepRequest):
ingest_from_graphDB = input.ingest_from_graphDB

if logflag:
logger.info(f"files:{files}")
logger.info(f"link_list:{link_list}")
Expand Down
29 changes: 16 additions & 13 deletions comps/dataprep/src/integrations/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from opensearchpy import OpenSearch

from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
from comps.cores.proto.api_protocol import DataprepRequest
from comps.dataprep.src.utils import (
create_upload_folder,
document_loader,
Expand Down Expand Up @@ -322,26 +323,28 @@ def search_all_documents(self, index_name, offset, search_batch_size):

async def ingest_files(
self,
files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
link_list: Optional[str] = Form(None),
chunk_size: int = Form(1500),
chunk_overlap: int = Form(100),
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
ingest_from_graphDB: bool = Form(False),
input: DataprepRequest,
):
"""Ingest files/links content into opensearch database.

Save in the format of vector[768].
Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful.
Args:
files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None).
link_list (str, optional): A list of links to be ingested. Defaults to Form(None).
chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500).
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
input (DataprepRequest): Model containing the following parameters:
files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None).
link_list (str, optional): A list of links to be ingested. Defaults to Form(None).
chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500).
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
"""
files = input.files
link_list = input.link_list
chunk_size = input.chunk_size
chunk_overlap = input.chunk_overlap
process_table = input.process_table
table_strategy = input.table_strategy

if logflag:
logger.info(f"[ upload ] files:{files}")
logger.info(f"[ upload ] link_list:{link_list}")
Expand Down
25 changes: 12 additions & 13 deletions comps/dataprep/src/integrations/pgvect.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from langchain_huggingface import HuggingFaceEmbeddings

from comps import CustomLogger, DocPath, OpeaComponent, OpeaComponentRegistry, ServiceType
from comps.cores.proto.api_protocol import DataprepRequest
from comps.dataprep.src.utils import (
create_upload_folder,
document_loader,
Expand Down Expand Up @@ -245,26 +246,24 @@ async def ingest_link_to_pgvector(self, link_list: List[str]):

async def ingest_files(
self,
files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
link_list: Optional[str] = Form(None),
chunk_size: int = Form(1500),
chunk_overlap: int = Form(100),
process_table: bool = Form(False),
table_strategy: str = Form("fast"),
ingest_from_graphDB: bool = Form(False),
input: DataprepRequest,
):
"""Ingest files/links content into pgvector database.

Save in the format of vector[768].
Returns '{"status": 200, "message": "Data preparation succeeded"}' if successful.
Args:
files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None).
link_list (str, optional): A list of links to be ingested. Defaults to Form(None).
chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500).
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
input (DataprepRequest): Model containing the following parameters:
files (Union[UploadFile, List[UploadFile]], optional): A file or a list of files to be ingested. Defaults to File(None).
link_list (str, optional): A list of links to be ingested. Defaults to Form(None).
chunk_size (int, optional): The size of the chunks to be split. Defaults to Form(1500).
chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
"""
files = input.files
link_list = input.link_list

if logflag:
logger.info(f"files:{files}")
logger.info(f"link_list:{link_list}")
Expand Down
Loading
Loading