Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5f33482
update pom.xml for hadoop-2.3-cdh50.0 and hbase 0.96.1.1
javadba Jul 16, 2014
8ddbcce
Mesos workaround
javadba Jul 23, 2014
7ea3391
SPARK-2638 MapOutputTracker concurrency improvement
javadba Jul 23, 2014
f780ad1
Updated concurrency fix for using same monitor on the synchronized an…
javadba Jul 25, 2014
46bccf5
Manually revert custom changes to master
javadba Aug 1, 2014
a91f6a3
update pom.xml for hadoop-2.3-cdh50.0 and hbase 0.96.1.1
javadba Jul 16, 2014
c638587
Mesos workaround
javadba Jul 23, 2014
31dcd4f
SPARK-2638 MapOutputTracker concurrency improvement
javadba Jul 23, 2014
afe17e2
Updated concurrency fix for using same monitor on the synchronized an…
javadba Jul 25, 2014
0d9db98
Manually revert custom changes to master
javadba Aug 1, 2014
b08c87f
Revert "update pom.xml for hadoop-2.3-cdh50.0 and hbase 0.96.1.1"
javadba Aug 1, 2014
1e10e00
Revert "SPARK-2638 MapOutputTracker concurrency improvement"
javadba Aug 1, 2014
dea01f5
Manually revert custom changes to master
javadba Aug 1, 2014
aae4b68
Merge branch 'master' of https://github.com/javadba/spark
javadba Aug 1, 2014
2fc131e
Do this again: Manually revert custom changes to master
javadba Aug 1, 2014
42f5016
SPARK-2686 Add Length support to Spark SQL and HQL and Strlen support…
javadba Jul 25, 2014
ad3859e
Ongoing work with Ueshin and Marmbrus
javadba Aug 1, 2014
6a6222a
Ongoing work with Takuya and Michael A
javadba Aug 1, 2014
81c64c3
Revert whitespace/formatting changes on other sections of ExpressionE…
javadba Aug 1, 2014
94fcbd3
Change default encoding to UTF-8
javadba Aug 2, 2014
a0a03d7
Replace len() method with simpler call to codePointCount
javadba Aug 2, 2014
91761be
Remove spurious output log file
javadba Aug 2, 2014
22eddbc
Use Octet/Char_Len instead of Octet/Char_length due to apparent preex…
Aug 4, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val EXCEPT = Keyword("EXCEPT")
protected val SUBSTR = Keyword("SUBSTR")
protected val SUBSTRING = Keyword("SUBSTRING")
protected val LEN = Keyword("LEN")
protected val LENGTH = Keyword("LENGTH")
protected val CHAR_LEN = Keyword("CHAR_LEN")
protected val OCTET_LEN = Keyword("OCTET_LEN")

