1,395 changes: 1,395 additions & 0 deletions examples/python/data-preprocessing/SparkNLP_Reader2Doc_Demo.ipynb


188 changes: 188 additions & 0 deletions python/sparknlp/reader/reader2doc.py
@@ -0,0 +1,188 @@
# Copyright 2017-2025 John Snow Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyspark import keyword_only
from pyspark.ml.param import TypeConverters, Params, Param

from sparknlp.common import AnnotatorType
from sparknlp.internal import AnnotatorTransformer
from sparknlp.partition.partition_properties import *


class Reader2Doc(
AnnotatorTransformer,
HasEmailReaderProperties,
HasExcelReaderProperties,
HasHTMLReaderProperties,
HasPowerPointProperties,
HasTextReaderProperties,
):
"""
The Reader2Doc annotator lets you read files directly within existing
Spark NLP workflows, enabling seamless reuse of your pipelines.

Reader2Doc can be used for extracting structured content from various document types
using Spark NLP readers. It supports reading from many file types and returns parsed
output as a structured Spark DataFrame.

Supported formats include:

- Plain text
- HTML
- Word (.doc/.docx)
- Excel (.xls/.xlsx)
- PowerPoint (.ppt/.pptx)
- Email files (.eml, .msg)
- PDFs

Examples
--------
>>> from sparknlp.reader.reader2doc import Reader2Doc
>>> from pyspark.ml import Pipeline
>>> # Initialize Reader2Doc for PDF files
>>> reader2doc = Reader2Doc() \\
... .setContentType("application/pdf") \\
... .setContentPath(f"{pdf_directory}/")
>>> # Build the pipeline with the Reader2Doc stage
>>> pipeline = Pipeline(stages=[reader2doc])
>>> # Fit the pipeline to an empty DataFrame
>>> pipeline_model = pipeline.fit(empty_data_set)
>>> result_df = pipeline_model.transform(empty_data_set)
>>> # Show the resulting DataFrame
>>> result_df.show()
+--------------------------------------------------------------------------------------------------------------------------------------------+
|document                                                                                                                                    |
+--------------------------------------------------------------------------------------------------------------------------------------------+
|[{'document', 0, 14, 'This is a Title', {'pageNumber': 1, 'elementType': 'Title', 'fileName': 'pdf-title.pdf'}, []}]                        |
|[{'document', 15, 38, 'This is a narrative text', {'pageNumber': 1, 'elementType': 'NarrativeText', 'fileName': 'pdf-title.pdf'}, []}]      |
|[{'document', 39, 68, 'This is another narrative text', {'pageNumber': 1, 'elementType': 'NarrativeText', 'fileName': 'pdf-title.pdf'}, []}]|
+--------------------------------------------------------------------------------------------------------------------------------------------+
"""

name = "Reader2Doc"
outputAnnotatorType = AnnotatorType.DOCUMENT

contentPath = Param(
Params._dummy(),
"contentPath",
"contentPath path to files to read",
typeConverter=TypeConverters.toString,
)

outputCol = Param(
Params._dummy(),
"outputCol",
"output column name",
typeConverter=TypeConverters.toString,
)

contentType = Param(
Params._dummy(),
"contentType",
"Set the content type to load following MIME specification",
typeConverter=TypeConverters.toString,
)

explodeDocs = Param(
Params._dummy(),
"explodeDocs",
"whether to explode the documents into separate rows",
typeConverter=TypeConverters.toBoolean,
)

flattenOutput = Param(
Params._dummy(),
"flattenOutput",
"If true, output is flattened to plain text with minimal metadata",
typeConverter=TypeConverters.toBoolean,
)

titleThreshold = Param(
Params._dummy(),
"titleThreshold",
"Minimum font size threshold for title detection in PDF docs",
typeConverter=TypeConverters.toFloat,
)

@keyword_only
def __init__(self):
super(Reader2Doc, self).__init__(classname="com.johnsnowlabs.reader.Reader2Doc")
self._setDefault(outputCol="document")

@keyword_only
def setParams(self):
kwargs = self._input_kwargs
return self._set(**kwargs)

def setContentPath(self, value):
"""Sets content path.

Parameters
----------
value : str
Path to the files to read
"""
return self._set(contentPath=value)

def setContentType(self, value):
"""
Set the content type to load following MIME specification

Parameters
----------
value : str
content type to load following MIME specification
"""
return self._set(contentType=value)

def setExplodeDocs(self, value):
"""Sets whether to explode the documents into separate rows.


Parameters
----------
value : boolean
Whether to explode the documents into separate rows
"""
return self._set(explodeDocs=value)

def setOutputCol(self, value):
"""Sets output column name.

Parameters
----------
value : str
Name of the Output Column
"""
return self._set(outputCol=value)

def setFlattenOutput(self, value):
"""Sets whether to flatten the output to plain text with minimal metadata.

Parameters
----------
value : bool
If true, output is flattened to plain text with minimal metadata
"""
return self._set(flattenOutput=value)

def setTitleThreshold(self, value):
"""Sets the minimum font size threshold for title detection in PDF documents.

Parameters
----------
value : float
Minimum font size threshold for title detection in PDF docs
"""
return self._set(titleThreshold=value)
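
For orientation, here is a minimal sketch of how the annotator can sit in front of another annotator in a regular Spark NLP pipeline, along the lines of the tests added further down; the HTML path and the active `spark` session are assumptions for illustration:

from pyspark.ml import Pipeline
from sparknlp.annotator import RegexTokenizer
from sparknlp.reader.reader2doc import Reader2Doc

# Read HTML files and emit them as `document` annotations, one row per document
reader2doc = Reader2Doc() \
    .setContentType("text/html") \
    .setContentPath("/path/to/html-files/") \
    .setExplodeDocs(True) \
    .setOutputCol("document")

# Downstream annotators consume the documents as usual
regex_tok = RegexTokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

empty_df = spark.createDataFrame([], "string").toDF("text")
pipeline_model = Pipeline(stages=[reader2doc, regex_tok]).fit(empty_df)
result_df = pipeline_model.transform(empty_df)
result_df.select("document", "token").show(truncate=False)
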
45 changes: 45 additions & 0 deletions python/sparknlp/reader/sparknlp_reader.py
@@ -413,4 +413,49 @@ def md(self, filePath):
if not isinstance(filePath, str):
raise TypeError("filePath must be a string")
jdf = self._java_obj.md(filePath)
return self.getDataFrame(self.spark, jdf)

def csv(self, csvPath):
"""Reads CSV files and returns a Spark DataFrame.

Parameters
----------
csvPath : str
Path to a CSV file or a directory containing CSV files.

Returns
-------
pyspark.sql.DataFrame
A DataFrame containing parsed CSV content.

Examples
--------
>>> from sparknlp.reader import SparkNLPReader
>>> csv_df = SparkNLPReader(spark).csv("home/user/csv-directory")

You can also use Spark NLP with just one line of code

>>> import sparknlp
>>> csv_df = sparknlp.read().csv("home/user/csv-directory")
>>> csv_df.show(truncate=False)
+-----------------------------------------------------------------------------------------------------------------------------------------+
|csv |
+-----------------------------------------------------------------------------------------------------------------------------------------+
|[{NarrativeText, Alice 100 Bob 95, {}}, {Table, <table><tr><td>Alice</td><td>100</td></tr><tr><td>Bob</td><td>95</td></tr></table>, {}}] |
+-----------------------------------------------------------------------------------------------------------------------------------------+

>>> csv_df.printSchema()
root
|-- path: string (nullable = true)
|-- csv: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- elementType: string (nullable = true)
| | |-- content: string (nullable = true)
| | |-- metadata: map (nullable = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)
"""
if not isinstance(csvPath, str):
raise TypeError("docPath must be a string")
jdf = self._java_obj.csv(csvPath)
return self.getDataFrame(self.spark, jdf)
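
Because each row of the returned DataFrame holds an array of parsed elements (see the schema in the docstring above), a common follow-up is to flatten that array into one row per element; a short sketch, assuming a hypothetical CSV directory path:

import sparknlp
from pyspark.sql.functions import col, explode

# Parse the CSV files, then explode the element array into one row per element
csv_df = sparknlp.read().csv("/path/to/csv-directory")
elements_df = csv_df.select("path", explode(col("csv")).alias("element")) \
    .select(
        "path",
        col("element.elementType").alias("elementType"),
        col("element.content").alias("content"),
    )
elements_df.show(truncate=False)
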
7 changes: 3 additions & 4 deletions python/test/partition/partition_test.py
@@ -101,7 +101,7 @@ def runTest(self):
self.assertTrue(html_file_df.select("html").count() > 0)


@pytest.mark.fast
@pytest.mark.slow
class PartitionUrlTesSpec(unittest.TestCase):

def runTest(self):
@@ -122,8 +122,8 @@ def runTest(self):
pdf_df = Partition(content_type = "application/pdf").partition(self.html_directory)
pdf_file_df = Partition().partition(f"{self.html_directory}/text_3_pages.pdf")

self.assertTrue(pdf_df.select("text").count() > 0)
self.assertTrue(pdf_file_df.select("text").count() > 0)
self.assertTrue(pdf_df.select("pdf").count() > 0)
self.assertTrue(pdf_file_df.select("pdf").count() > 0)

@pytest.mark.fast
class PartitionTextInMemoryTesSpec(unittest.TestCase):
@@ -139,6 +139,5 @@ def setUp(self):

def runTest(self):
text_df = Partition(group_broken_paragraphs=True).partition_text(text = self.raw_text )
text_df.show(truncate=False)

self.assertTrue(text_df.select("txt").count() > 0)
2 changes: 0 additions & 2 deletions python/test/partition/partition_transformer_test.py
@@ -79,7 +79,6 @@ def runTest(self):
pipelineModel = pipeline.fit(emptyDataSet)

resultDf = pipelineModel.transform(self.testDataSet)
resultDf.show(truncate=False)

self.assertTrue(resultDf.select("partition").count() > 0)

@@ -108,6 +107,5 @@ def runTest(self):
pipelineModel = pipeline.fit(self.emptyDataSet)

resultDf = pipelineModel.transform(self.emptyDataSet)
resultDf.show(truncate=False)

self.assertTrue(resultDf.select("partition").count() >= 0)
93 changes: 93 additions & 0 deletions python/test/reader/reader2doc_test.py
@@ -0,0 +1,93 @@

# Copyright 2017-2025 John Snow Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import pytest
import os

from sparknlp.annotator import *
from sparknlp.base import *
from sparknlp.reader.reader2doc import Reader2Doc
from test.util import SparkContextForTest
from pyspark.ml import Pipeline

@pytest.mark.fast
class Reader2DocTest(unittest.TestCase):

def setUp(self):
spark = SparkContextForTest.spark
self.empty_df = spark.createDataFrame([], "string").toDF("text")

def runTest(self):
reader2doc = Reader2Doc() \
.setContentType("text/html") \
.setContentPath(f"file:///{os.getcwd()}/../src/test/resources/reader/html/title-test.html") \
.setOutputCol("document")

pipeline = Pipeline(stages=[reader2doc])
model = pipeline.fit(self.empty_df)

result_df = model.transform(self.empty_df)

self.assertTrue(result_df.select("document").count() > 0)


@pytest.mark.fast
class Reader2DocTokenTest(unittest.TestCase):

def setUp(self):
spark = SparkContextForTest.spark
self.empty_df = spark.createDataFrame([], "string").toDF("text")

def runTest(self):
reader2doc = Reader2Doc() \
.setContentType("text/html") \
.setContentPath(f"file:///{os.getcwd()}/../src/test/resources/reader/html/example-div.html") \
.setOutputCol("document") \
.setTitleThreshold(18.5)

regex_tok = RegexTokenizer() \
.setInputCols(["document"]) \
.setOutputCol("regex_token")

pipeline = Pipeline(stages=[reader2doc, regex_tok])
model = pipeline.fit(self.empty_df)

result_df = model.transform(self.empty_df)

self.assertTrue(result_df.select("document").count() > 0)


@pytest.mark.fast
class Reader2DocPdfTest(unittest.TestCase):

def setUp(self):
spark = SparkContextForTest.spark
self.empty_df = spark.createDataFrame([], "string").toDF("text")

def runTest(self):
reader2doc = Reader2Doc() \
.setContentType("application/pdf") \
.setContentPath(f"file:///{os.getcwd()}/../src/test/resources/reader/pdf/pdf-title.pdf") \
.setOutputCol("document") \
.setTitleThreshold(18.5)

pipeline = Pipeline(stages=[reader2doc])
model = pipeline.fit(self.empty_df)

result_df = model.transform(self.empty_df)

self.assertTrue(result_df.select("document").count() > 0)