@@ -223,6 +223,7 @@ private[csv] object CSVTypeCast {
Try(datum.toDouble)
.getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
}
case _: BooleanType if datum == options.nullValue && nullable => null
case _: BooleanType => datum.toBoolean
case dt: DecimalType =>
if (datum == options.nullValue && nullable) {
@@ -231,6 +232,7 @@
val value = new BigDecimal(datum.replaceAll(",", ""))
Decimal(value, dt.precision, dt.scale)
}
case _: TimestampType if datum == options.nullValue && nullable => null
case _: TimestampType if options.dateFormat != null =>
// This one will lose the microseconds part.
// See https://issues.apache.org/jira/browse/SPARK-10681.
@@ -239,10 +241,12 @@
// This one will lose the microseconds part.
// See https://issues.apache.org/jira/browse/SPARK-10681.
DateTimeUtils.stringToTime(datum).getTime * 1000L
case _: DateType if datum == options.nullValue && nullable => null
case _: DateType if options.dateFormat != null =>
DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime)
case _: DateType =>
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
case _: StringType if datum == options.nullValue && nullable => null
case _: StringType => UTF8String.fromString(datum)
case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
}
@@ -431,7 +431,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}

test("nullable fields with user defined null value of \"null\"") {

// year,make,model,comment,blank
val dataSchema = StructType(List(
StructField("year", IntegerType, nullable = true),
@@ -447,7 +446,7 @@

verifyCars(cars, withHeader = true, checkValues = false)
val results = cars.collect()
assert(results(0).toSeq === Array(2012, "Tesla", "S", "null", "null"))
assert(results(0).toSeq === Array(2012, "Tesla", "S", null, null))
@HyukjinKwon (Member, Author) commented on May 5, 2016:
This is being tested against the data below:

year,make,model,comment,blank
"2012","Tesla","S",null,

1997,Ford,E350,"Go get one now they are going fast",
null,Chevy,Volt

Since the header is year,make,model,comment,blank, this should produce the values 2012,Tesla,S,null,null because nullValue is set to "null".
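
For reference, a minimal sketch of the read path this comment describes, assuming a file containing the data above (the path and the `sqlContext` handle are illustrative; the schema and options mirror the test in this diff):

import org.apache.spark.sql.types._

// Schema from the test: every field is nullable.
val dataSchema = StructType(List(
  StructField("year", IntegerType, nullable = true),
  StructField("make", StringType, nullable = true),
  StructField("model", StringType, nullable = true),
  StructField("comment", StringType, nullable = true),
  StructField("blank", StringType, nullable = true)))

val cars = sqlContext.read
  .format("csv")
  .schema(dataSchema)
  .option("header", "true")
  .option("nullValue", "null")  // tokens equal to "null" are read as SQL NULL
  .load("cars-null.csv")        // hypothetical path to the data shown above

// With the casts added in this diff, the null guard covers non-string types
// as well, so the IntegerType column `year` would also yield NULL for a
// "null" token instead of failing to parse.
assert(cars.collect()(0).toSeq === Seq(2012, "Tesla", "S", null, null))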

assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null))
}

@@ -0,0 +1,74 @@ (new file: CSVHadoopFsRelationSuite.scala)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources

import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.types._

class CSVHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String = "csv"

override val extraReadOptions: Map[String, String] =
Map("header" -> "true", "inferSchema" -> "true")

override val extraWriteOptions: Map[String, String] = Map("header" -> "true")

override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: NullType => false
case _: BinaryType => false
case _: CalendarIntervalType => false
case _: ArrayType => false
case _: MapType => false
case _: StructType => false
// Currently, the CSV data source writes `DateType` and `TimestampType` values as plain longs.
// Since `dateFormat` is not yet supported for writing, both types are disabled for now.
case _: DateType => false
case _: TimestampType => false
case _: UserDefinedType[_] => false
case _ => true
}

test("save()/load() - partitioned table - simple queries - partition columns in data") {
withTempDir { file =>
val basePath = new Path(file.getCanonicalPath)
val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
val qualifiedBasePath = fs.makeQualified(basePath)

for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
val header = Seq("a,b")
val data = (1 to 3).map(i => s"""$i,val_$i""")
sparkContext
.parallelize(header ++ data)
.saveAsTextFile(partitionDir.toString)
}

val dataSchemaWithPartition =
StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))

checkQueries(
hiveContext.read.format(dataSourceName)
.option("dataSchema", dataSchemaWithPartition.json)
.option("inferSchema", "true")
.option("header", "true")
.load(file.getCanonicalPath))
}
}
}
@@ -39,6 +39,12 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes

val dataSourceName: String

// These options are applied to the reading tests in `HadoopFsRelationTest`.
val extraReadOptions = Map.empty[String, String]

// These options are applied to the writing tests in `HadoopFsRelationTest`.
val extraWriteOptions = Map.empty[String, String]

protected def supportsDataType(dataType: DataType): Boolean = true

val dataSchema =
@@ -168,26 +174,44 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes

test("save()/load() - non-partitioned table - Overwrite") {
withTempPath { file =>
testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath)
testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath)
testDF.write
.mode(SaveMode.Overwrite)
.format(dataSourceName)
.options(extraWriteOptions)
.save(file.getCanonicalPath)
testDF.write
.mode(SaveMode.Overwrite)
.format(dataSourceName)
.options(extraWriteOptions)
.save(file.getCanonicalPath)

checkAnswer(
sqlContext.read.format(dataSourceName)
.option("path", file.getCanonicalPath)
.option("dataSchema", dataSchema.json)
.options(extraReadOptions)
.load(),
testDF.collect())
testDF)
}
}

test("save()/load() - non-partitioned table - Append") {
withTempPath { file =>
testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath)
testDF.write.mode(SaveMode.Append).format(dataSourceName).save(file.getCanonicalPath)
testDF.write
.mode(SaveMode.Overwrite)
.format(dataSourceName)
.options(extraWriteOptions)
.save(file.getCanonicalPath)
testDF.write
.mode(SaveMode.Append)
.format(dataSourceName)
.options(extraWriteOptions)
.save(file.getCanonicalPath)

checkAnswer(
sqlContext.read.format(dataSourceName)
.option("dataSchema", dataSchema.json)
.options(extraReadOptions)
.load(file.getCanonicalPath).orderBy("a"),
testDF.union(testDF).orderBy("a").collect())
}
@@ -215,13 +239,15 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
withTempPath { file =>
partitionedTestDF.write
.format(dataSourceName)
.options(extraWriteOptions)
.mode(SaveMode.ErrorIfExists)
.partitionBy("p1", "p2")
.save(file.getCanonicalPath)

checkQueries(
sqlContext.read.format(dataSourceName)
.option("dataSchema", dataSchema.json)
.options(extraReadOptions)
.load(file.getCanonicalPath))
}
}
@@ -230,19 +256,22 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
withTempPath { file =>
partitionedTestDF.write
.format(dataSourceName)
.options(extraWriteOptions)
.mode(SaveMode.Overwrite)
.partitionBy("p1", "p2")
.save(file.getCanonicalPath)

partitionedTestDF.write
.format(dataSourceName)
.options(extraWriteOptions)
.mode(SaveMode.Overwrite)
.partitionBy("p1", "p2")
.save(file.getCanonicalPath)

checkAnswer(
sqlContext.read.format(dataSourceName)
.option("dataSchema", dataSchema.json)
.options(extraReadOptions)
.load(file.getCanonicalPath),
partitionedTestDF.collect())
}
@@ -252,19 +281,22 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
withTempPath { file =>
partitionedTestDF.write
.format(dataSourceName)
.options(extraWriteOptions)
.mode(SaveMode.Overwrite)
.partitionBy("p1", "p2")
.save(file.getCanonicalPath)

partitionedTestDF.write
.format(dataSourceName)
.options(extraWriteOptions)
.mode(SaveMode.Append)
.partitionBy("p1", "p2")
.save(file.getCanonicalPath)

checkAnswer(
sqlContext.read.format(dataSourceName)
.option("dataSchema", dataSchema.json)
.options(extraReadOptions)
.load(file.getCanonicalPath),
partitionedTestDF.union(partitionedTestDF).collect())
}
@@ -274,19 +306,22 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
withTempPath { file =>
partitionedTestDF1.write
.format(dataSourceName)
.options(extraWriteOptions)
.mode(SaveMode.Overwrite)
.partitionBy("p1", "p2")
.save(file.getCanonicalPath)

partitionedTestDF2.write
.format(dataSourceName)
.options(extraWriteOptions)
.mode(SaveMode.Append)
.partitionBy("p1", "p2")
.save(file.getCanonicalPath)

checkAnswer(
sqlContext.read.format(dataSourceName)
.option("dataSchema", dataSchema.json)
.options(extraReadOptions)
.load(file.getCanonicalPath),
partitionedTestDF.collect())
}
@@ -490,6 +525,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
withTempPath { file =>
partitionedTestDF.write
.format(dataSourceName)
.options(extraWriteOptions)
.mode(SaveMode.Overwrite)
.partitionBy("p1", "p2")
.save(file.getCanonicalPath)
@@ -498,6 +534,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
.format(dataSourceName)
.option("dataSchema", dataSchema.json)
.option("basePath", file.getCanonicalPath)
.options(extraReadOptions)
.load(s"${file.getCanonicalPath}/p1=*/p2=???")

val expectedPaths = Set(
@@ -612,23 +649,33 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes

val df = sqlContext.range(1, 10).toDF("i")
withTempPath { dir =>
df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
df.write
.mode("append")
.format(dataSourceName)
.options(extraWriteOptions)
.save(dir.getCanonicalPath)
// Because data already exists, this append should succeed: we will use the
// output committer associated with the file format, and
// AlwaysFailOutputCommitter will not be used.
df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
df.write
.mode("append")
.format(dataSourceName)
.options(extraWriteOptions)
.save(dir.getCanonicalPath)
checkAnswer(
sqlContext.read
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.options(extraOptions)
.options(extraOptions ++ extraReadOptions)
.load(dir.getCanonicalPath),
df.union(df))

// This will fail because AlwaysFailOutputCommitter is used when we do append.
intercept[Exception] {
df.write.mode("overwrite")
.options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
.options(extraOptions ++ extraWriteOptions)
.format(dataSourceName)
.save(dir.getCanonicalPath)
}
}
withTempPath { dir =>
@@ -637,7 +684,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
// and there is no existing data.
intercept[Exception] {
df.write.mode("append")
.options(extraOptions)
.options(extraOptions ++ extraWriteOptions)
.format(dataSourceName)
.save(dir.getCanonicalPath)
}