14 changes: 8 additions & 6 deletions python/pyspark/sql/readwriter.py
@@ -300,18 +300,20 @@ def table(self, tableName):
         return self._df(self._jreader.table(tableName))
 
     @since(1.4)
-    def parquet(self, *paths):
Contributor Author:
Side question for you @HyukjinKwon: The *paths parameter bothers me a bit. None of the other load methods use this pattern, and the streaming version of parquet() doesn't use it either. How would you feel about a separate PR changing this to paths? I suppose the 3.0 release would be our chance to do it, since it changes the API.

@HyukjinKwon (Member), Dec 2, 2019:

I think *paths keeps this consistent with the Scala and Java side (def parquet(paths: String*)), so technically it's more correct to support *paths.

In the streaming case, it likewise matches the Scala / Java side: def parquet(path: String).

Maybe we should introduce keyword-only arguments (as you said earlier somewhere) after completely dropping Python 2 in Spark 3.1 ... I am not sure about this yet.
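For context, a minimal sketch of the two calling conventions under discussion (the directory names and event_schema are hypothetical; an active SparkSession named spark is assumed, as in the doctests below):

    # Batch reader: *paths mirrors Scala's def parquet(paths: String*),
    # so any number of directories can be passed positionally.
    df = spark.read.parquet('data/events/2018', 'data/events/2019')

    # Streaming reader: matches Scala's def parquet(path: String),
    # so it accepts exactly one path.
    sdf = spark.readStream.schema(event_schema).parquet('data/events/stream')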

"""Loads Parquet files, returning the result as a :class:`DataFrame`.
def parquet(self, *paths, **options):
"""
Loads Parquet files, returning the result as a :class:`DataFrame`.

You can set the following Parquet-specific option(s) for reading Parquet files:
* ``mergeSchema``: sets whether we should merge schemas collected from all \
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
The default value is specified in ``spark.sql.parquet.mergeSchema``.
:param mergeSchema: sets whether we should merge schemas collected from all
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
The default value is specified in ``spark.sql.parquet.mergeSchema``.

>>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
mergeSchema = options.get('mergeSchema', None)
self._set_opts(mergeSchema=mergeSchema)
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

@ignore_unicode_prefix
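A hedged usage sketch of the batch reader after this change (the paths are hypothetical): because keyword arguments are collected via **options, mergeSchema can be passed alongside any number of positional paths.

    # mergeSchema is pulled out of **options and forwarded through _set_opts,
    # overriding spark.sql.parquet.mergeSchema for this read only.
    df = spark.read.parquet('data/part1', 'data/part2', mergeSchema=True)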
15 changes: 8 additions & 7 deletions python/pyspark/sql/streaming.py
@@ -526,22 +526,23 @@ def orc(self, path):
             raise TypeError("path can be only a single string")
 
     @since(2.0)
-    def parquet(self, path):
-        """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
-
-        You can set the following Parquet-specific option(s) for reading Parquet files:
-            * ``mergeSchema``: sets whether we should merge schemas collected from all \
-                Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
-                The default value is specified in ``spark.sql.parquet.mergeSchema``.
+    def parquet(self, path, mergeSchema=None):
+        """
+        Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
 
         .. note:: Evolving.
 
+        :param mergeSchema: sets whether we should merge schemas collected from all
+            Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
+            The default value is specified in ``spark.sql.parquet.mergeSchema``.
+
         >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
         >>> parquet_sdf.isStreaming
         True
         >>> parquet_sdf.schema == sdf_schema
         True
         """
+        self._set_opts(mergeSchema=mergeSchema)
         if isinstance(path, basestring):
             return self._df(self._jreader.parquet(path))
         else:
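And a similar sketch for the streaming side (the schema and path are hypothetical): here mergeSchema is an explicit keyword parameter rather than a **options catch-all, and the option is set before the path is handed to the JVM reader.

    # Overrides spark.sql.parquet.mergeSchema for this stream only.
    sdf = (spark.readStream
           .schema(event_schema)
           .parquet('data/events/stream', mergeSchema=True))
    assert sdf.isStreaming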