diff --git a/uniflow/flow/config.py b/uniflow/flow/config.py index 603d1877..cb9870c2 100644 --- a/uniflow/flow/config.py +++ b/uniflow/flow/config.py @@ -50,6 +50,15 @@ class ExtractPDFConfig(ExtractConfig): splitter: str = PARAGRAPH_SPLITTER +@dataclass +class ExtractS3PDFConfig(ExtractConfig): + """Extract S3 PDF Config Class.""" + + flow_name: str = "ExtractS3PDFFlow" + model_config: ModelConfig = field(default_factory=NougatModelConfig) + splitter: str = PARAGRAPH_SPLITTER + + @dataclass class ExtractImageConfig(ExtractConfig): """Extract Image Config Class""" diff --git a/uniflow/flow/extract/extract_pdf_flow.py b/uniflow/flow/extract/extract_pdf_flow.py index 4d213111..0ae2209d 100644 --- a/uniflow/flow/extract/extract_pdf_flow.py +++ b/uniflow/flow/extract/extract_pdf_flow.py @@ -5,6 +5,7 @@ from uniflow.constants import EXTRACT from uniflow.flow.flow import Flow from uniflow.node import Node +from uniflow.op.extract.load.aws.s3_op import ExtractS3PDFOp from uniflow.op.extract.load.pdf_op import ExtractPDFOp, ProcessPDFOp from uniflow.op.extract.split.constants import PARAGRAPH_SPLITTER from uniflow.op.extract.split.splitter_factory import SplitterOpsFactory @@ -50,3 +51,28 @@ def run(self, nodes: Sequence[Node]) -> Sequence[Node]: nodes = self._process_pdf_op(nodes) nodes = self._split_op(nodes) return nodes + + +class ExtractS3PDFFlow(ExtractPDFFlow): + """Extract S3 PDF Flow Class.""" + + def __init__( + self, + model_config: Dict[str, Any], + splitter: str = PARAGRAPH_SPLITTER, + ) -> None: + """Extract S3 PDF Flow Constructor. + + Args: + model_config (Dict[str, Any]): Model config. + splitter (str): Splitter to use. Defaults to PARAGRAPH_SPLITTER.
+ """ + super().__init__(model_config=model_config, splitter=splitter) + self._extract_pdf_op = ExtractS3PDFOp( + name="extract_s3_pdf_op", + model=LLMDataPreprocessor( + model_config=model_config, + ), + ) + self._process_pdf_op = ProcessPDFOp(name="process_pdf_op") + self._split_op = SplitterOpsFactory.get(splitter) diff --git a/uniflow/op/extract/load/aws/s3_op.py b/uniflow/op/extract/load/aws/s3_op.py index 06c25ffd..256f04b1 100644 --- a/uniflow/op/extract/load/aws/s3_op.py +++ b/uniflow/op/extract/load/aws/s3_op.py @@ -6,6 +6,7 @@ from typing import Sequence from uniflow.node import Node +from uniflow.op.model.abs_llm_processor import AbsLLMProcessor from uniflow.op.op import Op logger = logging.getLogger(__name__) @@ -66,3 +67,49 @@ def __call__(self, nodes: Sequence[Node]) -> Sequence[Node]: ) ) return output_nodes + + +class ExtractS3PDFOp(ExtractS3Op): + """Op to download and process a PDF from S3.""" + + def __init__(self, model: AbsLLMProcessor, name: str = "extract_s3_pdf_op") -> None: + super().__init__(name=name) + self._model = model + + def __call__(self, nodes: Sequence[Node]) -> Sequence[Node]: + """Download each node's PDF from S3 and extract its text. + + Args: + nodes (Sequence[Node]): Nodes to run. + + Returns: + Sequence[Node]: Nodes after running.
+ """ + output_nodes = [] + + for node in nodes: + value_dict = copy.deepcopy(node.value_dict) + # Create the local download directory if it does not exist. + if os.path.exists(self.LOCAL_FILE_PATH) is False: + os.makedirs(self.LOCAL_FILE_PATH) + filename = os.path.join(self.LOCAL_FILE_PATH, value_dict["key"]) + + logger.info("Downloading %s to %s", value_dict["key"], filename) + self._s3_client.download_file( + Bucket=value_dict["bucket"], + Key=value_dict["key"], + Filename=filename, + ) + + filename = {"pdf": filename} + value_dict = self._model.run(filename) + text = value_dict["response"][0] + + output_nodes.append( + Node( + name=self.unique_name(), + value_dict={"text": text}, + prev_nodes=[node], + ) + ) + return output_nodes diff --git a/uniflow/op/extract/load/pdf_op.py b/uniflow/op/extract/load/pdf_op.py index 1ba85a3a..b6cff430 100644 --- a/uniflow/op/extract/load/pdf_op.py +++ b/uniflow/op/extract/load/pdf_op.py @@ -5,6 +5,7 @@ from typing import Sequence from uniflow.node import Node +from uniflow.op.extract.load.utils import read_file from uniflow.op.model.abs_llm_processor import AbsLLMProcessor from uniflow.op.op import Op @@ -34,6 +35,7 @@ def __call__(self, nodes: Sequence[Node]) -> Sequence[Node]: output_nodes = [] for node in nodes: value_dict = copy.deepcopy(node.value_dict) + value_dict = read_file(value_dict) value_dict = self._model.run(value_dict) text = value_dict["response"][0] output_nodes.append(