Commit 594d5d4

Author: Mikhail Gorbov (committed)
* MapR [SPARK-121] Spark OJAI JAVA: Read to Dataset functionality implementation
* Minor refactoring
1 parent f02a1dc

9 files changed: 173 additions & 98 deletions

external/maprdb/src/main/scala/com/mapr/db/spark/RDD/DocumentRDDFunctions.scala

Lines changed: 25 additions & 26 deletions
@@ -24,7 +24,7 @@ private[spark] class DocumentRDDFunctions extends LoggingTrait {
       createTable: Boolean = false,
       bulkInsert: Boolean = false,
       function1: (Broadcast[SerializableConfiguration],
-        Boolean) => Function1[Iterator[T], Unit]): Unit = {
+        Boolean) => ((Iterator[T]) => Unit)): Unit = {
     var isNewAndBulkLoad = (false, false)

     val partitioner: Option[Partitioner] = rdd.partitioner
@@ -67,31 +67,30 @@ private[spark] case class OJAIDocumentRDDFunctions[T](rdd: RDD[T])(

   @transient val sparkContext = rdd.sparkContext

-  def saveToMapRDB(tablename: String,
+  def saveToMapRDB(tableName: String,
                    createTable: Boolean = false,
                    bulkInsert: Boolean = false,
                    idFieldPath: String = DocumentConstants.ID_KEY): Unit = {
     logDebug(
-      s"saveToMapRDB in OJAIDocumentRDDFunctions is called for table: $tablename " +
+      s"saveToMapRDB in OJAIDocumentRDDFunctions is called for table: $tableName " +
         s"with bulkinsert flag set: $bulkInsert and createTable: $createTable")

-    var getID: (Document) => Value = null
-    if (idFieldPath == DocumentConstants.ID_KEY) {
-      getID = (doc: Document) => doc.getId
+    val getID: Document => Value = if (idFieldPath == DocumentConstants.ID_KEY) {
+      (doc: Document) => doc.getId
     } else {
-      getID = (doc: Document) => doc.getValue(idFieldPath)
+      (doc: Document) => doc.getValue(idFieldPath)
     }

     this.saveToMapRDBInternal(
       rdd,
-      tablename,
+      tableName,
       createTable,
       bulkInsert,
-      (cnf: Broadcast[SerializableConfiguration], isnewAndBulkLoad: Boolean) =>
+      (cnf: Broadcast[SerializableConfiguration], isNewAndBulkLoad: Boolean) =>
        (iter: Iterator[T]) => {
           if (iter.nonEmpty) {
             val writer =
-              Writer.initialize(tablename, cnf.value, isnewAndBulkLoad, true)
+              Writer.initialize(tableName, cnf.value, isNewAndBulkLoad, true)
             while (iter.hasNext) {
               val element = iter.next
               f.write(f.getValue(element), getID, writer)
@@ -110,8 +109,8 @@ private[spark] case class OJAIDocumentRDDFunctions[T](rdd: RDD[T])(
       s"insertToMapRDB in OJAIDocumentRDDFunctions is called for table: $tablename" +
         s" with bulkinsert flag set: $bulkInsert and createTable: $createTable")

-    var getID: (Document) => Value = if (idFieldPath == DocumentConstants.ID_KEY) {
-      (doc: Document) => doc.getId
+    val getID: (Document) => Value = if (idFieldPath == DocumentConstants.ID_KEY) {
+      (doc: Document) => doc.getId
     } else {
       (doc: Document) => doc.getValue(idFieldPath)
     }
@@ -121,11 +120,11 @@ private[spark] case class OJAIDocumentRDDFunctions[T](rdd: RDD[T])(
       tablename,
       createTable,
       bulkInsert,
-      (cnf: Broadcast[SerializableConfiguration], isnewAndBulkLoad: Boolean) =>
+      (cnf: Broadcast[SerializableConfiguration], isNewAndBulkLoad: Boolean) =>
        (iter: Iterator[T]) => {
           if (iter.nonEmpty) {
             val writer =
-              Writer.initialize(tablename, cnf.value, isnewAndBulkLoad, false)
+              Writer.initialize(tablename, cnf.value, isNewAndBulkLoad, false)
             while (iter.hasNext) {
               val element = iter.next
               f.write(f.getValue(element), getID, writer)
@@ -136,20 +135,20 @@ private[spark] case class OJAIDocumentRDDFunctions[T](rdd: RDD[T])(
     )
   }

-  def updateToMapRDB(tablename: String,
+  def updateToMapRDB(tableName: String,
                      mutation: (T) => DocumentMutation,
                      getID: (T) => Value): Unit = {
     logDebug(
-      "updateToMapRDB in OJAIDocumentRDDFunctions is called for table: " + tablename)
+      "updateToMapRDB in OJAIDocumentRDDFunctions is called for table: " + tableName)
     this.saveToMapRDBInternal(
       rdd,
-      tablename,
+      tableName,
       false,
       false,
       (cnf: Broadcast[SerializableConfiguration], isnewAndBulkLoad: Boolean) =>
        (iter: Iterator[T]) =>
           if (iter.nonEmpty) {
-            val writer = TableUpdateWriter(DBClient().getTable(tablename))
+            val writer = TableUpdateWriter(DBClient().getTable(tableName))
             while (iter.hasNext) {
               val element = iter.next
               f.update(mutation(element), getID(element), writer)
@@ -159,24 +158,24 @@ private[spark] case class OJAIDocumentRDDFunctions[T](rdd: RDD[T])(
     )
   }

-  def updateToMapRDB(tablename: String,
+  def updateToMapRDB(tableName: String,
                      mutation: (T) => DocumentMutation,
                      getID: (T) => Value,
                      condition: Predicate): Unit = {
     logDebug(
-      "updateToMapRDB in OJAIDocumentRDDFunctions is called for table: " + tablename)
+      "updateToMapRDB in OJAIDocumentRDDFunctions is called for table: " + tableName)
     val queryCondition = DBQueryCondition(condition.build.build())

     this.saveToMapRDBInternal(
       rdd,
-      tablename,
+      tableName,
       false,
       false,
       (cnf: Broadcast[SerializableConfiguration], isnewAndBulkLoad: Boolean) =>
        (iter: Iterator[T]) =>
           if (iter.nonEmpty) {
             val writer =
-              TableCheckAndMutateWriter(DBClient().getTable(tablename))
+              TableCheckAndMutateWriter(DBClient().getTable(tableName))
             while (iter.hasNext) {
               val element = iter.next
               f.checkAndUpdate(mutation(element),
@@ -196,23 +195,23 @@ private[spark] case class PairedDocumentRDDFunctions[K, V](rdd: RDD[(K, V)])(
     extends DocumentRDDFunctions {

   @transient val sparkContext = rdd.sparkContext
-  def saveToMapRDB(tablename: String,
+  def saveToMapRDB(tableName: String,
                    createTable: Boolean = false,
                    bulkInsert: Boolean = false): Unit = {
     logDebug(
       "saveToMapRDB in PairedDocumentRDDFunctions is called for table: " +
-        tablename + " with bulkinsert flag set: " + bulkInsert + " and createTable:" + createTable)
+        tableName + " with bulkinsert flag set: " + bulkInsert + " and createTable:" + createTable)

     this.saveToMapRDBInternal[(K, V)](
       rdd,
-      tablename,
+      tableName,
       createTable,
       bulkInsert,
       (cnf: Broadcast[SerializableConfiguration], isnewAndBulkLoad: Boolean) =>
        (iter: Iterator[(K, V)]) =>
           if (iter.nonEmpty) {
             val writer =
-              Writer.initialize(tablename, cnf.value, isnewAndBulkLoad, true)
+              Writer.initialize(tableName, cnf.value, isnewAndBulkLoad, true)
             while (iter.hasNext) {
               val element = iter.next
               f.write(v.getValue(element._2), f.getValue(element._1), writer)
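
For context, the saveToMapRDB overloads renamed above are reached through the connector's implicit RDD decorators. A minimal usage sketch, assuming `import com.mapr.db.spark._` supplies those implicits and that MapRDBSpark.newDocument converts a JSON string into an OJAIDocument; the table path /apps/users is hypothetical:

    import org.apache.spark.sql.SparkSession
    import com.mapr.db.spark._ // assumed source of the implicit RDD decorators

    object SaveSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("save-sketch").getOrCreate()

        // Hypothetical documents; MapRDBSpark.newDocument is assumed to
        // turn a JSON string into an OJAIDocument.
        val docs = spark.sparkContext
          .parallelize(Seq("""{"_id": "u1", "name": "Ann"}""", """{"_id": "u2", "name": "Bob"}"""))
          .map(MapRDBSpark.newDocument)

        // Matches the signature above: tableName, createTable, bulkInsert, idFieldPath.
        docs.saveToMapRDB("/apps/users", createTable = true)
        spark.stop()
      }
    }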
external/maprdb/src/main/scala/com/mapr/db/spark/RDD/FilterRDDFunctions.scala

Lines changed: 10 additions & 42 deletions
@@ -1,15 +1,18 @@
 /* Copyright (c) 2015 & onwards. MapR Tech, Inc., All rights reserved */
 package com.mapr.db.spark.RDD

-import com.mapr.db.spark.field
+import scala.language.implicitConversions
+import scala.reflect._
+
 import com.mapr.db.spark.condition._
+import com.mapr.db.spark.dbclient.DBClient
+import com.mapr.db.spark.field
 import com.mapr.db.spark.impl.OJAIDocument
 import com.mapr.db.spark.utils.DefaultClass.DefaultType
 import com.mapr.db.spark.writers.OJAIKey
+import org.ojai.DocumentConstants
+
 import org.apache.spark.rdd.RDD
-import scala.language.implicitConversions
-import com.mapr.db.spark.dbclient.DBClient
-import scala.reflect._

 case class FilterRDDFunctions[K: OJAIKey: quotes](rdd: RDD[K]) {

@@ -20,7 +23,7 @@ case class FilterRDDFunctions[K: OJAIKey: quotes](rdd: RDD[K]) {
     val table = DBClient().getTable(tableName)

     partition.flatMap(item => {
-      val condition = field("_id") === item
+      val condition = field(DocumentConstants.ID_KEY) === item
       reqType.getValue(table.find(condition.build).iterator(),
         classTag[D].runtimeClass.asInstanceOf[Class[D]])
     })
@@ -38,7 +41,7 @@ case class FilterRDDFunctions[K: OJAIKey: quotes](rdd: RDD[K]) {
       while (partition.hasNext) {
         gets = gets :+ partition.next
         if (gets.size == 4) {
-          val condition = field("_id") in gets
+          val condition = field(DocumentConstants.ID_KEY) in gets
           res = res ++ reqType.getValue(
             table.find(condition.build).iterator(),
             classTag[D].runtimeClass.asInstanceOf[Class[D]])
@@ -47,48 +50,13 @@ case class FilterRDDFunctions[K: OJAIKey: quotes](rdd: RDD[K]) {
       }

       if (gets.nonEmpty) {
-        val condition = field("_id") in gets
+        val condition = field(DocumentConstants.ID_KEY) in gets
         res = res ++ reqType.getValue(
           table.find(condition.build).iterator(),
           classTag[D].runtimeClass.asInstanceOf[Class[D]])
         gets = Seq[K]()
       }
       res.iterator
     })
-
-  // def bulkJoinWithMapRDB[D : ClassTag](tableName: String)
-  //   (implicit e: D DefaultType OJAIDocument, reqType: RDDTYPE[D]): RDD[D] = {
-  //   rdd.mapPartitions( partition => {
-  //     val table = MapRDBImpl.getTable(tableName)
-  //     val preparedPartitions : Seq[Seq[K]] = partition.foldLeft[Seq[Seq[K]]](Seq[Seq[K]]()) {
-  //       case (Nil, item) => Seq(Seq(item))
-  //       case (result, s) =>
-  //         if (result.last.size < 4) result.dropRight(1) :+ (result.last :+ s)
-  //         else result :+ Seq(s)
-  //     }
-  //
-  //     val output : Iterator[D]= preparedPartitions.map(items => {
-  //       val condition = field("_id") in items
-  //       reqType.getValue(table.find(condition.build).iterator(),
-  //         classTag[D].runtimeClass.asInstanceOf[Class[D]])})
-  //     output })
-  //     output })
-  //   return preparedPartitions.asInstanceOf[RDD[D]]
-  //     .flatMap(items : Seq[K] => {
-  //       val condition = field("_id") in items
-  //       reqType.getValue(table.find(condition.build).iterator(),
-  //         classTag[D].runtimeClass.asInstanceOf[Class[D]])
-  //     }
-  //   })
-  //   partition.foldLeft[List[K](List[Iterator[_]]()) {
-  //     case (Nil, s) => List(List(s))
-  //     case (result, s) => if (result.last.size < 4) result.dropRight(1) :+ (result.last :+ s)
-  //       else result :+ List(s)
-  //   }.flatMap(items => {
-  //     val condition = field("_id") in items
-  //     reqType.getValue(table.find(condition.build).iterator(),
-  //       classTag[D].runtimeClass.asInstanceOf[Class[D]])
-  //   })
-  //   })
   }
 }
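
The lookup path kept above buffers keys and issues one `in` condition per group of four ids. The same batching pattern in isolation, as a sketch; the `lookup` function stands in for the table.find call and is hypothetical:

    // Consume keys in batches of four; each batch becomes a single `in` lookup,
    // mirroring the gets.size == 4 logic in the method above.
    def batchedLookup[K, D](keys: Iterator[K], batchSize: Int = 4)(
        lookup: Seq[K] => Iterator[D]): Iterator[D] =
      keys.grouped(batchSize).flatMap(lookup)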

external/maprdb/src/main/scala/com/mapr/db/spark/sql/DefaultSource.scala

Lines changed: 3 additions & 1 deletion
@@ -7,6 +7,8 @@ import com.mapr.db.exceptions.TableExistsException
 import com.mapr.db.impl.ConditionImpl
 import com.mapr.db.spark.dbclient.DBClient
 import com.mapr.db.spark.utils.MapRSpark
+import org.ojai.DocumentConstants
+
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
@@ -64,7 +66,7 @@ class DefaultSource

     require(parameters.get("tableName").isDefined, "Table name must be defined")
-    val idFieldPath = parameters.getOrElse("idFieldPath", "_id")
+    val idFieldPath = parameters.getOrElse("idFieldPath", DocumentConstants.ID_KEY)
     val condition: Option[QueryCondition] = parameters
       .get("QueryCondition")
       .map(cond => ConditionImpl.parseFrom(ByteBuffer.wrap(cond.getBytes)))
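
The "tableName" and "idFieldPath" keys handled above arrive as reader options. A hedged sketch of a read through this source, given a SparkSession named spark; the table path is hypothetical:

    val df = spark.read
      .format("com.mapr.db.spark.sql.DefaultSource")
      .option("tableName", "/apps/users")  // required, per the require(...) above
      .option("idFieldPath", "_id")        // optional; now defaults to DocumentConstants.ID_KEY
      .load()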

external/maprdb/src/main/scala/com/mapr/db/spark/sql/MapRDBDataFrameFunctions.scala

Lines changed: 3 additions & 3 deletions
@@ -3,7 +3,7 @@ package com.mapr.db.spark.sql

 import com.mapr.db.spark.condition.Predicate
 import com.mapr.db.spark.utils.{LoggingTrait, MapRSpark}
-
+import org.ojai.DocumentConstants
 import org.ojai.store.DocumentMutation

 import org.apache.spark.sql.{DataFrame, Row}
@@ -12,13 +12,13 @@ private[spark] case class MapRDBDataFrameFunctions(@transient df: DataFrame)
   extends LoggingTrait {

   def saveToMapRDB(tableName: String,
-                   idFieldPath: String = "_id",
+                   idFieldPath: String = DocumentConstants.ID_KEY,
                    createTable: Boolean = false,
                    bulkInsert: Boolean = false): Unit =
     MapRSpark.save(df, tableName, idFieldPath, createTable, bulkInsert)

   def insertToMapRDB(tableName: String,
-                     idFieldPath: String = "_id",
+                     idFieldPath: String = DocumentConstants.ID_KEY,
                      createTable: Boolean = false,
                      bulkInsert: Boolean = false): Unit =
     MapRSpark.insert(df, tableName, idFieldPath, createTable, bulkInsert)
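
Assuming the implicit that wraps a DataFrame in MapRDBDataFrameFunctions comes from `import com.mapr.db.spark.sql._`, the new defaults are exercised like this; the table paths are hypothetical:

    import com.mapr.db.spark.sql._ // assumed source of the implicit DataFrame decorator

    df.saveToMapRDB("/apps/users", createTable = true)        // idFieldPath defaults to "_id"
    df.insertToMapRDB("/apps/users", idFieldPath = "user_id") // keyed on a custom field path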

external/maprdb/src/main/scala/com/mapr/db/spark/sql/MapRDBDataFrameWriterFunctions.scala

Lines changed: 2 additions & 1 deletion
@@ -2,6 +2,7 @@
 package com.mapr.db.spark.sql

 import com.mapr.db.spark.utils.{LoggingTrait, MapRSpark}
+import org.ojai.DocumentConstants

 import org.apache.spark.sql.DataFrameWriter

@@ -10,7 +11,7 @@ private[spark] case class MapRDBDataFrameWriterFunctions(
   extends LoggingTrait {

   def saveToMapRDB(tableName: String,
-                   idFieldPath: String = "_id",
+                   idFieldPath: String = DocumentConstants.ID_KEY,
                    bulkInsert: Boolean = false): Unit =
     MapRSpark.save(dfw, tableName, idFieldPath, bulkInsert)
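
A sketch of the writer-side call, assuming the same implicit import also decorates DataFrameWriter; the table path is hypothetical:

    import com.mapr.db.spark.sql._ // assumed implicit for DataFrameWriter

    df.write.saveToMapRDB("/apps/users") // idFieldPath defaults to DocumentConstants.ID_KEY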

external/maprdb/src/main/scala/com/mapr/db/spark/sql/SparkSessionFunctions.scala

Lines changed: 3 additions & 3 deletions
@@ -1,12 +1,13 @@
 /* Copyright (c) 2015 & onwards. MapR Tech, Inc., All rights reserved */
 package com.mapr.db.spark.sql

+import scala.reflect.runtime.universe._
+
 import com.mapr.db.spark.utils.MapRSpark
+
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.types.StructType

-import scala.reflect.runtime.universe._
-
 case class SparkSessionFunctions(@transient sparkSession: SparkSession)
   extends Serializable {

@@ -19,7 +20,6 @@ case class SparkSessionFunctions(@transient sparkSession: SparkSession)
     .builder()
     .sparkSession(sparkSession)
     .configuration()
-    .sparkContext(sparkSession.sparkContext)
     .setTable(tableName)
     .build()
     .toDF[T](schema, sampleSize)
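
This is the read-to-Dataset path the commit message names: the builder chain above ends in toDF[T](schema, sampleSize). A hedged caller-side sketch, assuming the SparkSession decorator is exposed as loadFromMapRDB[T] via `import com.mapr.db.spark.sql._`; the case class and table path are illustrative:

    import com.mapr.db.spark.sql._ // assumed source of the SparkSession decorator
    import spark.implicits._       // for .as[User]

    case class User(_id: String, name: String)

    // loadFromMapRDB[T] builds a DataFrame via toDF[T]; .as[User] yields a typed Dataset.
    val users = spark.loadFromMapRDB[User]("/apps/users").as[User]
    users.show()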
