Commit 25e5c83

rebase to resolve conflict

1 parent b90bf52 commit 25e5c83

11 files changed

Lines changed: 679 additions & 20 deletions

docs/sql-programming-guide.md

Lines changed: 20 additions & 1 deletion
@@ -518,7 +518,8 @@ file directly with SQL.
 Save operations can optionally take a `SaveMode`, that specifies how to handle existing data if
 present. It is important to realize that these save modes do not utilize any locking and are not
 atomic. Additionally, when performing an `Overwrite`, the data will be deleted before writing out the
-new data.
+new data. When performing an `Append`, an option can enable the UPSERT feature for JDBC data
+sources; currently only MySQL and PostgreSQL are supported.
 
 <table class="table">
 <tr><th>Scala/Java</th><th>Any Language</th><th>Meaning</th></tr>
@@ -1230,6 +1231,24 @@ the following case-insensitive options:
     The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: <code>"name CHAR(64), comments VARCHAR(1024)"</code>). The specified types should be valid spark sql data types. This option applies only to writing.
   </td>
 </tr>
+
+<tr>
+  <td><code>upsert, upsertConditionColumn, upsertUpdateColumn</code></td>
+  <td>
+    These JDBC writer options describe how the UPSERT feature is used for different JDBC dialects.
+    The feature is currently implemented in the MySQL and PostgreSQL JDBC dialects. The
+    <code>upsert</code> option applies only under <code>SaveMode.Append</code>; in Overwrite mode
+    the existing table is dropped or truncated first, including its unique constraints and primary
+    key, before the insertion, so the UPSERT scenario does not apply.
+    <code>upsertConditionColumn</code> lists the columns used to match existing rows, usually the
+    unique constraint or primary key columns. It is required by the PostgreSQL data source and is
+    not applicable to MySQL, which automatically uses any existing unique constraint.
+    <code>upsertUpdateColumn</code> lists the columns updated with the input data set when existing
+    rows are matched. It is required by the MySQL data source.
+    Be aware that if the input data set has duplicate rows, the upsert operation is
+    non-deterministic; see <a href="https://en.wikipedia.org/wiki/Merge_(SQL)">upsert (merge)</a>.
+  </td>
+</tr>
 </table>
 
 <div class="codetabs">
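
For illustration, a minimal sketch of how these options combine at the call site, assuming a DataFrame `df` and JDBC URLs `postgresUrl`/`mysqlUrl`; the table and column names here are hypothetical, only the option names come from the table above:

```scala
import java.util.Properties
import org.apache.spark.sql.SaveMode

// PostgreSQL: name the conflict column(s) explicitly via upsertConditionColumn.
df.write.mode(SaveMode.Append)
  .option("upsert", true)
  .option("upsertConditionColumn", "id")       // match existing rows on the primary key
  .option("upsertUpdateColumn", "name, price") // columns to update on a match
  .jdbc(postgresUrl, "products", new Properties)

// MySQL: the existing unique constraint is used automatically,
// so only upsertUpdateColumn is required.
df.write.mode(SaveMode.Append)
  .option("upsert", true)
  .option("upsertUpdateColumn", "name, price")
  .jdbc(mysqlUrl, "products", new Properties)
```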

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala

Lines changed: 115 additions & 0 deletions
@@ -20,11 +20,13 @@ package org.apache.spark.sql.jdbc
 import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
+import org.apache.spark.sql.SaveMode
 
 import org.apache.spark.tags.DockerTest
 
 @DockerTest
 class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
+  import testImplicits._
   override val db = new DatabaseOnDocker {
     override val imageName = "mysql:5.7.9"
     override val env = Map(
@@ -61,6 +63,19 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     ).executeUpdate()
     conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', 'fox', " +
       "'jumps', 'over', 'the', 'lazy', 'dog')").executeUpdate()
+
+    conn.prepareStatement("CREATE TABLE upsertT0 (c1 INTEGER primary key, c2 INTEGER, c3 INTEGER)")
+      .executeUpdate()
+    conn.prepareStatement("INSERT INTO upsertT0 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
+      .executeUpdate()
+    conn.prepareStatement("CREATE TABLE upsertT1 (c1 INTEGER primary key, c2 INTEGER, c3 INTEGER)")
+      .executeUpdate()
+    conn.prepareStatement("INSERT INTO upsertT1 VALUES (1, 2, 3), (2, 3, 4)")
+      .executeUpdate()
+    conn.prepareStatement("CREATE TABLE upsertT2 (c1 INTEGER, c2 INTEGER, c3 INTEGER, " +
+      "primary key(c1, c2))").executeUpdate()
+    conn.prepareStatement("INSERT INTO upsertT2 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
+      .executeUpdate()
   }
 
   test("Basic test") {
@@ -152,4 +167,104 @@
     df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
     df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
   }
+
+  test("upsert with Append without existing table") {
+    val df1 = Seq((1, 3), (2, 5)).toDF("c1", "c2")
+    df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "c1")
+      .jdbc(jdbcUrl, "upsertT", new Properties)
+    val df2 = spark.read.jdbc(jdbcUrl, "upsertT", new Properties)
+    assert(df2.count() == 2)
+    assert(df2.filter("c1=1").collect.head.get(1) == 3)
+
+    // table upsertT was created without a primary key or unique constraint,
+    // so the write falls back to a plain insert
+    val df3 = Seq((1, 4)).toDF("c1", "c2")
+    df3.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "c1")
+      .jdbc(jdbcUrl, "upsertT", new Properties)
+    assert(spark.read.jdbc(jdbcUrl, "upsertT", new Properties).filter("c1=1").count() == 2)
+  }
+
+  test("Upsert and Overwrite mode") {
+    // existing table has these rows:
+    // (1, 2, 3), (2, 3, 4), (3, 4, 5)
+    val df1 = spark.read.jdbc(jdbcUrl, "upsertT0", new Properties())
+    assert(df1.filter("c1=1").collect.head.getInt(1) == 2)
+    assert(df1.filter("c1=1").collect.head.getInt(2) == 3)
+    assert(df1.filter("c1=2").collect.head.getInt(1) == 3)
+    assert(df1.filter("c1=2").collect.head.getInt(2) == 4)
+    val df2 = Seq((1, 3, 6), (2, 5, 6)).toDF("c1", "c2", "c3")
+    // this will do the overwrite, not an upsert
+    df2.write.mode(SaveMode.Overwrite)
+      .option("upsert", true).option("upsertUpdateColumn", "c2, c3")
+      .jdbc(jdbcUrl, "upsertT0", new Properties)
+    val df3 = spark.read.jdbc(jdbcUrl, "upsertT0", new Properties())
+    assert(df3.filter("c1=1").collect.head.getInt(1) == 3)
+    assert(df3.filter("c1=1").collect.head.getInt(2) == 6)
+    assert(df3.filter("c1=2").collect.head.getInt(1) == 5)
+    assert(df3.filter("c1=2").collect.head.getInt(2) == 6)
+    assert(df3.filter("c1=3").collect.size == 0)
+  }
+
+  test("upsert with Append and negative option values") {
+    val df1 = Seq((1, 3, 6), (2, 5, 6)).toDF("c1", "c2", "c3")
+    val m = intercept[java.sql.SQLException] {
+      df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "C11")
+        .jdbc(jdbcUrl, "upsertT1", new Properties)
+    }.getMessage
+    assert(m.contains("column C11 not found"))
+
+    val n = intercept[java.sql.SQLException] {
+      df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "C11")
+        .jdbc(jdbcUrl, "upsertT1", new Properties)
+    }.getMessage
+    assert(n.contains("column C11 not found"))
+  }
+
+  test("Upsert and Append mode -- data matching one column") {
+    // existing table has these rows:
+    // (1, 2, 3), (2, 3, 4)
+    val df1 = spark.read.jdbc(jdbcUrl, "upsertT1", new Properties())
+    assert(df1.count() == 2)
+    assert(df1.filter("c1=1").collect.head.getInt(1) == 2)
+    assert(df1.filter("c1=1").collect.head.getInt(2) == 3)
+    assert(df1.filter("c1=2").collect.head.getInt(1) == 3)
+    assert(df1.filter("c1=2").collect.head.getInt(2) == 4)
+    val df2 = Seq((1, 4, 7), (2, 6, 8)).toDF("c1", "c2", "c3")
+    df2.write.mode(SaveMode.Append)
+      .option("upsert", true).option("upsertUpdateColumn", "c2, c3")
+      .jdbc(jdbcUrl, "upsertT1", new Properties)
+    val df3 = spark.read.jdbc(jdbcUrl, "upsertT1", new Properties())
+    assert(df3.count() == 2)
+    assert(df3.filter("c1=1").collect.head.getInt(1) == 4)
+    assert(df3.filter("c1=1").collect.head.getInt(2) == 7)
+    assert(df3.filter("c1=2").collect.head.getInt(1) == 6)
+    assert(df3.filter("c1=2").collect.head.getInt(2) == 8)
+    // with upsert turned off, inserting a row with a duplicate key fails
+    // with a duplicate-key error
+    val df4 = Seq((1, 5, 9)).toDF("c1", "c2", "c3")
+    val p = intercept[org.apache.spark.SparkException] {
+      df4.write.mode(SaveMode.Append).option("upsert", false).option("upsertUpdateColumn", "C11")
+        .jdbc(jdbcUrl, "upsertT1", new Properties)
+    }.getMessage
+    assert(p.contains("Duplicate entry '1' for key 'PRIMARY'"))
+  }
+
+  test("Upsert and Append mode -- data matching two columns") {
+    // table has these rows: (1, 2, 3), (2, 3, 4), (3, 4, 5)
+    // update Row(2, 3, 4) to Row(2, 3, 10), matching on 2 columns
+    val df1 = spark.read.jdbc(jdbcUrl, "upsertT2", new Properties())
+    assert(df1.count() == 3)
+    assert(df1.filter("c1=1").collect.head.getInt(1) == 2)
+    assert(df1.filter("c1=1").collect.head.getInt(2) == 3)
+    assert(df1.filter("c1=2").collect.head.getInt(1) == 3)
+    assert(df1.filter("c1=2").collect.head.getInt(2) == 4)
+
+    val df2 = Seq((2, 3, 10)).toDF("c1", "c2", "c3")
+    df2.write.mode(SaveMode.Append)
+      .option("upsert", true).option("upsertUpdateColumn", "c3")
+      .jdbc(jdbcUrl, "upsertT2", new Properties)
+
+    val df3 = spark.read.jdbc(jdbcUrl, "upsertT2", new Properties())
+    assert(df3.count() == 3)
+    assert(df3.filter("c1=2").collect.head.getInt(1) == 3)
+    assert(df3.filter("c1=2").collect.head.getInt(2) == 10)
+  }
 }
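
Note that the MySQL tests never pass `upsertConditionColumn`, consistent with MySQL resolving conflicts through whatever unique constraint already exists. A hypothetical sketch (not the commit's actual dialect code) of the statement shape MySQL's native upsert requires for `upsertUpdateColumn = "c2, c3"` on `upsertT1`:

```scala
// Hypothetical: ON DUPLICATE KEY UPDATE fires on any primary-key or
// unique-constraint violation, which is why MySQL needs no explicit
// condition columns.
val upsertSql =
  """INSERT INTO upsertT1 (c1, c2, c3) VALUES (?, ?, ?)
    |ON DUPLICATE KEY UPDATE c2 = VALUES(c2), c3 = VALUES(c3)""".stripMargin
```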

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala

Lines changed: 112 additions & 1 deletion
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc
 
 import java.sql.Connection
 import java.util.Properties
+import org.apache.spark.sql.SaveMode
 
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.expressions.Literal
@@ -27,8 +28,9 @@ import org.apache.spark.tags.DockerTest
 
 @DockerTest
 class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
+  import testImplicits._
   override val db = new DatabaseOnDocker {
-    override val imageName = "postgres:9.4.5"
+    override val imageName = "postgres:9.5.4"
     override val env = Map(
       "POSTGRES_PASSWORD" -> "rootpass"
     )
@@ -55,6 +57,26 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       + "null, null, null, null, null, "
       + "null, null, null, null, null, null, null)"
     ).executeUpdate()
+    conn.prepareStatement("CREATE TABLE upsertT0 " +
+      "(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1))").executeUpdate()
+    conn.prepareStatement("INSERT INTO upsertT0 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
+      .executeUpdate()
+    conn.prepareStatement("CREATE TABLE upsertT1 " +
+      "(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1))").executeUpdate()
+    conn.prepareStatement("INSERT INTO upsertT1 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
+      .executeUpdate()
+    conn.prepareStatement("CREATE TABLE upsertT2 " +
+      "(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1, c2))").executeUpdate()
+    conn.prepareStatement("INSERT INTO upsertT2 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
+      .executeUpdate()
+    conn.prepareStatement("CREATE TABLE upsertT3 " +
+      "(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1))").executeUpdate()
+    conn.prepareStatement("INSERT INTO upsertT3 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
+      .executeUpdate()
+    conn.prepareStatement("CREATE TABLE upsertT4 " +
+      "(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1))").executeUpdate()
+    conn.prepareStatement("INSERT INTO upsertT4 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
+      .executeUpdate()
   }
 
   test("Type mapping for various types") {
@@ -126,4 +148,93 @@
     assert(schema(0).dataType == FloatType)
     assert(schema(1).dataType == ShortType)
   }
+
+  test("Upsert and Overwrite mode") {
+    // existing table has these rows:
+    // (1, 2, 3), (2, 3, 4), (3, 4, 5)
+    val df1 = spark.read.jdbc(jdbcUrl, "upsertT0", new Properties())
+    assert(df1.filter("c1=1").collect.head.getInt(1) == 2)
+    assert(df1.filter("c1=1").collect.head.getInt(2) == 3)
+    assert(df1.filter("c1=2").collect.head.getInt(1) == 3)
+    assert(df1.filter("c1=2").collect.head.getInt(2) == 4)
+    val df2 = Seq((1, 3, 6), (2, 5, 6)).toDF("c1", "c2", "c3")
+    // this will do the overwrite, not an upsert
+    df2.write.mode(SaveMode.Overwrite)
+      .option("upsert", true).option("upsertConditionColumn", "c1")
+      .jdbc(jdbcUrl, "upsertT0", new Properties)
+    val df3 = spark.read.jdbc(jdbcUrl, "upsertT0", new Properties())
+    assert(df3.filter("c1=1").collect.head.getInt(1) == 3)
+    assert(df3.filter("c1=1").collect.head.getInt(2) == 6)
+    assert(df3.filter("c1=2").collect.head.getInt(1) == 5)
+    assert(df3.filter("c1=2").collect.head.getInt(2) == 6)
+    assert(df3.filter("c1=3").collect.size == 0)
+  }
+
+  test("upsert with Append and negative option values") {
+    val df1 = Seq((1, 3, 6), (2, 5, 6)).toDF("c1", "c2", "c3")
+    val m = intercept[java.sql.SQLException] {
+      df1.write.mode(SaveMode.Append).option("upsert", true)
+        .option("upsertConditionColumn", "C11")
+        .jdbc(jdbcUrl, "upsertT1", new Properties)
+    }.getMessage
+    assert(m.contains("column C11 not found"))
+
+    val n = intercept[java.sql.SQLException] {
+      df1.write.mode(SaveMode.Append).option("upsert", true)
+        .option("upsertConditionColumn", "C11")
+        .jdbc(jdbcUrl, "upsertT1", new Properties)
+    }.getMessage
+    assert(n.contains("column C11 not found"))
+
+    // c2 exists but carries no unique constraint
+    val o = intercept[org.apache.spark.SparkException] {
+      df1.write.mode(SaveMode.Append).option("upsert", true)
+        .option("upsertConditionColumn", "c2")
+        .jdbc(jdbcUrl, "upsertT1", new Properties)
+    }.getMessage
+    assert(o.contains("there is no unique or exclusion constraint matching the ON CONFLICT"))
+  }
+
+  test("upsert with Append without existing table") {
+    val df1 = Seq((1, 3), (2, 5)).toDF("c1", "c2")
+    df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "c1")
+      .jdbc(jdbcUrl, "upsertT", new Properties)
+    val df2 = spark.read.jdbc(jdbcUrl, "upsertT", new Properties)
+    assert(df2.count() == 2)
+    assert(df2.filter("c1=1").collect.head.get(1) == 3)
+
+    // table upsertT was created without a primary key or unique constraint,
+    // so the upsert throws an exception
+    val df3 = Seq((1, 4)).toDF("c1", "c2")
+    val p = intercept[org.apache.spark.SparkException] {
+      df3.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "c1")
+        .jdbc(jdbcUrl, "upsertT", new Properties)
+    }.getMessage
+    assert(p.contains("there is no unique or exclusion constraint matching the ON CONFLICT specification"))
+  }
+
+  test("Upsert & Append test -- matching one column") {
+    val df1 = spark.read.jdbc(jdbcUrl, "upsertT3", new Properties())
+    assert(df1.filter("c1=1").collect.head.getInt(1) == 2)
+    assert(df1.filter("c1=1").collect.head.getInt(2) == 3)
+    assert(df1.filter("c1=4").collect.size == 0)
+    // update Row(1, 2, 3) to (1, 3, 6) and insert new Row(4, 5, 6)
+    val df2 = Seq((1, 3, 6), (4, 5, 6)).toDF("c1", "c2", "c3")
+    // condition on one column
+    df2.write.mode(SaveMode.Append)
+      .option("upsert", true).option("upsertConditionColumn", "c1")
+      .option("upsertUpdateColumn", "c2, c3")
+      .jdbc(jdbcUrl, "upsertT3", new Properties)
+    val df3 = spark.read.jdbc(jdbcUrl, "upsertT3", new Properties())
+    assert(df3.filter("c1=1").collect.head.getInt(1) == 3)
+    assert(df3.filter("c1=1").collect.head.getInt(2) == 6)
+    assert(df3.filter("c1=4").collect.size == 1)
+  }
+
+  test("Upsert & Append test -- matching two columns") {
+    val df1 = spark.read.jdbc(jdbcUrl, "upsertT2", new Properties())
+    assert(df1.filter("c1=1").collect.head.getInt(1) == 2)
+    assert(df1.filter("c1=1").collect.head.getInt(2) == 3)
+    // update Row(2, 3, 4) to Row(2, 3, 10), matching on 2 columns
+    val df2 = Seq((2, 3, 10)).toDF("c1", "c2", "c3")
+    df2.write.mode(SaveMode.Append)
+      .option("upsert", true).option("upsertConditionColumn", "c1, c2")
+      .jdbc(jdbcUrl, "upsertT2", new Properties)
+    val df3 = spark.read.jdbc(jdbcUrl, "upsertT2", new Properties())
+    assert(df3.filter("c1=2").collect.head.getInt(2) == 10)
+  }
 }
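
The image bump from postgres:9.4.5 to 9.5.4 matters here: `INSERT ... ON CONFLICT` only exists since PostgreSQL 9.5, and the error string asserted above ("there is no unique or exclusion constraint matching the ON CONFLICT specification") is PostgreSQL's ON CONFLICT diagnostic. A hypothetical sketch (not the commit's actual dialect code) of the statement shape behind `upsertConditionColumn = "c1, c2"` with `upsertUpdateColumn = "c3"`:

```scala
// Hypothetical: PostgreSQL must be told the conflict target explicitly,
// which is why upsertConditionColumn is required for this dialect.
val upsertSql =
  """INSERT INTO upsertT2 (c1, c2, c3) VALUES (?, ?, ?)
    |ON CONFLICT (c1, c2) DO UPDATE SET c3 = EXCLUDED.c3""".stripMargin
```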

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 30 additions & 0 deletions
@@ -431,6 +431,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * You can set the following JDBC-specific option(s) for storing JDBC:
    * <ul>
    * <li>`truncate` (default `false`): use `TRUNCATE TABLE` instead of `DROP TABLE`.</li>
+   * <li>`upsert` (default `false`): under SaveMode.Append, specifies whether the JDBC data
+   * source may update a record that duplicates an insertion row. Some databases require users
+   * to specify the column(s) on which such duplicates are identified.</li>
+   * <li>`upsertConditionColumn`: when `upsert` is `true`, the comma-separated column(s) on
+   * which duplicate rows are identified must be specified. This requirement applies to
+   * database systems such as DB2, Oracle, and PostgreSQL.</li>
+   * <li>`upsertUpdateColumn`: when `upsert` is `true`, the comma-separated column(s) to be
+   * updated on matched rows may be specified. This requirement applies to database systems
+   * such as MySQL.</li>
    * </ul>
    *
    * In case of failures, users should turn off `truncate` option to use `DROP TABLE` again. Also,
@@ -439,6 +448,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * while PostgresDialect and default JDBCDialect don't. For unknown and unsupported JDBCDialects,
    * the user option `truncate` is ignored.
    *
+   * When using the upsert feature, be aware of these limitations.
+   * First, if the content of the [[DataFrame]] has multiple rows with the same key, the upsert
+   * result is non-deterministic, because the data is partitioned and sent to the data source
+   * over different JDBC connections. You can avoid this by eliminating the duplicate-key rows
+   * first and then saving to the JDBC data source table. For example, when the key is on
+   * columns ("key", "value1"):
+   *
+   * {{{
+   *   scala> val testData = sc.parallelize((2, 1, 2) :: (1, 1, 1) :: (1, 2, 3) :: (2, 1, 3) ::
+   *     (2, 2, 2) :: (2, 2, 1) :: (2, 1, 4) :: (1, 1, 4) :: (1, 2, 5) ::
+   *     (1, 2, 6) :: Nil, 5).toDF("key", "value1", "TS")
+   *   scala> val sorted = testData.orderBy('key, 'value1)
+   *   scala> val agg = sorted.groupBy('key).agg("TS" -> "max").withColumnRenamed("max(TS)", "TS")
+   *   scala> agg.join(sorted, Seq("key", "TS"))
+   * }}}
+   *
+   * Second, the upsert feature is only supported for SaveMode.Append. SaveMode.Overwrite first
+   * empties the table and then inserts; to get a deterministic result we likewise recommend
+   * removing duplicate-key rows from the content of the [[DataFrame]], so there is no need to
+   * implement upsert for SaveMode.Overwrite.
+   *
    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
    * @param table Name of the table in the external database.
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
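
To make the deduplication sketch in the scaladoc concrete, here is a self-contained variant, assuming a local SparkSession (the session setup and `show()` call are illustrative additions). Like the snippet above, it keeps one row per key by max TS; note that ties on TS can still leave duplicates:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().master("local[*]").appName("upsert-dedup").getOrCreate()
import spark.implicits._

val testData = Seq(
  (2, 1, 2), (1, 1, 1), (1, 2, 3), (2, 1, 3), (2, 2, 2),
  (2, 2, 1), (2, 1, 4), (1, 1, 4), (1, 2, 5), (1, 2, 6)
).toDF("key", "value1", "TS")

// Keep only the latest (max TS) row per key, as in the scaladoc snippet.
val latest = testData.groupBy($"key").agg(max($"TS").as("TS"))
val deduped = latest.join(testData, Seq("key", "TS"))

deduped.show()
// deduped now has one row per key (barring TS ties) and can be appended
// with the upsert option enabled.
```

If a fully deterministic result is needed even with TS ties, a window function (`row_number` over `partitionBy("key").orderBy(...)` with a tiebreaker) would guarantee exactly one row per key.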
