Skip to content

Commit b3689ee

Browse files
committed
Add recursiveFileLookup to Python DataFrameReader
1 parent 9351e3e commit b3689ee

1 file changed

Lines changed: 17 additions & 8 deletions

File tree

python/pyspark/sql/readwriter.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
171171
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
172172
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
173173
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
174-
dropFieldIfAllNull=None, encoding=None, locale=None):
174+
dropFieldIfAllNull=None, encoding=None, locale=None, recursiveFileLookup=None):
175175
"""
176176
Loads JSON files and returns the results as a :class:`DataFrame`.
177177
@@ -247,6 +247,10 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
247247
:param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
248248
it uses the default value, ``en-US``. For instance, ``locale`` is used while
249249
parsing dates and timestamps.
250+
:param recursiveFileLookup: recursively scan a directory for files. Using this option
251+
disables `partition discovery`_.
252+
253+
.. _partition discovery: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
250254
251255
>>> df1 = spark.read.json('python/test_support/sql/people.json')
252256
>>> df1.dtypes
@@ -266,7 +270,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
266270
timestampFormat=timestampFormat, multiLine=multiLine,
267271
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
268272
samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
269-
locale=locale)
273+
locale=locale, recursiveFileLookup=recursiveFileLookup)
270274
if isinstance(path, basestring):
271275
path = [path]
272276
if type(path) == list:
@@ -300,7 +304,7 @@ def table(self, tableName):
300304
return self._df(self._jreader.table(tableName))
301305

302306
@since(1.4)
303-
def parquet(self, *paths):
307+
def parquet(self, *paths, recursiveFileLookup=None):
304308
"""Loads Parquet files, returning the result as a :class:`DataFrame`.
305309
306310
You can set the following Parquet-specific option(s) for reading Parquet files:
@@ -312,11 +316,12 @@ def parquet(self, *paths):
312316
>>> df.dtypes
313317
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
314318
"""
319+
self._set_opts(recursiveFileLookup=recursiveFileLookup)
315320
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
316321

317322
@ignore_unicode_prefix
318323
@since(1.6)
319-
def text(self, paths, wholetext=False, lineSep=None):
324+
def text(self, paths, wholetext=False, lineSep=None, recursiveFileLookup=None):
320325
"""
321326
Loads text files and returns a :class:`DataFrame` whose schema starts with a
322327
string column named "value", and followed by partitioned columns if there
@@ -337,7 +342,8 @@ def text(self, paths, wholetext=False, lineSep=None):
337342
>>> df.collect()
338343
[Row(value=u'hello\\nthis')]
339344
"""
340-
self._set_opts(wholetext=wholetext, lineSep=lineSep)
345+
self._set_opts(wholetext=wholetext, lineSep=lineSep,
346+
recursiveFileLookup=recursiveFileLookup)
341347
if isinstance(paths, basestring):
342348
paths = [paths]
343349
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
@@ -349,7 +355,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
349355
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
350356
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
351357
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
352-
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
358+
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
359+
recursiveFileLookup=None):
353360
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.
354361
355362
This function will go through the input once to determine the input schema if
@@ -476,7 +483,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
476483
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
477484
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
478485
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
479-
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep)
486+
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
487+
recursiveFileLookup=recursiveFileLookup)
480488
if isinstance(path, basestring):
481489
path = [path]
482490
if type(path) == list:
@@ -504,13 +512,14 @@ def func(iterator):
504512
raise TypeError("path can be only string, list or RDD")
505513

506514
@since(1.5)
507-
def orc(self, path):
515+
def orc(self, path, recursiveFileLookup=None):
508516
"""Loads ORC files, returning the result as a :class:`DataFrame`.
509517
510518
>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
511519
>>> df.dtypes
512520
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
513521
"""
522+
self._set_opts(recursiveFileLookup=recursiveFileLookup)
514523
if isinstance(path, basestring):
515524
path = [path]
516525
return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))

0 commit comments

Comments
 (0)