11 changes: 0 additions & 11 deletions src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -147,17 +147,6 @@ private[redshift] object Parameters {
      */
     def jdbcDriver: Option[String] = parameters.get("jdbcdriver")
 
-    /**
-     * If true, when writing, replace any existing data. When false, append to the table instead.
-     * Note that the table schema will need to be compatible with whatever you have in the DataFrame
-     * you're writing. spark-redshift makes no attempt to enforce that - you'll just see Redshift
-     * errors if they don't match.
-     *
-     * Defaults to false.
-     */
-    @deprecated("Use SaveMode instead", "0.5.0")
-    def overwrite: Boolean = parameters("overwrite").toBoolean
-
     /**
      * Set the Redshift table distribution style, which can be one of: EVEN, KEY or ALL. If you set
      * it to KEY, you'll also need to use the distkey parameter to set the distribution key.
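The removed `overwrite` flag is subsumed by Spark's standard SaveMode, as the deprecation message indicates. A minimal sketch (not part of this PR) of the equivalent write call, assuming a DataFrame `df`; the `url`, `dbtable`, and `tempdir` values below are placeholders, not taken from this change:

import org.apache.spark.sql.SaveMode

df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://host:5439/db?user=u&password=p")  // placeholder
  .option("dbtable", "my_table")                                    // placeholder
  .option("tempdir", "s3n://bucket/tmp")                            // placeholder
  .mode(SaveMode.Overwrite)  // was overwrite=true; SaveMode.Append replaces overwrite=false
  .save()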
60 changes: 0 additions & 60 deletions src/main/scala/com/databricks/spark/redshift/SchemaParser.scala

This file was deleted.

48 changes: 5 additions & 43 deletions src/main/scala/com/databricks/spark/redshift/package.scala
@@ -17,10 +17,9 @@
 
 package com.databricks.spark
 
-import com.amazonaws.services.s3.AmazonS3Client
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 package object redshift {
 
@@ -45,50 +44,13 @@ package object redshift {
     }
 
     /**
-     * Reads a table unload from Redshift with its schema in format "name0 type0 name1 type1 ...".
+     * Reads a table unload from Redshift with its schema.
      */
-    @deprecated("Use data sources API or perform string -> data type casts yourself", "0.5.0")
-    def redshiftFile(path: String, schema: String): DataFrame = {
-      val structType = SchemaParser.parseSchema(schema)
-      val casts = structType.fields.map { field =>
+    def redshiftFile(path: String, schema: StructType): DataFrame = {
+      val casts = schema.fields.map { field =>
         col(field.name).cast(field.dataType).as(field.name)
       }
-      redshiftFile(path, structType.fieldNames).select(casts: _*)
-    }
-
-    /**
-     * Read a Redshift table into a DataFrame, using S3 for data transfer and JDBC
-     * to control Redshift and resolve the schema
-     */
-    @deprecated("Use sqlContext.read()", "0.5.0")
-    def redshiftTable(parameters: Map[String, String]): DataFrame = {
-      val params = Parameters.mergeParameters(parameters)
-      sqlContext.baseRelationToDataFrame(
-        RedshiftRelation(
-          DefaultJDBCWrapper, creds => new AmazonS3Client(creds), params, None)(sqlContext))
-    }
-  }
-
-  /**
-   * Add write functionality to DataFrame
-   */
-  @deprecated("Use DataFrame.write()", "0.5.0")
-  implicit class RedshiftDataFrame(dataFrame: DataFrame) {
-
-    /**
-     * Load the DataFrame into a Redshift database table. By default, this will append to the
-     * specified table. If the `overwrite` parameter is set to `true` then this will drop the
-     * existing table and re-create it with the contents of this DataFrame.
-     */
-    @deprecated("Use DataFrame.write()", "0.5.0")
-    def saveAsRedshiftTable(parameters: Map[String, String]): Unit = {
-      val params = Parameters.mergeParameters(parameters)
-      val saveMode = if (params.overwrite) {
-        SaveMode.Overwrite
-      } else {
-        SaveMode.Append
-      }
-      DefaultRedshiftWriter.saveToRedshift(dataFrame.sqlContext, dataFrame, saveMode, params)
+      redshiftFile(path, schema.fieldNames).select(casts: _*)
     }
   }
 }
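With SchemaParser deleted, callers pass a StructType directly instead of the old "name0 type0 name1 type1 ..." schema string. A sketch of the new call site, assuming the implicit SQLContext wrapper defined in this package is in scope; the unload path and column set are placeholders:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("id", IntegerType, nullable = true)))

// Columns are read as strings and cast to the declared types, per redshiftFile above.
val df = sqlContext.redshiftFile("s3n://bucket/unload/path", schema)  // placeholder path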
src/test/scala/com/databricks/spark/redshift/RedshiftInputFormatSuite.scala
@@ -126,27 +126,22 @@ class RedshiftInputFormatSuite extends FunSuite with BeforeAndAfterAll {
     val escaped = escape(testRecords.map(_.map(_.toString)), DEFAULT_DELIMITER)
     writeToFile(escaped, new File(dir, "part-00000"))
 
-    val conf = new Configuration
-    conf.setLong(KEY_BLOCK_SIZE, 4)
-
     val sqlContext = new SQLContext(sc)
-
-    val srdd = sqlContext.redshiftFile(
-      dir.toString,
-      "name varchar(10) state text id integer score float big_score numeric(4, 0) " +
-        "some_long bigint")
     val expectedSchema = StructType(Seq(
       StructField("name", StringType, nullable = true),
       StructField("state", StringType, nullable = true),
       StructField("id", IntegerType, nullable = true),
       StructField("score", DoubleType, nullable = true),
       StructField("big_score", LongType, nullable = true),
       StructField("some_long", LongType, nullable = true)))
-    assert(srdd.schema === expectedSchema)
-    val parsed = srdd.rdd.map {
-      case Row(name: String, state: String, id: Int, score: Double,
-          bigScore: Long, someLong: Long) =>
-        Seq(name, state, id, score, bigScore, someLong)
+
+    val df = sqlContext.redshiftFile(dir.toString, expectedSchema)
+    assert(df.schema === expectedSchema)
+
+    val parsed = df.rdd.map {
+      case Row(
+        name: String, state: String, id: Int, score: Double, bigScore: Long, someLong: Long
+      ) => Seq(name, state, id, score, bigScore, someLong)
     }.collect().toSet
 
     assert(parsed === testRecords)
src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -513,7 +513,7 @@ class RedshiftSourceSuite
       "query" -> "select * from test_table")
 
     val e1 = intercept[IllegalArgumentException] {
-      expectedDataDF.saveAsRedshiftTable(invalidParams)
+      expectedDataDF.write.format("com.databricks.spark.redshift").options(invalidParams).save()
     }
     assert(e1.getMessage.contains("dbtable"))
   }
@@ -522,12 +522,12 @@
     val invalidParams = Map("dbtable" -> "foo") // missing tempdir and url
 
     val e1 = intercept[IllegalArgumentException] {
-      expectedDataDF.saveAsRedshiftTable(invalidParams)
+      expectedDataDF.write.format("com.databricks.spark.redshift").options(invalidParams).save()
     }
     assert(e1.getMessage.contains("tempdir"))
 
     val e2 = intercept[IllegalArgumentException] {
-      testSqlContext.redshiftTable(invalidParams)
+      expectedDataDF.write.format("com.databricks.spark.redshift").options(invalidParams).save()
     }
     assert(e2.getMessage.contains("tempdir"))
   }
@@ -539,7 +539,11 @@
   test("Saves throw error message if S3 Block FileSystem would be used") {
     val params = defaultParams + ("tempdir" -> defaultParams("tempdir").replace("s3n", "s3"))
     val e = intercept[IllegalArgumentException] {
-      expectedDataDF.saveAsRedshiftTable(params)
+      expectedDataDF.write
+        .format("com.databricks.spark.redshift")
+        .mode("append")
+        .options(params)
+        .save()
     }
     assert(e.getMessage.contains("Block FileSystem"))
   }
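The updated tests exercise parameter validation through the public writer API rather than the deprecated helpers; missing required options still surface as IllegalArgumentException. A condensed sketch of the pattern, assuming ScalaTest's intercept and a DataFrame `df` (names are illustrative, not from this PR):

val e = intercept[IllegalArgumentException] {
  df.write
    .format("com.databricks.spark.redshift")
    .mode("append")  // string form; equivalent to .mode(SaveMode.Append)
    .options(Map("dbtable" -> "foo"))  // missing url and tempdir
    .save()
}
assert(e.getMessage.contains("tempdir"))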