Skip to content

Commit 4c46f4b

Browse files
committed
Parse modes in JSON data source
1 parent 52b6a89 commit 4c46f4b

7 files changed

Lines changed: 96 additions & 35 deletions

File tree

python/pyspark/sql/readwriter.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ def json(self, path, schema=None):
162162
(e.g. 00012)
163163
* ``allowBackslashEscapingAnyCharacter`` (default ``false``): allows accepting quoting \
164164
of all character using backslash quoting mechanism
165+
* ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
166+
during parsing. When it fails to parse, ``PERMISSIVE`` mode sets ``null``, \
167+
``DROPMALFORMED`` drops the record and ``FAILFAST`` throws an exception.
165168
166169
>>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
167170
>>> df1.dtypes

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,9 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
288288
* </li>
289289
* <li>`allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers
290290
* (e.g. 00012)</li>
291+
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
292+
* during parsing. When it fails to parse, `PERMISSIVE` mode sets `null`, `DROPMALFORMED` drops the
293+
* record and `FAILFAST` throws an exception.</li>
291294
*
292295
* @since 1.4.0
293296
*/
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.datasources
19+
20+
/**
 * Shared parse-mode constants and predicates for data sources (CSV, JSON).
 *
 * A parse mode controls what happens when a malformed record is encountered:
 *  - `PERMISSIVE` sets fields to `null` (and may capture the raw record),
 *  - `DROPMALFORMED` drops the record,
 *  - `FAILFAST` throws an exception.
 *
 * Mode strings are matched case-insensitively; unrecognized strings fall back
 * to permissive behavior.
 */
private[datasources] object ParseModes {
  val PERMISSIVE_MODE = "PERMISSIVE"
  val DROP_MALFORMED_MODE = "DROPMALFORMED"
  val FAIL_FAST_MODE = "FAILFAST"

  // Used when the caller supplies no mode (or an invalid one).
  val DEFAULT = PERMISSIVE_MODE

  /** Returns true if `mode` case-insensitively names one of the supported parse modes. */
  def isValidMode(mode: String): Boolean = {
    normalize(mode) match {
      case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
      case _ => false
    }
  }

  def isDropMalformedMode(mode: String): Boolean = normalize(mode) == DROP_MALFORMED_MODE
  def isFailFastMode(mode: String): Boolean = normalize(mode) == FAIL_FAST_MODE
  def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
    normalize(mode) == PERMISSIVE_MODE
  } else {
    true // We default to permissive if the mode string is not valid.
  }

  // Use Locale.ROOT so mode matching does not depend on the JVM default locale
  // (e.g. under the Turkish locale, "i".toUpperCase yields a dotted capital I,
  // which would make "permissive" fail to match PERMISSIVE_MODE).
  private def normalize(mode: String): String = mode.toUpperCase(java.util.Locale.ROOT)
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.csv
2020
import java.nio.charset.StandardCharsets
2121

2222
import org.apache.spark.Logging
23-
import org.apache.spark.sql.execution.datasources.CompressionCodecs
23+
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
2424

2525
private[sql] class CSVOptions(
2626
@transient private val parameters: Map[String, String])
@@ -62,7 +62,7 @@ private[sql] class CSVOptions(
6262

6363
val delimiter = CSVTypeCast.toChar(
6464
parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
65-
val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
65+
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
6666
val charset = parameters.getOrElse("encoding",
6767
parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
6868

@@ -101,26 +101,3 @@ private[sql] class CSVOptions(
101101

102102
val rowSeparator = "\n"
103103
}
104-
105-
private[csv] object ParseModes {
106-
val PERMISSIVE_MODE = "PERMISSIVE"
107-
val DROP_MALFORMED_MODE = "DROPMALFORMED"
108-
val FAIL_FAST_MODE = "FAILFAST"
109-
110-
val DEFAULT = PERMISSIVE_MODE
111-
112-
def isValidMode(mode: String): Boolean = {
113-
mode.toUpperCase match {
114-
case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
115-
case _ => false
116-
}
117-
}
118-
119-
def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
120-
def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
121-
def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
122-
mode.toUpperCase == PERMISSIVE_MODE
123-
} else {
124-
true // We default to permissive is the mode string is not valid
125-
}
126-
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources.json
1919

2020
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
2121

22-
import org.apache.spark.sql.execution.datasources.CompressionCodecs
22+
import org.apache.spark.Logging
23+
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
2324

2425
/**
2526
* Options for the JSON data source.
@@ -28,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.CompressionCodecs
2829
*/
2930
private[sql] class JSONOptions(
3031
@transient private val parameters: Map[String, String])
31-
extends Serializable {
32+
extends Logging with Serializable {
3233

3334
val samplingRatio =
3435
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
@@ -49,6 +50,16 @@ private[sql] class JSONOptions(
4950
val allowBackslashEscapingAnyCharacter =
5051
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
5152
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
53+
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
54+
55+
// Parse mode flags
56+
if (!ParseModes.isValidMode(parseMode)) {
57+
logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
58+
}
59+
60+
val failFast = ParseModes.isFailFastMode(parseMode)
61+
val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
62+
val permissive = ParseModes.isPermissiveMode(parseMode)
5263

5364
/** Sets config options on a Jackson [[JsonFactory]]. */
5465
def setJacksonOptions(factory: JsonFactory): Unit = {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
2424
import com.fasterxml.jackson.core._
2525

2626
import org.apache.spark.rdd.RDD
27+
import org.apache.spark.Logging
2728
import org.apache.spark.sql.catalyst.InternalRow
2829
import org.apache.spark.sql.catalyst.expressions._
2930
import org.apache.spark.sql.catalyst.util._
@@ -34,7 +35,7 @@ import org.apache.spark.util.Utils
3435

3536
private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
3637

37-
object JacksonParser {
38+
object JacksonParser extends Logging {
3839

3940
def parse(
4041
input: RDD[String],
@@ -244,13 +245,20 @@ object JacksonParser {
244245

245246
def failedRecord(record: String): Seq[InternalRow] = {
246247
// create a row even if no corrupt record column is present
247-
val row = new GenericMutableRow(schema.length)
248-
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
249-
require(schema(corruptIndex).dataType == StringType)
250-
row.update(corruptIndex, UTF8String.fromString(record))
248+
if (configOptions.failFast) {
249+
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
250+
}
251+
if (configOptions.dropMalformed) {
252+
logWarning(s"Dropping malformed line: $record")
253+
Nil
254+
} else {
255+
val row = new GenericMutableRow(schema.length)
256+
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
257+
require(schema(corruptIndex).dataType == StringType)
258+
row.update(corruptIndex, UTF8String.fromString(record))
259+
}
260+
Seq(row)
251261
}
252-
253-
Seq(row)
254262
}
255263

256264
val factory = new JsonFactory()

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import org.apache.hadoop.conf.Configuration
2727
import org.apache.hadoop.fs.{Path, PathFilter}
2828
import org.apache.hadoop.io.SequenceFile.CompressionType
2929
import org.apache.hadoop.io.compress.GzipCodec
30-
import org.scalactic.Tolerance._
3130

31+
import org.apache.spark.SparkException
3232
import org.apache.spark.rdd.RDD
3333
import org.apache.spark.sql._
3434
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -969,6 +969,14 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
969969
withTempTable("jsonTable") {
970970
val jsonDF = sqlContext.read.json(corruptRecords)
971971
jsonDF.registerTempTable("jsonTable")
972+
val jsonDFWithDropMalformed =
973+
sqlContext.read.option("mode", "DROPMALFORMED").json(corruptRecords)
974+
jsonDFWithDropMalformed.registerTempTable("jsonTableWithDropMalformed")
975+
val exception = intercept[SparkException]{
976+
sqlContext.read.option("mode", "FAILFAST").json(corruptRecords).collect()
977+
}
978+
assert(exception.getMessage.contains("Malformed line in FAILFAST mode: {"))
979+
972980
val schema = StructType(
973981
StructField("_unparsed", StringType, true) ::
974982
StructField("a", StringType, true) ::
@@ -991,6 +999,16 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
991999
Row(null, null, null, "]") :: Nil
9921000
)
9931001

1002+
// Check if corrupt records are dropped.
1003+
checkAnswer(
1004+
sql(
1005+
"""
1006+
|SELECT a, b, c, _unparsed
1007+
|FROM jsonTableWithDropMalformed
1008+
""".stripMargin),
1009+
Row("str_a_4", "str_b_4", "str_c_4", null) :: Nil
1010+
)
1011+
9941012
checkAnswer(
9951013
sql(
9961014
"""

0 commit comments

Comments
 (0)