Skip to content

Commit 2a8a6c1

Browse files
ekrivokonmaprmgorbov
authored and committed
MapR [SPARK-128] MapRDB connector - wrong handle of null fields when nullable is false (apache#194)
1 parent 594d5d4 commit 2a8a6c1

10 files changed

Lines changed: 50 additions & 44 deletions

File tree

external/maprdb/src/main/scala/com/mapr/db/spark/RDD/DocumentRDDFunctions.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import com.mapr.db.spark.configuration.SerializableConfiguration
88
import com.mapr.db.spark.dbclient.DBClient
99
import com.mapr.db.spark.utils.{LoggingTrait, MapRDBUtils}
1010
import com.mapr.db.spark.writers._
11-
1211
import org.apache.hadoop.conf.Configuration
1312
import org.ojai.{Document, DocumentConstants, Value}
1413
import org.ojai.store.DocumentMutation

external/maprdb/src/main/scala/com/mapr/db/spark/RDD/MapRDBBaseRDD.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
/* Copyright (c) 2015 & onwards. MapR Tech, Inc., All rights reserved */
22
package com.mapr.db.spark.RDD
33

4+
import scala.language.existentials
5+
import scala.reflect.ClassTag
6+
47
import com.mapr.db.exceptions.DBException
58
import com.mapr.db.spark.condition.{DBQueryCondition, Predicate}
69
import com.mapr.db.spark.dbclient.DBClient
7-
import org.apache.spark.rdd.RDD
10+
import org.ojai.store.QueryCondition
811

9-
import scala.language.existentials
10-
import scala.reflect.ClassTag
12+
import org.apache.spark.rdd.RDD
1113
import org.apache.spark.SparkContext
12-
import org.ojai.store.QueryCondition
1314

1415
private[spark] abstract class MapRDBBaseRDD[T: ClassTag](
1516
@transient val sc: SparkContext,

external/maprdb/src/main/scala/com/mapr/db/spark/RDD/MapRDBTableScanRDD.scala

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package com.mapr.db.spark.RDD
44
import scala.language.existentials
55
import scala.reflect.ClassTag
66
import scala.reflect.runtime.universe._
7-
87
import com.mapr.db.impl.{ConditionImpl, IdCodec}
98
import com.mapr.db.spark.RDD.partition.MaprDBPartition
109
import com.mapr.db.spark.RDD.partitioner.MapRDBPartitioner
@@ -14,14 +13,10 @@ import com.mapr.db.spark.dbclient.DBClient
1413
import com.mapr.db.spark.impl.OJAIDocument
1514
import com.mapr.db.spark.utils.DefaultClass.DefaultType
1615
import com.mapr.db.spark.utils.MapRSpark
17-
1816
import org.ojai.{Document, Value}
19-
2017
import org.apache.spark.{Partition, Partitioner, SparkContext, TaskContext}
2118
import org.apache.spark.broadcast.Broadcast
22-
import org.apache.spark.sql.SparkSession
23-
24-
19+
import org.apache.spark.sql.{DataFrame, SparkSession}
2520

2621
private[spark] class MapRDBTableScanRDD[T: ClassTag](
2722
@transient sparkSession: SparkSession,
@@ -36,9 +31,9 @@ private[spark] class MapRDBTableScanRDD[T: ClassTag](
3631

3732
@transient private lazy val table = DBClient().getTable(tableName)
3833
@transient private lazy val tabletinfos =
39-
if (condition == null || condition.condition.isEmpty)
34+
if (condition == null || condition.condition.isEmpty) {
4035
DBClient().getTabletInfos(tableName)
41-
else DBClient().getTabletInfos(tableName, condition.condition)
36+
} else DBClient().getTabletInfos(tableName, condition.condition)
4237
@transient private lazy val getSplits: Seq[Value] = {
4338
val keys = tabletinfos.map(
4439
tableinfo =>
@@ -52,18 +47,18 @@ private[spark] class MapRDBTableScanRDD[T: ClassTag](
5247
}
5348

5449
private def getPartitioner: Partitioner = {
55-
if (getSplits.isEmpty)
56-
return null
57-
if (getSplits(0).getType == Value.Type.STRING) {
58-
return MapRDBPartitioner(getSplits.map(_.getString))
50+
if (getSplits.isEmpty) {
51+
null
52+
} else if (getSplits(0).getType == Value.Type.STRING) {
53+
MapRDBPartitioner(getSplits.map(_.getString))
5954
} else {
60-
return MapRDBPartitioner(getSplits.map(_.getBinary))
55+
MapRDBPartitioner(getSplits.map(_.getBinary))
6156
}
6257
}
6358

64-
def toDF[T <: Product: TypeTag]() = maprspark[T]()
59+
def toDF[T <: Product: TypeTag](): DataFrame = maprspark[T]()
6560

66-
def maprspark[T <: Product: TypeTag]() = {
61+
def maprspark[T <: Product: TypeTag](): DataFrame = {
6762
MapRSpark.builder
6863
.sparkSession(sparkSession)
6964
.configuration()
@@ -89,7 +84,7 @@ private[spark] class MapRDBTableScanRDD[T: ClassTag](
8984
DBQueryCondition(tabcond)).asInstanceOf[Partition]
9085
})
9186
logDebug("Partitions for the table:" + tableName + " are " + splits)
92-
return splits.toArray
87+
splits.toArray
9388
}
9489

9590
override def getPreferredLocations(split: Partition): Seq[String] = {
@@ -125,8 +120,9 @@ private[spark] class MapRDBTableScanRDD[T: ClassTag](
125120
if (columns != null) {
126121
logDebug("Columns projected from table:" + columns)
127122
itrs = table.find(combinedCond.build(), columns.toArray: _*).iterator()
128-
} else
123+
} else {
129124
itrs = table.find(combinedCond.build()).iterator()
125+
}
130126
val ojaiCursor = reqType.getValue(itrs, beanClass)
131127

132128
context.addTaskCompletionListener((ctx: TaskContext) => {

external/maprdb/src/main/scala/com/mapr/db/spark/RDD/partition/MapRDBPartition.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
/* Copyright (c) 2015 & onwards. MapR Tech, Inc., All rights reserved */
22
package com.mapr.db.spark.RDD.partition
33

4-
import org.apache.spark.Partition
54
import com.mapr.db.spark.condition.DBQueryCondition
5+
6+
import org.apache.spark.Partition
67
/**
78
* An identifier for a partition in a MapRTableScanRDD.
89
*

external/maprdb/src/main/scala/com/mapr/db/spark/sql/DefaultSource.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import com.mapr.db.impl.ConditionImpl
88
import com.mapr.db.spark.dbclient.DBClient
99
import com.mapr.db.spark.utils.MapRSpark
1010
import org.ojai.DocumentConstants
11+
import org.ojai.store.QueryCondition
1112

12-
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
13+
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
1314
import org.apache.spark.sql.sources._
14-
import org.apache.spark.sql.types.StructType
15+
import org.apache.spark.sql.types.{StructField, StructType}
1516
import org.apache.spark.sql.SaveMode._
16-
import org.ojai.store.QueryCondition
1717

1818
class DefaultSource
1919
extends DataSourceRegister
@@ -156,7 +156,6 @@ class DefaultSource
156156
val failureOnConflict = failOnConflict.toBoolean
157157

158158
val rdd = MapRSpark.builder
159-
.sparkContext(sqlContext.sparkContext)
160159
.sparkSession(sqlContext.sparkSession)
161160
.configuration()
162161
.setTable(tableName.get)
@@ -165,15 +164,21 @@ class DefaultSource
165164
.build
166165
.toRDD(null)
167166

168-
val schema: StructType = userSchema match {
167+
val schema: StructType = makeSchemaNullable(userSchema match {
169168
case Some(s) => s
170169
case None =>
171170
GenerateSchema(
172171
rdd,
173172
sampleSize.map(_.toDouble).getOrElse(GenerateSchema.SAMPLE_SIZE),
174173
failureOnConflict)
175-
}
174+
})
176175

177176
MapRDBRelation(tableName.get, schema, rdd, Operation)(sqlContext)
178177
}
178+
179+
private def makeSchemaNullable(schema: StructType): StructType = {
180+
StructType(schema.map(field => {
181+
StructField(field.name, field.dataType, nullable = true, field.metadata )
182+
}))
183+
}
179184
}

external/maprdb/src/main/scala/com/mapr/db/spark/sql/GenerateSchema.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
11
/* Copyright (c) 2015 & onwards. MapR Tech, Inc., All rights reserved */
22
package com.mapr.db.spark.sql
33

4-
import com.mapr.db.spark.RDD.MapRDBBaseRDD
5-
import com.mapr.db.spark.impl.OJAIDocument
6-
import com.mapr.db.spark.utils.MapRSpark
7-
import org.apache.spark.rdd.RDD
8-
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
9-
import org.ojai.DocumentReader
4+
import java.util.Arrays.sort
5+
import java.util.Comparator
106

117
import scala.reflect.runtime.universe._
128
import scala.Array._
13-
import java.util.Arrays.sort
14-
import java.util.Comparator
159

1610
import com.mapr.db.spark.exceptions.SchemaMappingException
11+
import com.mapr.db.spark.RDD.MapRDBBaseRDD
12+
import com.mapr.db.spark.impl.OJAIDocument
13+
import com.mapr.db.spark.utils.MapRSpark
14+
import org.ojai.DocumentReader
15+
16+
import org.apache.spark.rdd.RDD
1717
import org.apache.spark.SparkContext
1818
import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
1919
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
2020
import org.apache.spark.sql.types._
21+
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
2122

22-
import scala.annotation.switch
2323

2424
object GenerateSchema {
2525

external/maprdb/src/main/scala/com/mapr/db/spark/streaming/DStreamFunctions.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
/* Copyright (c) 2015 & onwards. MapR Tech, Inc., All rights reserved */
22
package com.mapr.db.spark.streaming
33

4+
import com.mapr.db.spark._
45
import com.mapr.db.spark.dbclient.DBClient
56
import com.mapr.db.spark.utils.LoggingTrait
6-
import org.apache.spark.SparkContext
7-
import org.apache.spark.streaming.dstream.DStream
8-
import org.ojai.DocumentConstants
9-
import com.mapr.db.spark._
107
import com.mapr.db.spark.writers.OJAIValue
8+
import org.ojai.DocumentConstants
9+
10+
import org.apache.spark.streaming.dstream.DStream
11+
import org.apache.spark.SparkContext
1112

1213
class DStreamFunctions[T](dStream: DStream[T])(implicit fv: OJAIValue[T])
1314
extends Serializable

external/maprdb/src/main/scala/com/mapr/db/spark/types/DBBinaryValue.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.mapr.db.spark.types
33

44
import java.io.{Externalizable, ObjectInput, ObjectOutput}
55
import java.nio.{ByteBuffer, ByteOrder}
6+
67
import com.mapr.db.impl.IdCodec
78
import com.mapr.db.spark.utils.MapRDBUtils
89
import com.mapr.db.util.ByteBufs

external/maprdb/src/main/scala/com/mapr/db/spark/types/DBMapValue.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
55
import java.nio._
66
import java.util
77

8-
import scala.language.implicitConversions
98
import scala.collection.JavaConverters._
109
import scala.collection.MapLike
10+
import scala.language.implicitConversions
11+
1112
import com.mapr.db.rowcol.RowcolCodec
1213
import com.mapr.db.spark.dbclient.DBClient
1314
import com.mapr.db.spark.utils.MapRDBUtils

external/maprdb/src/main/scala/com/mapr/db/spark/writers/BulkTableWriter.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22
package com.mapr.db.spark.writers
33

44
import java.nio.ByteBuffer
5-
import org.ojai.Document
5+
66
import com.mapr.db.mapreduce.BulkLoadRecordWriter
77
import com.mapr.db.rowcol.DBValueBuilderImpl
8+
import org.ojai.Document
89

910
private[spark] case class BulkTableWriter(@transient table: BulkLoadRecordWriter) extends Writer {
1011

0 commit comments

Comments (0)