// Use reflection to find the reserved words defined in this class.
protected val reservedWords =
Expand Down Expand Up @@ -323,6 +327,13 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
(SUBSTR | SUBSTRING) ~> "(" ~> expression ~ "," ~ expression ~ "," ~ expression <~ ")" ^^ {
case s ~ "," ~ p ~ "," ~ l => Substring(s,p,l)
} |
(LEN | LENGTH | CHAR_LEN) ~> "(" ~> expression <~ ")" ^^ { case s => Length(s) } |
OCTET_LEN ~> "(" ~> expression ~ "," ~ expression <~ ")" ^^ {
case s ~ "," ~ e => OctetLength(s, e)
} |
OCTET_LEN ~> "(" ~> expression <~ ")" ^^ {
case s => OctetLength(s, Literal(OctetLengthConstants.DefaultEncoding))
} |
ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ {
case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ abstract class Expression extends TreeNode[Expression] {
* - A [[BinaryExpression]] is foldable if its both left and right child are foldable
* - A [[Not]], [[IsNull]], or [[IsNotNull]] is foldable if its child is foldable
* - A [[Literal]] is foldable
* - A [[Cast]] or [[UnaryMinus]] is foldable if its child is foldable
* - A [[Cast]] or [[UnaryMinus]] or [[Length/Octetlen]] is foldable if its child is foldable
*/
def foldable: Boolean = false
def nullable: Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.spark.sql.catalyst.expressions

import java.io.UnsupportedEncodingException
import java.util.regex.Pattern

import org.apache.spark.Logging

import scala.collection.IndexedSeqOptimized


import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.types.{BinaryType, BooleanType, DataType, StringType}
import org.apache.spark.sql.catalyst.types.{BinaryType, BooleanType, DataType, StringType, IntegerType}

trait StringRegexExpression {
self: BinaryExpression =>
Expand Down Expand Up @@ -208,6 +211,83 @@ case class EndsWith(left: Expression, right: Expression)
def compare(l: String, r: String) = l.endsWith(r)
}


/**
* A function that returns the number of bytes in an expression
*/
case class Length(child: Expression) extends UnaryExpression {

type EvaluatedType = Any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be IntegerType?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we make integertype well then the input has to be integer. I have made the semantics here that ANY type may be provided.


override def dataType = IntegerType

override def foldable = child.foldable

override def nullable = child.nullable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, now nullable becomes true because eval returns null if the UnsupportedEncodingException is thrown regardless of child.nullable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Ueshin, based on your prior comment I changed the logic it now returns null instead of throwing the exception.

    case ue : UnsupportedEncodingException => {
      log.debug(s"strlen: Caught UnsupportedEncodingException for encoding=[$strEncoding]")
      null

So I do not understand this comment - would you please clarify? thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this is not related to my prior comment.
I just mentioned that you should remove unnecessary isInstanceOf on my prior comment.

After that, you changed the logic to return null if the exception is thrown, so the nullability of this operator was also changed to true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I think I got what you are saying. Is the following code looking better to you?

a) Change to not throw exception in the case of non-string input:

} else if (!string.isInstanceOf[String]) {
  log.debug(s"Non-string value [$string] provided to strlen")
  null
}

b) Remove unnecessary isInstanceOf :

  try {
    string.getBytes(strEncoding).length
  } catch {
    case ue : UnsupportedEncodingException => {
      log.debug(s"strlen: Caught UnsupportedEncodingException for encoding=[$strEncoding]")
      null
    }

NOTE: that is incorrect, can not compile So we need the following

    string.asInstanceOf[String].getBytes(strEncoding).length

Maybe there is another place you are referring to?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I'm just saying:

override def nullable = true

instead of

override def nullable = child.nullable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, made that change.


override def toString = s"Length($child)"

override def eval(input: Row): EvaluatedType = {
val inputVal = child.eval(input)
if (inputVal == null) {
null
} else if (!inputVal.isInstanceOf[String]) {
inputVal.toString.length
} else {
val str = inputVal.asInstanceOf[String]
str.codePointCount(0, str.length)
}
}
}

object OctetLengthConstants {
val DefaultEncoding = "UTF-8"
}

/**
* A function that returns the number of characters in a string expression
*/
case class OctetLength(child: Expression, encoding : Expression) extends UnaryExpression
with Logging {

type EvaluatedType = Any

override def dataType = IntegerType

override def foldable = child.foldable

override def nullable = true

override def toString = s"OctetLen($child, $encoding)"

override def eval(input: Row): EvaluatedType = {
val evalInput = child.eval(input)
if (evalInput == null) {
null
} else if (!evalInput.isInstanceOf[String]) {
log.debug(s"Non-string value [$evalInput] provided to OctetLen")
null
} else {
var evalEncoding = encoding.eval(input)
val strEncoding =
if (evalEncoding != null) {
evalEncoding.toString
} else {
OctetLengthConstants.DefaultEncoding
}
val s: String = ""
try {
evalInput.asInstanceOf[String].getBytes(strEncoding).length
} catch {
case ue : UnsupportedEncodingException => {
throw new UnsupportedEncodingException(
s"OctetLen: Caught UnsupportedEncodingException for encoding=[$strEncoding]")
}
}
}
}
}

/**
* A function that takes a substring of its first argument starting at a given position.
* Defined for String and Binary types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ object NullPropagation extends Rule[LogicalPlan] {
case e @ Substring(_, Literal(null, _), _) => Literal(null, e.dataType)
case e @ Substring(_, _, Literal(null, _)) => Literal(null, e.dataType)

case e @ Length(Literal(null, _)) => Literal(null, e.dataType)
case e @ OctetLength(Literal(null, _),_) => Literal(null, e.dataType)

// Put exceptional cases above if any
case e: BinaryArithmetic => e.children match {
case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,4 +567,42 @@ class ExpressionEvaluationSuite extends FunSuite {
checkEvaluation(s.substring(0, 2), "ex", row)
checkEvaluation(s.substring(0), "example", row)
}

test("Length") {
checkEvaluation(Length(Literal(null, IntegerType)), null)
checkEvaluation(Length(Literal(0, IntegerType)), 1)
checkEvaluation(Length(Literal(12, IntegerType)), 2)
checkEvaluation(Length(Literal(123, IntegerType)), 3)
checkEvaluation(Length(Literal(12.4F, FloatType)), 4)
checkEvaluation(Length(Literal(12345678901L, LongType)), 11)
checkEvaluation(Length(Literal(1234567890.2D, DoubleType)), 14)
checkEvaluation(Length(Literal("1234567890ABC", StringType)), 13)
checkEvaluation(Length(Literal("\uF93D\uF936\uF949\uF942", StringType)), 4)
}

test("OctetLen") {
checkEvaluation(OctetLength(Literal(null, StringType), "ISO-8859-1"), null)
checkEvaluation(OctetLength(Literal(null, StringType), "UTF-8"), null)
checkEvaluation(OctetLength(Literal(null, StringType), "UTF-16"), null)
checkEvaluation(OctetLength(Literal("1234567890ABC", StringType), "ISO-8859-1"), 13)
checkEvaluation(OctetLength(Literal("1234567890ABC", StringType), "UTF-8"), 13)
checkEvaluation(OctetLength(Literal("1234567890ABC", StringType), "UTF-16"), 28)
checkEvaluation(OctetLength(Literal("1234567890ABC", StringType), "UTF-32"), 52)
checkEvaluation(OctetLength(
Literal("\uF93D\uF936\uF949\uF942", StringType), "ISO-8859-1"), 4)
// Chinese characters get truncated by ISO-8859-1 encoding
checkEvaluation(OctetLength(
Literal("\uF93D\uF936\uF949\uF942", StringType), "UTF-8"), 12) // chinese characters
checkEvaluation(OctetLength(
Literal("\uD840\uDC0B\uD842\uDFB7", StringType), "UTF-8"), 8) // 2 surrogate pairs
checkEvaluation(OctetLength(
Literal("\uF93D\uF936\uF949\uF942", StringType), "UTF-16"), 10) // chinese characters
checkEvaluation(OctetLength(
Literal("\uD840\uDC0B\uD842\uDFB7", StringType), "UTF-16"), 10) // 2 surrogate pairs
checkEvaluation(OctetLength(
Literal("\uF93D\uF936\uF949\uF942", StringType), "UTF-32"), 16) // chinese characters
checkEvaluation(OctetLength(
Literal("\uD840\uDC0B\uD842\uDFB7", StringType), "UTF-32"), 8) // 2 surrogate pairs
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,11 @@ class ConstantFoldingSuite extends PlanTest {
Substring("abc", 0, Literal(null, IntegerType)) as 'c18,

Contains(Literal(null, StringType), "abc") as 'c19,
Contains("abc", Literal(null, StringType)) as 'c20
Contains("abc", Literal(null, StringType)) as 'c20,

Length(Literal(null, IntegerType)) as 'c21,
OctetLength(Literal(null, StringType), "ISO-8859-1") as 'c22

)

val optimized = Optimize(originalQuery.analyze)
Expand Down Expand Up @@ -243,7 +247,11 @@ class ConstantFoldingSuite extends PlanTest {
Literal(null, StringType) as 'c18,

Literal(null, BooleanType) as 'c19,
Literal(null, BooleanType) as 'c20
Literal(null, BooleanType) as 'c20,

Literal(null, IntegerType) as 'c21,
Literal(null, IntegerType) as 'c22

).analyze

comparePlans(optimized, correctAnswer)
Expand Down
2 changes: 2 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql

import java.io.{PrintWriter, StringWriter}

import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._

Expand Down
28 changes: 28 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,34 @@ class SQLQuerySuite extends QueryTest {
checkAnswer(
sql("SELECT substring(tableName, 3) FROM tableName"),
"st")
checkAnswer(
sql("SELECT substring(tableName, 2) FROM tableName group by substring(tableName, 2)"),
"est")
}

test("SPARK-2686 Added Parser of SQL LENGTH()") {
checkAnswer(
sql("SELECT char_len(key) as keylen from testData where key = 100"), 3)
checkAnswer(
sql("SELECT len(key), count(*) as cnt from testData where key <= 100 group by len(key)"),
Seq(Seq(1,9),Seq(2,90), Seq(3,1)))
checkAnswer(
sql("SELECT max(length(key * key) - len(key)) from testData where key <= 100"), 2)
checkAnswer(
sql("SELECT min(Length(s)) FROM nullableRepeatedData where s is not null"), 4)
checkAnswer(
sql("SELECT max(LENGTH(s)) FROM nullableRepeatedData"), 4)
}

test("SPARK-2686 Added Parser of SQL OCTET_LEN()") {
checkAnswer(
sql("SELECT octet_len(s) from repeatedData"), Seq(Seq(4),Seq(4)))
checkAnswer(
sql("SELECT octet_len(s,'UTF-8') from repeatedData"), Seq(Seq(4),Seq(4)))
checkAnswer(
sql("SELECT max(octet_len(s,'UTF-8')) from nullStrings"), 3)
checkAnswer(
sql("SELECT octet_len('a','ISO-8859-1') + octet_len('abcde','ISO-8859-1') FROM testData limit 1"), 6)
}

test("index into array") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,8 @@ private[hive] object HiveQl {
val WHEN = "(?i)WHEN".r
val CASE = "(?i)CASE".r
val SUBSTR = "(?i)SUBSTR(?:ING)?".r
val CHAR_LEN = "(?i)CHAR_LEN".r
val OCTET_LEN = "(?i)OCTET_LEN".r

protected def nodeToExpr(node: Node): Expression = node match {
/* Attribute References */
Expand Down Expand Up @@ -997,6 +999,11 @@ private[hive] object HiveQl {
Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType))
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length))
case Token("TOK_FUNCTION", Token(OCTET_LEN(), Nil) :: string :: Nil) =>
OctetLength(nodeToExpr(string), Literal(OctetLengthConstants.DefaultEncoding))
case Token("TOK_FUNCTION", Token(OCTET_LEN(), Nil) :: string :: encoding :: Nil) =>
OctetLength(nodeToExpr(string), nodeToExpr(encoding))


/* UDFs - Must be last otherwise will preempt built in functions */
case Token("TOK_FUNCTION", Token(name, Nil) :: args) =>
Expand Down