@@ -0,0 +1,297 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "77978702",
"metadata": {
"id": "77978702"
},
"source": [
"# DataFrameOptimizer Demo Notebook"
]
},
{
"cell_type": "markdown",
"id": "9zQvqLA1eoNQ",
"metadata": {
"id": "9zQvqLA1eoNQ"
},
"source": [
" This notebook showcases `DataFrameOptimizer` transformer which is intended to improve performance for Spark NLP pipelines or when preparing data for export. It allows partition tuning via `numPartitions` directly, or indirectly using `executorCores` and `numWorkers`.\n",
"\n",
"The DataFrame can also be persisted in a specified format\n",
" (`csv`, `json`, or `parquet`) with additional writer options."
]
},
{
"cell_type": "markdown",
"id": "ifVhTtS2gwhx",
"metadata": {
"id": "ifVhTtS2gwhx"
},
"source": [
"## Setup and Initialization\n",
"Let's keep in mind a few things before we start 😊\n",
"\n",
"This feature was introduces in Spark NLP 6.0.4. Please make sure you have upgraded to the latest Spark NLP release."
]
},
{
"cell_type": "markdown",
"id": "90a6044e",
"metadata": {},
"source": [
"- Let's install and setup Spark NLP in Google Colab. This part is pretty easy via our simple script"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "J29m0Q7papco",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "J29m0Q7papco",
"outputId": "d4576ab2-b65a-4d41-e0e2-ad2f9825e8f1"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Processing ./spark_nlp-6.0.3-py2.py3-none-any.whl\n",
"spark-nlp is already installed with the same version as the provided wheel. Use --force-reinstall to force an installation of the wheel.\n"
]
}
],
"source": [
"! wget -q http://setup.johnsnowlabs.com/colab.sh -O - | bash"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "46f1ea91",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "46f1ea91",
"outputId": "517cb0c9-52cd-4a25-9924-2ce8ba448cfd"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Apache Spark version: 3.5.1\n"
]
}
],
"source": [
"import sparknlp\n",
"# let's start Spark with Spark NLP\n",
"spark = sparknlp.start()\n",
"print(\"Apache Spark version: {}\".format(spark.version))"
]
},
{
"cell_type": "markdown",
"id": "oRbhoCrKg63t",
"metadata": {
"id": "oRbhoCrKg63t"
},
"source": [
"Use `DataFrameOptimizer` in a Spark NLP pipelines"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "0c340459",
"metadata": {
"id": "0c340459"
},
"outputs": [],
"source": [
"from sparknlp.annotator.dataframe_optimizer import DataFrameOptimizer\n",
"from sparknlp import DocumentAssembler\n",
"from sparknlp.annotator import SentenceDetector\n",
"from pyspark.ml import Pipeline\n",
"\n",
"test_df = spark.createDataFrame([(\"This is a test sentence. It contains multiple sentences.\",)], [\"text\"])"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "3cb661fd",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "3cb661fd",
"outputId": "b2683aaa-670d-4b92-b47e-e70082ddb24b"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Number of partitions: 4\n"
]
}
],
"source": [
"data_frame_optimizer = DataFrameOptimizer() \\\n",
" .setExecutorCores(2) \\\n",
" .setNumWorkers(2) \\\n",
" .setDoCache(True)\n",
"\n",
"document_assembler = DocumentAssembler() \\\n",
" .setInputCol(\"text\") \\\n",
" .setOutputCol(\"document\")\n",
"\n",
"sentence_detector = SentenceDetector() \\\n",
" .setInputCols([\"document\"]) \\\n",
" .setOutputCol(\"sentences\")\n",
"\n",
"pipeline = Pipeline(stages=[\n",
" data_frame_optimizer,\n",
" document_assembler,\n",
" sentence_detector\n",
" ])\n",
"\n",
"optimized_result_df = pipeline.fit(test_df).transform(test_df)\n",
"print(f\"Number of partitions: {optimized_result_df.rdd.getNumPartitions()}\")"
]
},
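{
"cell_type": "markdown",
"id": "direct-partitions-note",
"metadata": {},
"source": [
"The four partitions above come from `executorCores * numWorkers` (2 * 2). As a minimal sketch, the same partition count can also be requested directly with `setNumPartitions`; this cell assumes the direct setting takes effect on its own, without `executorCores` or `numWorkers`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "direct-partitions-code",
"metadata": {},
"outputs": [],
"source": [
"# A minimal sketch: request the partition count directly instead of\n",
"# deriving it from executor cores and workers.\n",
"direct_optimizer = DataFrameOptimizer() \\\n",
"    .setNumPartitions(4) \\\n",
"    .setDoCache(False)\n",
"\n",
"direct_df = direct_optimizer.transform(test_df)\n",
"print(f\"Number of partitions: {direct_df.rdd.getNumPartitions()}\")"
]
},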
{
"cell_type": "code",
"execution_count": 18,
"id": "Jgac_LmQgfWR",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "Jgac_LmQgfWR",
"outputId": "cea02e44-9e49-46dc-8d6d-4285c6e7a28d"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------------------+--------------------+\n",
"| text| document| sentences|\n",
"+--------------------+--------------------+--------------------+\n",
"|This is a test se...|[{document, 0, 55...|[{document, 0, 23...|\n",
"+--------------------+--------------------+--------------------+\n",
"\n"
]
}
],
"source": [
"optimized_result_df.show()"
]
},
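{
"cell_type": "markdown",
"id": "explode-sentences-note",
"metadata": {},
"source": [
"The truncated columns above hold standard Spark NLP annotation structs. To see the two detected sentences as plain text, explode the `result` field of the `sentences` column (a minimal sketch using standard Spark SQL functions):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "explode-sentences-code",
"metadata": {},
"outputs": [],
"source": [
"# Each annotation struct carries a `result` field with the sentence text.\n",
"optimized_result_df.selectExpr(\"explode(sentences.result) as sentence\").show(truncate=False)"
]
},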
{
"cell_type": "markdown",
"id": "baUjiRDFgiiC",
"metadata": {
"id": "baUjiRDFgiiC"
},
"source": [
"Persisting data with DataFrameOptimizer"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "171aece5",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "171aece5",
"outputId": "aece12de-5e8f-4197-ea65-ff829e0a098a"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Data persisted to: /tmp/optimized_output\n"
]
}
],
"source": [
"persist_path = \"/tmp/optimized_output\"\n",
"optimizer_persist = DataFrameOptimizer() \\\n",
" .setNumPartitions(4) \\\n",
" .setDoCache(False) \\\n",
" .setPersistPath(persist_path) \\\n",
" .setPersistFormat(\"parquet\") \\\n",
" .setOutputOptions({\"compression\": \"snappy\"})\n",
"\n",
"persisted_df = optimizer_persist.transform(test_df)\n",
"print(f\"Data persisted to: {persist_path}\")"
]
},
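{
"cell_type": "markdown",
"id": "persist-verify-note",
"metadata": {},
"source": [
"As a quick sanity check, list the output directory; Parquet output is written as a directory of part files (exact file names vary per run):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "persist-verify-code",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"# The Parquet writer produces part files plus a _SUCCESS marker.\n",
"print(sorted(os.listdir(persist_path)))"
]
},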
{
"cell_type": "code",
"execution_count": 20,
"id": "65dd1bda",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "65dd1bda",
"outputId": "5a345a56-43ef-4b9a-a44a-f59cc10d4c2a"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+\n",
"| text|\n",
"+--------------------+\n",
"|This is a test se...|\n",
"+--------------------+\n",
"\n"
]
}
],
"source": [
"restored_df = spark.read.parquet(persist_path)\n",
"restored_df.show(5)"
]
}
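,
{
"cell_type": "markdown",
"id": "csv-persist-note",
"metadata": {},
"source": [
"The same transformer can persist to the other supported formats. The sketch below writes CSV instead of Parquet; `header` is a standard Spark CSV writer option, and the output path is a hypothetical example:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "csv-persist-code",
"metadata": {},
"outputs": [],
"source": [
"# A minimal sketch, assuming the writer options map is passed straight\n",
"# through to Spark's DataFrameWriter.\n",
"csv_optimizer = DataFrameOptimizer() \\\n",
"    .setNumPartitions(1) \\\n",
"    .setDoCache(False) \\\n",
"    .setPersistPath(\"/tmp/optimized_output_csv\") \\\n",
"    .setPersistFormat(\"csv\") \\\n",
"    .setOutputOptions({\"header\": \"true\"})\n",
"\n",
"csv_df = csv_optimizer.transform(test_df)\n",
"spark.read.option(\"header\", True).csv(\"/tmp/optimized_output_csv\").show()"
]
}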
],
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
@@ -27,8 +27,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"- Let's install and setup Spark NLP in Google Colab\n",
"- This part is pretty easy via our simple script"
"- Let's install and setup Spark NLP in Google Colab. This part is pretty easy via our simple script"
]
},
{
@@ -23,8 +23,29 @@
"Support for **Partitioning** files was introduced in Spark NLP 6.0.1 \n",
"\n",
"Chunking support was added in Spark NLP 6.0.3\n",
"Please make sure you have upgraded to the latest Spark NLP release.\n",
"\n",
"Please make sure you have upgraded to the latest Spark NLP release."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"- Let's install and setup Spark NLP in Google Colab. This part is pretty easy via our simple script"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"! wget -q http://setup.johnsnowlabs.com/colab.sh -O - | bash"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For local files example we will download different files from Spark NLP Github repo:"
]
},