Closed

Commits (26)
3b44c59 adding testcase (kevinyu98, Apr 20, 2016)
18b4a31 Merge remote-tracking branch 'upstream/master' (kevinyu98, Apr 22, 2016)
4f4d1c8 Merge remote-tracking branch 'upstream/master' (kevinyu98, Apr 23, 2016)
f5f0cbe Merge remote-tracking branch 'upstream/master' (kevinyu98, Apr 23, 2016)
d8b2edb Merge remote-tracking branch 'upstream/master' (kevinyu98, Apr 25, 2016)
196b6c6 Merge remote-tracking branch 'upstream/master' (kevinyu98, Apr 25, 2016)
f37a01e Merge remote-tracking branch 'upstream/master' (kevinyu98, Apr 27, 2016)
bb5b01f Merge remote-tracking branch 'upstream/master' (kevinyu98, Apr 30, 2016)
bde5820 Merge remote-tracking branch 'upstream/master' (kevinyu98, May 4, 2016)
5f7cd96 Merge remote-tracking branch 'upstream/master' (kevinyu98, May 10, 2016)
ae0be70 fix comments (kevinyu98, May 13, 2016)
741daff fixing style (kevinyu98, May 13, 2016)
893a49a Merge remote-tracking branch 'upstream/master' (kevinyu98, May 13, 2016)
bbed47a address comments (kevinyu98, May 17, 2016)
536d20c clean the codes (kevinyu98, May 17, 2016)
54cfc24 remove unused import (kevinyu98, May 17, 2016)
4bbe1fd Merge remote-tracking branch 'upstream/master' (kevinyu98, May 17, 2016)
b2dd795 Merge remote-tracking branch 'upstream/master' (kevinyu98, May 18, 2016)
de7757d rebase (kevinyu98, May 18, 2016)
db4bb48 adding JavaBigInteger in converters (kevinyu98, May 18, 2016)
8c3e5da Merge remote-tracking branch 'upstream/master' (kevinyu98, May 18, 2016)
a0eaa40 Merge remote-tracking branch 'upstream/master' (kevinyu98, May 19, 2016)
b1527b7 Merge branch 'spark-11827newnewnew' into spark-11827new1 (kevinyu98, May 19, 2016)
b26412e address comments (kevinyu98, May 19, 2016)
43faed3 delete BIGINT_DEFAULT (kevinyu98, May 19, 2016)
3b4e360 address comments (kevinyu98, May 19, 2016)
----------------------------------------
@@ -19,11 +19,13 @@ package org.apache.spark.sql.catalyst

import java.lang.{Iterable => JavaIterable}
import java.math.{BigDecimal => JavaBigDecimal}
import java.math.{BigInteger => JavaBigInteger}
import java.sql.{Date, Timestamp}
import java.util.{Map => JavaMap}
import javax.annotation.Nullable

import scala.language.existentials
import scala.math.{BigInt => ScalaBigInt}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
@@ -321,11 +323,13 @@ object CatalystTypeConverters {
}

private class DecimalConverter(dataType: DecimalType)
  extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
cloud-fan (Contributor) commented on May 14, 2016:

Why change this file? I think we should use encoders most of the time.

kevinyu98 (Contributor, Author) replied:

sure, I will take this out. Thanks.

kevinyu98 (Contributor, Author) added:

Hello Wenchen: I have to keep the case d: JavaBigInteger => Decimal(d) line there; otherwise, this test case fails when the bean contains a java.math.BigInteger:

@Test
public void testCreateDataFrameFromLocalJavaBeans() {
  Bean bean = new Bean();
  List<Bean> data = Arrays.asList(bean);
  Dataset<Row> df = spark.createDataFrame(data, Bean.class);
  validateDataFrameWithBeans(bean, df);
}
Here is the trace:

scala.MatchError: 1234567 (of class java.math.BigInteger)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$DecimalConverter.toCatalystImpl(CatalystTypeConverters.scala:326)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$DecimalConverter.toCatalystImpl(CatalystTypeConverters.scala:323)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:892)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:892)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:892)
  at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$1.apply(SQLContext.scala:890)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$class.toStream(Iterator.scala:1322)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.toSeq(TraversableOnce.scala:298)
  at scala.collection.AbstractIterator.toSeq(Iterator.scala:1336)
  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:373)
  at test.org.apache.spark.sql.JavaDataFrameSuite.testCreateDataFrameFromLocalJavaBeans(JavaDataFrameSuite.java:200)

override def toCatalystImpl(scalaValue: Any): Decimal = {
  val decimal = scalaValue match {
    case d: BigDecimal => Decimal(d)
    case d: JavaBigDecimal => Decimal(d)
    case d: JavaBigInteger => Decimal(d)
A reviewer (Contributor) commented:

Can you hold on until #13008? Then we can revert this change, as CatalystTypeConverter is not used when creating DataFrames.

    case d: ScalaBigInt => Decimal(d)
    case d: Decimal => d
  }
  if (decimal.changePrecision(dataType.precision, dataType.scale)) {
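To make the failure mode concrete, here is a minimal, self-contained sketch of the dispatch problem (my illustration, not code from this PR): a match on the runtime class accepts only the types it names, so a java.math.BigInteger arriving from a Java bean falls through and raises the scala.MatchError shown in the trace.

import java.math.{BigDecimal => JavaBigDecimal, BigInteger => JavaBigInteger}

object MatchErrorSketch extends App {
  // Mirrors the shape of DecimalConverter.toCatalystImpl. Without the
  // JavaBigInteger case, toDec(new JavaBigInteger("1234567")) throws
  // scala.MatchError, just like the stack trace above.
  def toDec(value: Any): JavaBigDecimal = value match {
    case d: BigDecimal     => d.bigDecimal
    case d: JavaBigDecimal => d
    case d: JavaBigInteger => new JavaBigDecimal(d) // the case this PR adds (via Decimal(d))
  }

  println(toDec(new JavaBigInteger("1234567"))) // prints 1234567
}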
----------------------------------------
@@ -83,6 +83,7 @@ object JavaTypeInference {
case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)

case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType.SYSTEM_DEFAULT, true)
case c: Class[_] if c == classOf[java.math.BigInteger] => (DecimalType.BIGINT_DEFAULT, true)
case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)

----------------------------------------
@@ -258,6 +258,12 @@ object ScalaReflection extends ScalaReflection {
case t if t <:< localTypeOf[BigDecimal] =>
  Invoke(getPath, "toBigDecimal", ObjectType(classOf[BigDecimal]))

case t if t <:< localTypeOf[java.math.BigInteger] =>
  Invoke(getPath, "toJavaBigInteger", ObjectType(classOf[java.math.BigInteger]))

case t if t <:< localTypeOf[scala.math.BigInt] =>
  Invoke(getPath, "toScalaBigInt", ObjectType(classOf[scala.math.BigInt]))

case t if t <:< localTypeOf[Array[_]] =>
  val TypeRef(_, _, Seq(elementType)) = t

@@ -590,6 +596,18 @@ object ScalaReflection extends ScalaReflection {
    DecimalType.SYSTEM_DEFAULT,
    "apply",
    inputObject :: Nil)

case t if t <:< localTypeOf[java.math.BigInteger] =>
  StaticInvoke(
    Decimal.getClass,
    DecimalType.BIGINT_DEFAULT,
    "apply",
    inputObject :: Nil)

case t if t <:< localTypeOf[scala.math.BigInt] =>
  StaticInvoke(
    Decimal.getClass,
    DecimalType.BIGINT_DEFAULT,
    "apply",
    inputObject :: Nil)

case t if t <:< localTypeOf[java.lang.Integer] =>
  Invoke(inputObject, "intValue", IntegerType)
@@ -735,6 +753,10 @@ object ScalaReflection extends ScalaReflection {
case t if t <:< localTypeOf[BigDecimal] => Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
case t if t <:< localTypeOf[java.math.BigDecimal] =>
  Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
case t if t <:< localTypeOf[java.math.BigInteger] =>
  Schema(DecimalType.BIGINT_DEFAULT, nullable = true)
case t if t <:< localTypeOf[scala.math.BigInt] =>
  Schema(DecimalType.BIGINT_DEFAULT, nullable = true)
case t if t <:< localTypeOf[Decimal] => Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
case t if t <:< localTypeOf[java.lang.Integer] => Schema(IntegerType, nullable = true)
case t if t <:< localTypeOf[java.lang.Long] => Schema(LongType, nullable = true)
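If I read the schemaFor cases right, both BigInteger flavors are now inferred as DecimalType(38, 0). A hedged sketch of how one might verify that (BigIntHolder is a hypothetical class invented for this illustration; assumes a Spark build containing these changes):

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

case class BigIntHolder(j: java.math.BigInteger, s: scala.math.BigInt)

object SchemaSketch extends App {
  // Expected, per the cases above: both fields map to DecimalType(38,0).
  val schema = ScalaReflection.schemaFor[BigIntHolder].dataType.asInstanceOf[StructType]
  schema.fields.foreach(f => println(s"${f.name}: ${f.dataType}"))
  // j: DecimalType(38,0)
  // s: DecimalType(38,0)
}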
----------------------------------------
@@ -17,7 +17,7 @@

package org.apache.spark.sql.types

import java.math.{MathContext, RoundingMode}
import java.math.{BigInteger, MathContext, RoundingMode}

import org.apache.spark.annotation.DeveloperApi

@@ -128,6 +128,23 @@ final class Decimal extends Ordered[Decimal] with Serializable {
  this
}

/**
 * Set this Decimal to the given BigInteger value. Will have precision 38 and scale 0.
 */
def set(BigIntVal: BigInteger): Decimal = {
A reviewer (Contributor) commented:

lower case the variable name

kevinyu98 (Contributor, Author) replied:

I will change it.

  try {
    this.decimalVal = null
    this.longVal = BigIntVal.longValueExact()
    this._precision = DecimalType.MAX_PRECISION
    this._scale = 0
    this
  } catch {
    case e: ArithmeticException =>
      throw new IllegalArgumentException(s"BigInteger ${BigIntVal} too large for decimal")
  }
}

/**
 * Set this Decimal to the given Decimal value.
 */

@@ -371,6 +388,10 @@ object Decimal {

def apply(value: java.math.BigDecimal): Decimal = new Decimal().set(value)

def apply(value: java.math.BigInteger): Decimal = new Decimal().set(value)

def apply(value: scala.math.BigInt): Decimal = new Decimal().set(value.bigInteger)

def apply(value: BigDecimal, precision: Int, scale: Int): Decimal =
  new Decimal().set(value, precision, scale)

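Taken together, the new set and apply overloads should behave roughly as below (a usage sketch under the assumption that the longValueExact guard above is the only range check, so values beyond Long.MaxValue are rejected even though precision 38 could represent them):

import org.apache.spark.sql.types.Decimal

object DecimalSketch extends App {
  // Both overloads yield a Decimal with precision 38 and scale 0.
  val fromJava  = Decimal(new java.math.BigInteger("1234567"))
  val fromScala = Decimal(scala.math.BigInt(1234567))
  assert(fromJava == fromScala)

  // Twenty nines exceed Long.MaxValue (19 digits), so longValueExact()
  // throws ArithmeticException and set() rewraps it as IllegalArgumentException.
  try Decimal(new java.math.BigInteger("9" * 20))
  catch { case e: IllegalArgumentException => println(e.getMessage) }
}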
----------------------------------------
@@ -17,6 +17,8 @@

package org.apache.spark.sql.types

import java.math.BigInteger

import scala.reflect.runtime.universe.typeTag

import org.apache.spark.annotation.DeveloperApi
@@ -109,6 +111,7 @@ object DecimalType extends AbstractDataType {
val MAX_SCALE = 38
val SYSTEM_DEFAULT: DecimalType = DecimalType(MAX_PRECISION, 18)
val USER_DEFAULT: DecimalType = DecimalType(10, 0)
val BIGINT_DEFAULT: DecimalType = DecimalType(MAX_PRECISION, 0)
cloud-fan (Contributor) commented on May 19, 2016:

please add a private[sql] val BigIntDecimal = DecimalType(38, 0) to the next section, instead of doing this.

kevinyu98 (Contributor, Author) replied:

sure, I will do that.

// The decimal types compatible with other numeric types
private[sql] val ByteDecimal = DecimalType(3, 0)
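For concreteness, the suggestion amounts to something like the following shape (a sketch only; commit 43faed3, "delete BIGINT_DEFAULT", in the commit list above appears to be the follow-up):

import org.apache.spark.sql.types.DecimalType

// Sketch: keep the BigInteger mapping alongside the other
// numeric-compatible decimal types instead of a public BIGINT_DEFAULT.
object NumericDecimalSketch {
  val ByteDecimal   = DecimalType(3, 0)
  val BigIntDecimal = DecimalType(38, 0) // what java.math.BigInteger maps to
}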
----------------------------------------
@@ -17,6 +17,8 @@

package org.apache.spark.sql.types

import java.math.BigInteger

import scala.language.postfixOps

import org.scalatest.PrivateMethodTester
----------------------------------------
@@ -21,6 +21,8 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.util.*;
import java.math.BigInteger;
import java.math.BigDecimal;

import scala.collection.JavaConverters;
import scala.collection.Seq;
@@ -130,6 +132,7 @@ public static class Bean implements Serializable {
private Integer[] b = { 0, 1 };
private Map<String, int[]> c = ImmutableMap.of("hello", new int[] { 1, 2 });
private List<String> d = Arrays.asList("floppy", "disk");
private BigInteger e = new BigInteger("1234567");

public double getA() {
  return a;

@@ -146,6 +149,8 @@ public Map<String, int[]> getC() {

public List<String> getD() {
  return d;
}

public BigInteger getE() { return e; }
}

void validateDataFrameWithBeans(Bean bean, Dataset<Row> df) {

@@ -163,7 +168,9 @@ void validateDataFrameWithBeans(Bean bean, Dataset<Row> df) {

  Assert.assertEquals(
      new StructField("d", new ArrayType(DataTypes.StringType, true), true, Metadata.empty()),
      schema.apply("d"));
  Row first = df.select("a", "b", "c", "d").first();
  Assert.assertEquals(
      new StructField("e", DataTypes.createDecimalType(38, 0), true, Metadata.empty()),
      schema.apply("e"));
  Row first = df.select("a", "b", "c", "d","e").first();
A reviewer (Contributor) commented:

nit: add a space before "e"

kevinyu98 (Contributor, Author) replied:

will add

Assert.assertEquals(bean.getA(), first.getDouble(0), 0.0);
// Now Java lists and maps are converted to Scala Seq's and Map's. Once we get a Seq below,
// verify that it has the expected length, and contains expected elements.

@@ -182,6 +189,8 @@ void validateDataFrameWithBeans(Bean bean, Dataset<Row> df) {

for (int i = 0; i < d.length(); i++) {
  Assert.assertEquals(bean.getD().get(i), d.apply(i));
}
// java.math.BigInteger is equivalent to Spark Decimal(38,0)
Assert.assertEquals(new BigDecimal(bean.getE()), first.getDecimal(4).setScale(0));
A reviewer (Contributor) commented:

why do we need to setScale here?

kevinyu98 (Contributor, Author) replied:

will remove that.
}

@Test
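The setScale question is fair: java.math.BigDecimal.equals compares value and scale together, and new BigDecimal(bean.getE()) already has scale 0, which makes the extra call redundant here. A small sketch of the distinction (my illustration, plain Scala):

object ScaleSketch extends App {
  val fromBigInt  = new java.math.BigDecimal(new java.math.BigInteger("1234567")) // scale 0
  val twoDecimals = new java.math.BigDecimal("1234567.00")                        // scale 2

  println(fromBigInt == twoDecimals)              // false: equals is scale-sensitive
  println(fromBigInt.compareTo(twoDecimals) == 0) // true:  compareTo ignores scale
  println(twoDecimals.setScale(0) == fromBigInt)  // true:  rescaling restores equals
}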
----------------------------------------
@@ -34,7 +34,13 @@ case class ReflectData(
    decimalField: java.math.BigDecimal,
    date: Date,
    timestampField: Timestamp,
    seqInt: Seq[Int])
    seqInt: Seq[Int],
    javaBigInt: java.math.BigInteger,
    scalaBigInt: scala.math.BigInt)

case class ReflectData3(
    scalaBigInt: scala.math.BigInt
)
A reviewer (Contributor) commented:

can you move this to a single line.

kevinyu98 (Contributor, Author) replied:

I just removed that code.


case class NullReflectData(
    intField: java.lang.Integer,

@@ -77,13 +83,15 @@ class ScalaReflectionRelationSuite extends SparkFunSuite with SharedSQLContext {

test("query case class RDD") {
val data = ReflectData("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true,
new java.math.BigDecimal(1), Date.valueOf("1970-01-01"), new Timestamp(12345), Seq(1, 2, 3))
new java.math.BigDecimal(1), Date.valueOf("1970-01-01"), new Timestamp(12345), Seq(1, 2, 3),
new java.math.BigInteger("1"), scala.math.BigInt(1))
Seq(data).toDF().registerTempTable("reflectData")

assert(sql("SELECT * FROM reflectData").collect().head ===
Row("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true,
new java.math.BigDecimal(1), Date.valueOf("1970-01-01"),
new Timestamp(12345), Seq(1, 2, 3)))
new Timestamp(12345), Seq(1, 2, 3), new java.math.BigDecimal(1),
new java.math.BigDecimal(1)))
}

test("query case class RDD with nulls") {
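One detail worth noting in the assertion above: the expected Row holds java.math.BigDecimal for both new fields, because with the schema fixed at decimal(38,0) a collect() always surfaces decimals, regardless of whether the input was java.math.BigInteger or scala.math.BigInt. A hedged round-trip sketch (assumes a SparkSession with spark.implicits._ in scope; Tiny is invented for this illustration):

case class Tiny(j: java.math.BigInteger, s: scala.math.BigInt)

// Mirrors the test above: both columns come back as java.math.BigDecimal.
val row = Seq(Tiny(new java.math.BigInteger("1"), scala.math.BigInt(1)))
  .toDF().collect().head
assert(row.getDecimal(0) == new java.math.BigDecimal(1))
assert(row.getDecimal(1) == new java.math.BigDecimal(1))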