34 changes: 33 additions & 1 deletion python/pyspark/sql/connect/plan.py
@@ -25,7 +25,8 @@
TYPE_CHECKING,
Mapping,
)

import pandas
import pyarrow as pa
import pyspark.sql.connect.proto as proto
from pyspark.sql.connect.column import (
Column,
@@ -177,6 +178,37 @@ def _repr_html_(self) -> str:
"""


class LocalRelation(LogicalPlan):
"""Creates a LocalRelation plan object based on a Pandas DataFrame."""

def __init__(self, pdf: "pandas.DataFrame") -> None:
super().__init__(None)
self._pdf = pdf

def plan(self, session: "SparkConnectClient") -> proto.Relation:
assert self._pdf is not None
amaliujia (Contributor) commented on Nov 27, 2022:

Nit: isn't this a bit redundant, though? plan.py is an internal API, the constructor does not accept an Optional pandas DataFrame, and we have mypy to do the type checking.

The PR author (Contributor) replied:

I think you're right. It makes sense to move the assertion into the session.

As an FYI, the mypy checks really only cover the code that we write. At runtime, the user can pass whatever they want, and we should make sure we have proper checks for it. But since plan.py is an internal API, it makes a lot of sense to have the checking on the public API instead.
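
A minimal sketch of what that public-API check could look like (hypothetical; the isinstance guard and the error message are illustrative, not part of this diff):

    # Hypothetical runtime guard in SparkSession.createDataFrame (illustrative only).
    def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
        if not isinstance(data, pd.DataFrame):
            raise TypeError(
                f"data must be a pandas.DataFrame, got {type(data).__name__}"
            )
        return DataFrame.withPlan(plan.LocalRelation(data), self)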


sink = pa.BufferOutputStream()
table = pa.Table.from_pandas(self._pdf)
with pa.ipc.new_stream(sink, table.schema) as writer:
amaliujia (Contributor) commented on Nov 27, 2022:

I am not familiar with this area, so a question:

Is it possible that an empty pandas DataFrame is used here (e.g. one that has a schema but no data)? If so, maybe add a test case?

The PR author (Contributor) replied:

I'll add a test for that, thanks for the proposal!
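
A sketch of what such a test could look like (hypothetical; the test name and assertions are illustrative, not the test that was eventually added):

    def test_with_empty_local_data(self):
        # An empty pandas DataFrame that still carries a schema: typed columns, zero rows.
        pdf = pandas.DataFrame(
            {"a": pandas.Series([], dtype="int64"), "b": pandas.Series([], dtype="object")}
        )
        df = self.connect.createDataFrame(pdf)
        self.assertEqual(len(df.collect()), 0)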

for b in table.to_batches():
writer.write_batch(b)

plan = proto.Relation()
plan.local_relation.data = sink.getvalue().to_pybytes()
return plan

def print(self, indent: int = 0) -> str:
return f"{' ' * indent}<LocalRelation>\n"

def _repr_html_(self) -> str:
return """
<ul>
<li>LocalRelation</li>
</ul>
"""


class ShowString(LogicalPlan):
def __init__(
self, child: Optional["LogicalPlan"], numRows: int, truncate: int, vertical: bool
27 changes: 27 additions & 0 deletions python/pyspark/sql/connect/session.py
@@ -17,13 +17,15 @@

from threading import RLock
from typing import Optional, Any, Union, Dict, cast, overload
import pandas as pd

import pyspark.sql.types
from pyspark.sql.connect.client import SparkConnectClient
from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.connect.plan import SQL, Range
from pyspark.sql.connect.readwriter import DataFrameReader
from pyspark.sql.utils import to_str
from . import plan
from ._typing import OptionalPrimitiveType


@@ -205,6 +207,31 @@ def __init__(self, connectionString: str, userId: Optional[str] = None):
# Create the reader
self.read = DataFrameReader(self)

def createDataFrame(self, data: "pd.DataFrame") -> "DataFrame":
A Member commented:

Actually, the implementation here doesn't match what we have in createDataFrame(pandas).

By default, the Arrow message conversion (more specifically in https://github.com/apache/spark/pull/38659/files#diff-d630cc4be6c65a3c3f7d6dbfe990f99ba992ccc26d9c3aaf6cfe46e163cb7389R514-R521) has to happen in an RDD so we can parallelize it.

For a bit of history, PySpark added the initial version with RDD first, and later added this local relation as an optimization for small datasets (see also #36683).

Another Member commented:

I am fine with the current approach, but the main problems here are that 1. we can't stream the input, and 2. it will have a size limit (likely 4 KB). cc @hvanhovell FYI

The PR author (Contributor) replied:

It is impossible to match that implementation, because in PySpark a first serialization already happens during parallelize in order to pass the input DataFrame to the executors.

In our case, we have to serialize the data just to send it to Spark at all.

That said, you're right that this currently does not support streaming the local data. But the limit is not 4 KB; it is probably whatever the maximum gRPC message size is, so in the megabytes.

I think we need to add the client-side streaming APIs at some point, but I'd like to defer that for a bit.
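
A hedged illustration of the kind of size guard that could sit next to the serialization in LocalRelation.plan() (hypothetical; the 128 MB cap and the error message are assumptions, not actual Spark Connect behaviour):

    # Hypothetical check after the Arrow stream has been written to sink.
    MAX_LOCAL_RELATION_BYTES = 128 * 1024 * 1024  # assumed gRPC-style cap, for illustration
    payload = sink.getvalue()
    if payload.size > MAX_LOCAL_RELATION_BYTES:
        raise ValueError(
            f"Local relation is {payload.size} bytes, exceeding the assumed "
            f"limit of {MAX_LOCAL_RELATION_BYTES} bytes"
        )
    plan.local_relation.data = payload.to_pybytes()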

A Contributor commented:

For a large pd.DataFrame, I guess we can optimize this in the future as follows: split it into several batches, create a LocalRelation for each batch, and finally union them.
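
A rough sketch of that idea (purely illustrative; the helper name, chunk size, and the assumption that the Connect DataFrame exposes a union method are all hypothetical):

    # Hypothetical helper: send a large pandas DataFrame as several LocalRelations and union them.
    def create_dataframe_chunked(session, pdf, chunk_size=100_000):
        chunks = [pdf.iloc[i:i + chunk_size] for i in range(0, len(pdf), chunk_size)] or [pdf]
        result = session.createDataFrame(chunks[0])
        for chunk in chunks[1:]:
            result = result.union(session.createDataFrame(chunk))
        return result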

"""
Creates a :class:`DataFrame` from a :class:`pandas.DataFrame`.

.. versionadded:: 3.4.0


Parameters
----------
data : :class:`pandas.DataFrame`

Returns
-------
:class:`DataFrame`

Examples
--------
>>> import pandas
>>> pdf = pandas.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> self.connect.createDataFrame(pdf).collect()
[Row(a=1, b='a'), Row(a=2, b='b'), Row(a=3, b='c')]

"""
return DataFrame.withPlan(plan.LocalRelation(data), self)

@property
def client(self) -> "SparkConnectClient":
"""
9 changes: 9 additions & 0 deletions python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -131,6 +131,15 @@ def conv_udf(x) -> str:
result = df.select(u(df.id)).toPandas()
self.assertIsNotNone(result)

def test_with_local_data(self):
"""SPARK-41114: Test creating a dataframe using local data"""
pdf = pandas.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
df = self.connect.createDataFrame(pdf)
rows = df.filter(df.a == lit(3)).collect()
self.assertTrue(len(rows) == 1)
self.assertEqual(rows[0][0], 3)
self.assertEqual(rows[0][1], "c")

def test_simple_explain_string(self):
df = self.connect.read.table(self.tbl_name).limit(10)
result = df.explain()