Skip to content

Commit 6588e00

Browse files
BryanCutlergatorsmile
authored andcommitted
[SPARK-22221][DOCS] Adding User Documentation for Arrow
## What changes were proposed in this pull request? Adding user facing documentation for working with Arrow in Spark Author: Bryan Cutler <cutlerb@gmail.com> Author: Li Jin <ice.xelloss@gmail.com> Author: hyukjinkwon <gurwls223@gmail.com> Closes #19575 from BryanCutler/arrow-user-docs-SPARK-2221. (cherry picked from commit 0d60b32) Signed-off-by: gatorsmile <gatorsmile@gmail.com>
1 parent d68198d commit 6588e00

2 files changed

Lines changed: 262 additions & 1 deletion

File tree

docs/sql-programming-guide.md

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1640,6 +1640,138 @@ Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` a
16401640
You may run `./bin/spark-sql --help` for a complete list of all available
16411641
options.
16421642

1643+
# PySpark Usage Guide for Pandas with Apache Arrow
1644+
1645+
## Apache Arrow in Spark
1646+
1647+
Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
1648+
data between JVM and Python processes. This currently is most beneficial to Python users that
1649+
work with Pandas/NumPy data. Its usage is not automatic and might require some minor
1650+
changes to configuration or code to take full advantage and ensure compatibility. This guide will
1651+
give a high-level description of how to use Arrow in Spark and highlight any differences when
1652+
working with Arrow-enabled data.
1653+
1654+
### Ensure PyArrow Installed
1655+
1656+
If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the
1657+
SQL module with the command `pip install pyspark[sql]`. Otherwise, you must ensure that PyArrow
1658+
is installed and available on all cluster nodes. The current supported version is 0.8.0.
1659+
You can install using pip or conda from the conda-forge channel. See PyArrow
1660+
[installation](https://arrow.apache.org/docs/python/install.html) for details.
1661+
1662+
## Enabling for Conversion to/from Pandas
1663+
1664+
Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
1665+
using the call `toPandas()` and when creating a Spark DataFrame from a Pandas DataFrame with
1666+
`createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set
1667+
the Spark configuration 'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default.
1668+
1669+
<div class="codetabs">
1670+
<div data-lang="python" markdown="1">
1671+
{% include_example dataframe_with_arrow python/sql/arrow.py %}
1672+
</div>
1673+
</div>
1674+
1675+
Using the above optimizations with Arrow will produce the same results as when Arrow is not
1676+
enabled. Note that even with Arrow, `toPandas()` results in the collection of all records in the
1677+
DataFrame to the driver program and should be done on a small subset of the data. Not all Spark
1678+
data types are currently supported and an error can be raised if a column has an unsupported type,
1679+
see [Supported Types](#supported-sql-arrow-types). If an error occurs during `createDataFrame()`,
1680+
Spark will fall back to create the DataFrame without Arrow.
1681+
1682+
## Pandas UDFs (a.k.a. Vectorized UDFs)
1683+
1684+
Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and
1685+
Pandas to work with the data. A Pandas UDF is defined using the keyword `pandas_udf` as a decorator
1686+
or to wrap the function, no additional configuration is required. Currently, there are two types of
1687+
Pandas UDF: Scalar and Group Map.
1688+
1689+
### Scalar
1690+
1691+
Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such
1692+
as `select` and `withColumn`. The Python function should take `pandas.Series` as inputs and return
1693+
a `pandas.Series` of the same length. Internally, Spark will execute a Pandas UDF by splitting
1694+
columns into batches and calling the function for each batch as a subset of the data, then
1695+
concatenating the results together.
1696+
1697+
The following example shows how to create a scalar Pandas UDF that computes the product of 2 columns.
1698+
1699+
<div class="codetabs">
1700+
<div data-lang="python" markdown="1">
1701+
{% include_example scalar_pandas_udf python/sql/arrow.py %}
1702+
</div>
1703+
</div>
1704+
1705+
### Group Map
1706+
Group map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
1707+
Split-apply-combine consists of three steps:
1708+
* Split the data into groups by using `DataFrame.groupBy`.
1709+
* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
1710+
input data contains all the rows and columns for each group.
1711+
* Combine the results into a new `DataFrame`.
1712+
1713+
To use `groupBy().apply()`, the user needs to define the following:
1714+
* A Python function that defines the computation for each group.
1715+
* A `StructType` object or a string that defines the schema of the output `DataFrame`.
1716+
1717+
Note that all data for a group will be loaded into memory before the function is applied. This can
1718+
lead to out of memory exceptons, especially if the group sizes are skewed. The configuration for
1719+
[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and it is up to the user
1720+
to ensure that the grouped data will fit into the available memory.
1721+
1722+
The following example shows how to use `groupby().apply()` to subtract the mean from each value in the group.
1723+
1724+
<div class="codetabs">
1725+
<div data-lang="python" markdown="1">
1726+
{% include_example group_map_pandas_udf python/sql/arrow.py %}
1727+
</div>
1728+
</div>
1729+
1730+
For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
1731+
[`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).
1732+
1733+
## Usage Notes
1734+
1735+
### Supported SQL Types
1736+
1737+
Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`,
1738+
`ArrayType` of `TimestampType`, and nested `StructType`.
1739+
1740+
### Setting Arrow Batch Size
1741+
1742+
Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
1743+
high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
1744+
record batches can be adjusted by setting the conf "spark.sql.execution.arrow.maxRecordsPerBatch"
1745+
to an integer that will determine the maximum number of rows for each batch. The default value is
1746+
10,000 records per batch. If the number of columns is large, the value should be adjusted
1747+
accordingly. Using this limit, each data partition will be made into 1 or more record batches for
1748+
processing.
1749+
1750+
### Timestamp with Time Zone Semantics
1751+
1752+
Spark internally stores timestamps as UTC values, and timestamp data that is brought in without
1753+
a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp
1754+
data is exported or displayed in Spark, the session time zone is used to localize the timestamp
1755+
values. The session time zone is set with the configuration 'spark.sql.session.timeZone' and will
1756+
default to the JVM system local time zone if not set. Pandas uses a `datetime64` type with nanosecond
1757+
resolution, `datetime64[ns]`, with optional time zone on a per-column basis.
1758+
1759+
When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds
1760+
and each column will be converted to the Spark session time zone then localized to that time
1761+
zone, which removes the time zone and displays values as local time. This will occur
1762+
when calling `toPandas()` or `pandas_udf` with timestamp columns.
1763+
1764+
When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This
1765+
occurs when calling `createDataFrame` with a Pandas DataFrame or when returning a timestamp from a
1766+
`pandas_udf`. These conversions are done automatically to ensure Spark will have data in the
1767+
expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond
1768+
values will be truncated.
1769+
1770+
Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
1771+
different than a Pandas timestamp. It is recommended to use Pandas time series functionality when
1772+
working with timestamps in `pandas_udf`s to get the best performance, see
1773+
[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details.
1774+
16431775
# Migration Guide
16441776

16451777
## Upgrading From Spark SQL 2.2 to 2.3
@@ -1788,7 +1920,7 @@ options.
17881920
Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
17891921
- In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc.
17901922
- In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details.
1791-
- In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame.
1923+
- In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame.
17921924
- Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489).
17931925
- Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`.
17941926
- Since Spark 2.3, when all inputs are binary, SQL `elt()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.eltOutputAsString` to `true`.
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
"""
19+
A simple example demonstrating Arrow in Spark.
20+
Run with:
21+
./bin/spark-submit examples/src/main/python/sql/arrow.py
22+
"""
23+
24+
from __future__ import print_function
25+
26+
from pyspark.sql import SparkSession
27+
from pyspark.sql.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
28+
29+
require_minimum_pandas_version()
30+
require_minimum_pyarrow_version()
31+
32+
33+
def dataframe_with_arrow_example(spark):
34+
# $example on:dataframe_with_arrow$
35+
import numpy as np
36+
import pandas as pd
37+
38+
# Enable Arrow-based columnar data transfers
39+
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
40+
41+
# Generate a Pandas DataFrame
42+
pdf = pd.DataFrame(np.random.rand(100, 3))
43+
44+
# Create a Spark DataFrame from a Pandas DataFrame using Arrow
45+
df = spark.createDataFrame(pdf)
46+
47+
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
48+
result_pdf = df.select("*").toPandas()
49+
# $example off:dataframe_with_arrow$
50+
print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))
51+
52+
53+
def scalar_pandas_udf_example(spark):
54+
# $example on:scalar_pandas_udf$
55+
import pandas as pd
56+
57+
from pyspark.sql.functions import col, pandas_udf
58+
from pyspark.sql.types import LongType
59+
60+
# Declare the function and create the UDF
61+
def multiply_func(a, b):
62+
return a * b
63+
64+
multiply = pandas_udf(multiply_func, returnType=LongType())
65+
66+
# The function for a pandas_udf should be able to execute with local Pandas data
67+
x = pd.Series([1, 2, 3])
68+
print(multiply_func(x, x))
69+
# 0 1
70+
# 1 4
71+
# 2 9
72+
# dtype: int64
73+
74+
# Create a Spark DataFrame, 'spark' is an existing SparkSession
75+
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
76+
77+
# Execute function as a Spark vectorized UDF
78+
df.select(multiply(col("x"), col("x"))).show()
79+
# +-------------------+
80+
# |multiply_func(x, x)|
81+
# +-------------------+
82+
# | 1|
83+
# | 4|
84+
# | 9|
85+
# +-------------------+
86+
# $example off:scalar_pandas_udf$
87+
88+
89+
def group_map_pandas_udf_example(spark):
90+
# $example on:group_map_pandas_udf$
91+
from pyspark.sql.functions import pandas_udf, PandasUDFType
92+
93+
df = spark.createDataFrame(
94+
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
95+
("id", "v"))
96+
97+
@pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
98+
def substract_mean(pdf):
99+
# pdf is a pandas.DataFrame
100+
v = pdf.v
101+
return pdf.assign(v=v - v.mean())
102+
103+
df.groupby("id").apply(substract_mean).show()
104+
# +---+----+
105+
# | id| v|
106+
# +---+----+
107+
# | 1|-0.5|
108+
# | 1| 0.5|
109+
# | 2|-3.0|
110+
# | 2|-1.0|
111+
# | 2| 4.0|
112+
# +---+----+
113+
# $example off:group_map_pandas_udf$
114+
115+
116+
if __name__ == "__main__":
117+
spark = SparkSession \
118+
.builder \
119+
.appName("Python Arrow-in-Spark example") \
120+
.getOrCreate()
121+
122+
print("Running Pandas to/from conversion example")
123+
dataframe_with_arrow_example(spark)
124+
print("Running pandas_udf scalar example")
125+
scalar_pandas_udf_example(spark)
126+
print("Running pandas_udf group map example")
127+
group_map_pandas_udf_example(spark)
128+
129+
spark.stop()

0 commit comments

Comments
 (0)