Skip to content

Commit 69097a8

Browse files
rsotn-mapr and Mikhail Gorbov
authored and committed
[MSPARK-107] idField information is lost in MapRDBDataFrameWriterFunctions.saveToMapRDB (apache#186)
(cherry picked from commit 5430a1d)
1 parent df0d939 commit 69097a8

File tree

3 files changed

+20
-9
lines changed

3 files changed

+20
-9
lines changed

external/maprdb/src/main/scala/com/mapr/db/spark/sql/DefaultSource.scala

Lines changed: 8 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -48,12 +48,14 @@ class DefaultSource extends DataSourceRegister
4848
parameters: Map[String, String], data: DataFrame): BaseRelation = {
4949

5050
require(parameters.get("tableName").isDefined)
51+
require(parameters.get("idFieldPath").isDefined)
52+
val idFieldPath = parameters("idFieldPath")
5153
//val cParser = new ConditionParser();
5254
val condition : Option[QueryCondition] = parameters.get("QueryCondition").map(cond => ConditionImpl.parseFrom(ByteBuffer.wrap(cond.getBytes)))
5355
lazy val tableExists = DBClient().tableExists(parameters.get("tableName").get)
5456
lazy val tableName = parameters.get("tableName").get
5557
lazy val createTheTable = if (tableExists) false else true
56-
lazy val bulkMode = parameters.get("bulkMode").getOrElse("false").toBoolean
58+
lazy val bulkMode = parameters.get("bulkMode").getOrElse("false").toBoolean
5759
val operation = parameters.get("Operation").getOrElse("ErrorIfExists")
5860
mode match {
5961
case ErrorIfExists => {}
@@ -62,25 +64,25 @@ class DefaultSource extends DataSourceRegister
6264

6365
operation match {
6466
case "Insert" => {
65-
MapRSpark.insert(data, tableName,"_id", createTable = createTheTable, bulkInsert = bulkMode)
67+
MapRSpark.insert(data, tableName,idFieldPath, createTable = createTheTable, bulkInsert = bulkMode)
6668
}
6769

6870
case "InsertOrReplace" => {
69-
MapRSpark.save(data, tableName,"_id",createTable = createTheTable, bulkInsert = bulkMode)
71+
MapRSpark.save(data, tableName,idFieldPath,createTable = createTheTable, bulkInsert = bulkMode)
7072
}
7173

7274
case "ErrorIfExists" => {
7375
if (tableExists) throw new TableExistsException("Table: " + tableName + " already Exists")
74-
else MapRSpark.save(data, tableName,"_id",createTable = true, bulkInsert = bulkMode)
76+
else MapRSpark.save(data, tableName,idFieldPath,createTable = true, bulkInsert = bulkMode)
7577
}
7678

7779
case "Overwrite" => {
7880
DBClient().deleteTable(tableName)
79-
MapRSpark.save(data, tableName,"_id",createTable = true, bulkInsert = bulkMode)
81+
MapRSpark.save(data, tableName,idFieldPath,createTable = true, bulkInsert = bulkMode)
8082
}
8183

8284
case "Update" => {
83-
MapRSpark.update(data, tableName,"_id",createTable = false, bulkInsert = bulkMode)
85+
MapRSpark.update(data, tableName,idFieldPath,createTable = false, bulkInsert = bulkMode)
8486
}
8587

8688
case _ => throw new UnsupportedOperationException("Not supported operation") }

external/maprdb/src/main/scala/com/mapr/db/spark/sql/MapRDBDataFrameWriterFunctions.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,6 @@ import org.apache.spark.sql.DataFrameWriter
77
private[spark] case class MapRDBDataFrameWriterFunctions(@transient dfw: DataFrameWriter[_]) extends LoggingTrait {
88

99
def saveToMapRDB(tableName: String, idFieldPath: String = "_id",
10-
bulkInsert : Boolean = false): Unit = MapRSpark.save(dfw, tableName)
10+
bulkInsert : Boolean = false): Unit = MapRSpark.save(dfw, tableName, idFieldPath, bulkInsert)
1111

1212
}

external/maprdb/src/main/scala/com/mapr/db/spark/utils/MapRSpark.scala

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -53,8 +53,17 @@ object MapRSpark {
5353
documentRdd.updateToMapRDB(tableName, mutation, getID, condition)
5454
}
5555

56-
def save(dfw: DataFrameWriter[_], tableName: String): Unit = {
57-
dfw.format(defaultSource).option("tableName", tableName).save()
56+
def save(
57+
dfw: DataFrameWriter[_],
58+
tableName: String,
59+
idFieldPath: String,
60+
bulkInsert: Boolean
61+
): Unit = {
62+
dfw.format(defaultSource)
63+
.option("tableName", tableName)
64+
.option("idFieldPath", idFieldPath)
65+
.option("bulkMode", bulkInsert)
66+
.save()
5867
}
5968

6069
def load(sc: SparkContext, tableName: String) : MapRDBBaseRDD[OJAIDocument] = {

0 commit comments

Comments (0)