Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions uniflow/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ class ExtractPDFConfig(ExtractConfig):
splitter: str = PARAGRAPH_SPLITTER


@dataclass
class ExtractS3PDFConfig(ExtractConfig):
    """Extract S3 PDF Config Class.

    Configuration for extracting text from a PDF stored in S3: routes to
    the ``ExtractS3PDFFlow`` and, by default, parses pages with the Nougat
    model and splits the extracted text into paragraphs.
    """

    # Name of the flow class this config dispatches to.
    flow_name: str = "ExtractS3PDFFlow"
    # Model used to convert downloaded PDF pages into text.
    model_config: ModelConfig = field(default_factory=NougatModelConfig)
    # Splitting strategy applied to the extracted text.
    splitter: str = PARAGRAPH_SPLITTER


@dataclass
class ExtractImageConfig(ExtractConfig):
"""Extract Image Config Class"""
Expand Down
26 changes: 26 additions & 0 deletions uniflow/flow/extract/extract_pdf_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from uniflow.constants import EXTRACT
from uniflow.flow.flow import Flow
from uniflow.node import Node
from uniflow.op.extract.load.aws.s3_op import ExtractS3PDFOp
from uniflow.op.extract.load.pdf_op import ExtractPDFOp, ProcessPDFOp
from uniflow.op.extract.split.constants import PARAGRAPH_SPLITTER
from uniflow.op.extract.split.splitter_factory import SplitterOpsFactory
Expand Down Expand Up @@ -50,3 +51,28 @@ def run(self, nodes: Sequence[Node]) -> Sequence[Node]:
nodes = self._process_pdf_op(nodes)
nodes = self._split_op(nodes)
return nodes


class ExtractS3PDFFlow(ExtractPDFFlow):
    """Extract S3 PDF Flow Class.

    Variant of :class:`ExtractPDFFlow` that downloads the source PDF from
    S3 (via :class:`ExtractS3PDFOp`) before running the same
    process/split pipeline as the parent flow.
    """

    def __init__(
        self,
        model_config: Dict[str, Any],
        splitter: str = PARAGRAPH_SPLITTER,
    ) -> None:
        """Extract S3 PDF Flow Constructor.

        Args:
            model_config (Dict[str, Any]): Model config.
            splitter (str): Splitter to use. Defaults to
                PARAGRAPH_SPLITTER.
        """
        # The parent constructor receives `splitter` and sets up
        # `_process_pdf_op` and `_split_op` (ExtractPDFFlow.run reads
        # them), so they are not redundantly re-created here.
        # NOTE(review): super().__init__ also builds a local-file
        # ExtractPDFOp (with its own LLMDataPreprocessor) that is
        # immediately replaced below — consider making the extract op
        # injectable in the parent to avoid initializing the model twice.
        super().__init__(model_config=model_config, splitter=splitter)
        self._extract_pdf_op = ExtractS3PDFOp(
            name="extract_s3_pdf_op",
            model=LLMDataPreprocessor(
                model_config=model_config,
            ),
        )
47 changes: 47 additions & 0 deletions uniflow/op/extract/load/aws/s3_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Sequence

from uniflow.node import Node
from uniflow.op.model.abs_llm_processor import AbsLLMProcessor
from uniflow.op.op import Op

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -66,3 +67,49 @@ def __call__(self, nodes: Sequence[Node]) -> Sequence[Node]:
)
)
return output_nodes


class ExtractS3PDFOp(ExtractS3Op):
    """Op to download a PDF from S3 and extract its text with a model."""

    def __init__(self, model: AbsLLMProcessor, name: str = "extract_s3_pdf_op") -> None:
        """Constructor.

        Args:
            model (AbsLLMProcessor): Processor used to convert the
                downloaded PDF into text.
            name (str): Op name. Defaults to "extract_s3_pdf_op".
        """
        super().__init__(name=name)
        self._model = model

    def __call__(self, nodes: Sequence[Node]) -> Sequence[Node]:
        """Download each node's PDF from S3 and run the model on it.

        Args:
            nodes (Sequence[Node]): Nodes whose value_dict carries the
                S3 "bucket" and "key" of a PDF to process.

        Returns:
            Sequence[Node]: One new node per input node, holding the
            extracted text under the "text" key.
        """
        output_nodes = []

        for node in nodes:
            value_dict = copy.deepcopy(node.value_dict)
            local_path = os.path.join(self.LOCAL_FILE_PATH, value_dict["key"])
            # S3 keys may contain "/" separators, so create the full
            # directory tree of the target file — the original only
            # created LOCAL_FILE_PATH itself, which made download_file
            # fail for nested keys. exist_ok=True replaces the
            # non-idiomatic (and race-prone) explicit exists() check.
            os.makedirs(os.path.dirname(local_path), exist_ok=True)

            logger.info("Downloading %s to %s", value_dict["key"], local_path)
            self._s3_client.download_file(
                Bucket=value_dict["bucket"],
                Key=value_dict["key"],
                Filename=local_path,
            )

            # Hand the local file to the model. A distinct name is used
            # for the request dict (the original shadowed the path
            # variable `filename` with a dict, which was confusing).
            response = self._model.run({"pdf": local_path})
            text = response["response"][0]

            output_nodes.append(
                Node(
                    name=self.unique_name(),
                    value_dict={"text": text},
                    prev_nodes=[node],
                )
            )
        return output_nodes
2 changes: 2 additions & 0 deletions uniflow/op/extract/load/pdf_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Sequence

from uniflow.node import Node
from uniflow.op.extract.load.utils import read_file
from uniflow.op.model.abs_llm_processor import AbsLLMProcessor
from uniflow.op.op import Op

Expand Down Expand Up @@ -34,6 +35,7 @@ def __call__(self, nodes: Sequence[Node]) -> Sequence[Node]:
output_nodes = []
for node in nodes:
value_dict = copy.deepcopy(node.value_dict)
value_dict = read_file(value_dict)
value_dict = self._model.run(value_dict)
text = value_dict["response"][0]
output_nodes.append(
Expand Down