Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,19 @@ class UnivocityParser(
// dates and timestamps.
// For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions.
private val enableParsingFallbackForTimestampType =
options.enableDateTimeParsingFallback.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
options.timestampFormatInRead.isEmpty
}
options.enableDateTimeParsingFallback
.orElse(SQLConf.get.csvEnableDateTimeParsingFallback)
.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
options.timestampFormatInRead.isEmpty
}
private val enableParsingFallbackForDateType =
options.enableDateTimeParsingFallback.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
options.dateFormatInRead.isEmpty
}
options.enableDateTimeParsingFallback
.orElse(SQLConf.get.csvEnableDateTimeParsingFallback)
.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
options.dateFormatInRead.isEmpty
}

// Retrieve the raw record string.
private def getCurrentInput: UTF8String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,19 @@ class JacksonParser(
// dates and timestamps.
// For more information, see comments for "enableDateTimeParsingFallback" option in JSONOptions.
private val enableParsingFallbackForTimestampType =
options.enableDateTimeParsingFallback.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
options.timestampFormatInRead.isEmpty
}
options.enableDateTimeParsingFallback
.orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
options.timestampFormatInRead.isEmpty
}
private val enableParsingFallbackForDateType =
options.enableDateTimeParsingFallback.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
options.dateFormatInRead.isEmpty
}
options.enableDateTimeParsingFallback
.orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
options.dateFormatInRead.isEmpty
}

/**
* Create a converter which converts the JSON documents held by the `JsonParser`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3520,6 +3520,22 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

  // Optional (no default) legacy switch for CSV reads. When set, it supplies the
  // fallback decision used when the per-read "enableDateTimeParsingFallback"
  // option is absent; when unset (None), behavior falls through to the
  // legacy-parser-policy / format-option heuristic in UnivocityParser.
  val LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK =
    buildConf("spark.sql.legacy.csv.enableDateTimeParsingFallback")
      .internal()
      .doc("When true, enable legacy date/time parsing fallback in CSV")
      .version("3.4.0")
      .booleanConf
      .createOptional

  // Optional (no default) legacy switch for JSON reads. When set, it supplies the
  // fallback decision used when the per-read "enableDateTimeParsingFallback"
  // option is absent; when unset (None), behavior falls through to the
  // legacy-parser-policy / format-option heuristic in JacksonParser.
  val LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK =
    buildConf("spark.sql.legacy.json.enableDateTimeParsingFallback")
      .internal()
      .doc("When true, enable legacy date/time parsing fallback in JSON")
      .version("3.4.0")
      .booleanConf
      .createOptional

val ADD_PARTITION_BATCH_SIZE =
buildConf("spark.sql.addPartitionInBatch.size")
.internal()
Expand Down Expand Up @@ -4621,6 +4637,12 @@ class SQLConf extends Serializable with Logging {

def avroFilterPushDown: Boolean = getConf(AVRO_FILTER_PUSHDOWN_ENABLED)

def jsonEnableDateTimeParsingFallback: Option[Boolean] =
Copy link
Contributor Author

@sadikovi sadikovi Aug 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided not to add "legacy" prefix in the method as it would make the method name very long 🙂.

getConf(LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK)

def csvEnableDateTimeParsingFallback: Option[Boolean] =
getConf(LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK)

def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)

def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2949,6 +2949,38 @@ abstract class CSVSuite
)
}
}

test("SPARK-40215: enable parsing fallback for CSV in CORRECTED mode with a SQL config") {
withTempPath { path =>
Seq("2020-01-01,2020-01-01").toDF()
.repartition(1)
.write.text(path.getAbsolutePath)

for (fallbackEnabled <- Seq(true, false)) {
withSQLConf(
SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED",
SQLConf.LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") {
val df = spark.read
.schema("date date, ts timestamp")
.option("dateFormat", "invalid")
.option("timestampFormat", "invalid")
.csv(path.getAbsolutePath)

if (fallbackEnabled) {
checkAnswer(
df,
Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
)
} else {
checkAnswer(
df,
Seq(Row(null, null))
)
}
}
}
}
}
}

class CSVv1Suite extends CSVSuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3322,6 +3322,38 @@ abstract class JsonSuite
)
}
}

test("SPARK-40215: enable parsing fallback for JSON in CORRECTED mode with a SQL config") {
withTempPath { path =>
Seq("""{"date": "2020-01-01", "ts": "2020-01-01"}""").toDF()
.repartition(1)
.write.text(path.getAbsolutePath)

for (fallbackEnabled <- Seq(true, false)) {
withSQLConf(
SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED",
SQLConf.LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") {
val df = spark.read
.schema("date date, ts timestamp")
.option("dateFormat", "invalid")
.option("timestampFormat", "invalid")
.json(path.getAbsolutePath)

if (fallbackEnabled) {
checkAnswer(
df,
Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
)
} else {
checkAnswer(
df,
Seq(Row(null, null))
)
}
}
}
}
}
}

class JsonV1Suite extends JsonSuite {
Expand Down