@@ -71,6 +71,8 @@ private[sql] case class CSVParameters(parameters: Map[String, String]) extends Logging

  val nullValue = parameters.getOrElse("nullValue", "")

  val codec = parameters.getOrElse("compression", parameters.getOrElse("codec", null))
Contributor: For this one I'd name it internally compression or compressionCodec, since codec can mean a lot of different things.

Contributor: The other thing is that I'd create short-form names for the common options, e.g. "gzip" should become GzipCodec. You'd need to look into which formats are commonly supported and come up with short names for them. We should also make sure this is case-insensitive.
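
A minimal sketch of that suggestion, assuming a hypothetical helper object (the object name, map name, and codec list below are illustrative, not code from this PR). Lower-casing the lookup key makes the option case-insensitive, and unrecognized names fall through as fully qualified class names:

import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, Lz4Codec, SnappyCodec}

object CompressionCodecs {
  // Hypothetical table of short names for common Hadoop compression codecs.
  // The real list would need to be checked against what Hadoop actually ships.
  private val shortCompressionCodecNames = Map(
    "bzip2" -> classOf[BZip2Codec].getName,
    "gzip" -> classOf[GzipCodec].getName,
    "lz4" -> classOf[Lz4Codec].getName,
    "snappy" -> classOf[SnappyCodec].getName)

  // Resolve case-insensitively: short names map to codec classes; anything
  // else is passed through unchanged as a fully qualified class name.
  def getCodecClassName(name: String): String =
    shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
}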


  val maxColumns = 20480

  val maxCharsPerColumn = 100000
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.hadoop.mapreduce.RecordWriter
@@ -98,6 +99,15 @@ private[csv] class CSVRelation(
  }

  override def prepareJobForWrite(job: Job): OutputWriterFactory = {
    val conf = job.getConfiguration
    // When a compression codec is specified, enable compression for both the
    // final output files and intermediate map output, using block compression.
    Option(params.codec).foreach { codec =>
      conf.set("mapreduce.output.fileoutputformat.compress", "true")
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
      conf.set("mapreduce.map.output.compress", "true")
      conf.set("mapreduce.map.output.compress.codec", codec)
    }

    new CSVOutputWriterFactory(params)
  }
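
For context, a sketch of how these properties get exercised end to end (df and the output path are placeholders; the codec class name matches what the test below passes):

// With the compression option set, the Hadoop properties configured above
// take effect and the written part files come out gzip-compressed.
df.write
  .format("csv")
  .option("compression", "org.apache.hadoop.io.compress.GzipCodec")
  .save("/tmp/cars-compressed")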

@@ -21,6 +21,8 @@ import java.io.File
import java.nio.charset.UnsupportedCharsetException
import java.sql.Timestamp

import org.apache.hadoop.io.compress.GzipCodec

import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
@@ -338,4 +340,29 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
    assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null))
  }

  test("save csv with compression codec option") {
    withTempDir { dir =>
      val csvDir = new File(dir, "csv").getCanonicalPath
      val cars = sqlContext.read
        .format("csv")
        .option("header", "true")
        .load(testFile(carsFile))

      // Write the cars data back out gzip-compressed, then verify both the
      // .gz extension and that the data round-trips intact.
      cars.coalesce(1).write
        .format("csv")
        .option("header", "true")
        .option("compression", classOf[GzipCodec].getCanonicalName)
        .save(csvDir)

      val compressedFiles = new File(csvDir).listFiles()
      assert(compressedFiles.exists(_.getName.endsWith(".gz")))

      val carsCopy = sqlContext.read
        .format("csv")
        .option("header", "true")
        .load(csvDir)

      verifyCars(carsCopy, withHeader = true)
    }
  }
}