Closed · Changes from 6 commits
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{LeftSemi, JoinType}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.types._

case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
@@ -46,10 +46,22 @@ case class Generate(
child: LogicalPlan)
extends UnaryNode {

protected def generatorOutput: Seq[Attribute] =
alias
protected def generatorOutput: Seq[Attribute] = {
val output = alias
.map(a => generator.output.map(_.withQualifiers(a :: Nil)))
.getOrElse(generator.output)
if (join && outer) {
output.map {
case attr if !attr.resolved => attr
case attr if !attr.nullable =>
AttributeReference(
attr.name, attr.dataType, nullable = true)(attr.exprId, attr.qualifiers)
case attr => attr
[inline review comments on the line above]
Contributor: This is a pretty common pattern. We already have withNullability, which you could use in these cases if we move it up as an abstract method in Attribute and add a no-op version in UnresolvedAttribute.
Member Author: Ah, I see. It is the same as withQualifiers, right?
Contributor: Yeah, exactly.

}
} else {
output
}
}

override def output =
if (join) child.output ++ generatorOutput else generatorOutput
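
Following up on the review thread above, a rough, self-contained sketch of the suggested refactor: withNullability lifted into Attribute as an abstract method, with a no-op override in UnresolvedAttribute. The classes below are simplified stand-ins, not the actual Catalyst hierarchy.

abstract class Attribute {
  def name: String
  def nullable: Boolean
  // Suggested addition: abstract here, so callers can mark attributes nullable uniformly.
  def withNullability(newNullability: Boolean): Attribute
}

case class AttributeReference(name: String, nullable: Boolean) extends Attribute {
  override def withNullability(newNullability: Boolean): AttributeReference =
    copy(nullable = newNullability)
}

case class UnresolvedAttribute(name: String) extends Attribute {
  override def nullable: Boolean =
    sys.error("nullability is unknown until the attribute is resolved")
  // Suggested no-op: there is nothing to change before resolution.
  override def withNullability(newNullability: Boolean): UnresolvedAttribute = this
}

// With that in place, the repeated pattern in this diff would collapse to:
//   output.map(_.withNullability(true))
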
@@ -81,11 +93,29 @@ case class Join(
condition: Option[Expression]) extends BinaryNode {

override def references = condition.map(_.references).getOrElse(Set.empty)
override def output = joinType match {
case LeftSemi =>
left.output
case _ =>
left.output ++ right.output
override def output = {
def nullabilize(output: Seq[Attribute]) = {
output.map {
case attr if !attr.resolved => attr
case attr if !attr.nullable =>
AttributeReference(
attr.name, attr.dataType, nullable = true)(attr.exprId, attr.qualifiers)
case attr => attr
}
}

joinType match {
case LeftSemi =>
left.output
case LeftOuter =>
left.output ++ nullabilize(right.output)
case RightOuter =>
nullabilize(left.output) ++ right.output
case FullOuter =>
nullabilize(left.output) ++ nullabilize(right.output)
case _ =>
left.output ++ right.output
}
}
}
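
For context on the outer-join cases above, a minimal self-contained sketch (plain Scala collections, not Spark code) of why the non-preserved side has to be reported as nullable:

// Left rows with no match on the right are still emitted, with the right-side
// columns padded out as None/null, so even a column that is non-nullable in
// the right relation can be null in the join's output.
case class LeftRow(id: Int)
case class RightRow(id: Int, value: String) // value itself is never null here

def leftOuterJoin(left: Seq[LeftRow], right: Seq[RightRow]): Seq[(Int, Option[String])] =
  left.map(l => (l.id, right.find(_.id == l.id).map(_.value)))

// leftOuterJoin(Seq(LeftRow(1), LeftRow(2)), Seq(RightRow(1, "x")))
//   == Seq((1, Some("x")), (2, None))
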

@@ -83,8 +83,8 @@ case class Aggregate(
case a: AggregateExpression =>
ComputedAggregate(
a,
BindReferences.bindReference(a, childOutput).asInstanceOf[AggregateExpression],
AttributeReference(s"aggResult:$a", a.dataType, nullable = true)())
BindReferences.bindReference(a, childOutput),
AttributeReference(s"aggResult:$a", a.dataType, a.nullable)())
}
}.toArray
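
One reading of the aggregate change above: the result attribute's nullability now follows the aggregate expression itself instead of always being true. A simplified stand-in (not the Catalyst expression classes) for why that distinction exists:

// A count always yields a number, while a sum over no (non-null) input yields
// null in SQL, so hard-coding nullable = true was overly conservative for
// aggregates like count; a.nullable reflects each expression's own contract.
sealed trait AggLike { def nullable: Boolean }
case object CountLike extends AggLike { val nullable = false }
case object SumLike extends AggLike { val nullable = true }

def resultNullable(a: AggLike): Boolean = a.nullable
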

@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.{Generator, JoinedRow, Literal, Projection}
import org.apache.spark.sql.catalyst.expressions._

/**
* :: DeveloperApi ::
@@ -39,8 +39,21 @@ case class Generate(
child: SparkPlan)
extends UnaryNode {

protected def generatorOutput: Seq[Attribute] = {
if (join && outer) {
generator.output.map {
case attr if !attr.nullable =>
AttributeReference(
attr.name, attr.dataType, nullable = true)(attr.exprId, attr.qualifiers)
case attr => attr
}
} else {
generator.output
}
}

override def output =
if (join) child.output ++ generator.output else generator.output
if (join) child.output ++ generatorOutput else generatorOutput
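
The join/outer handling here mirrors the logical plan change earlier in the diff. A small standalone sketch (plain Scala, not the actual execution code) of why outer generation forces nullable generator columns:

// With join = true and outer = true, an input row whose generator produces no
// output is still emitted, padded with None for the generator columns, which
// is why those columns must be declared nullable in the schema.
def generateOuter[A, B](rows: Seq[A])(gen: A => Seq[B]): Seq[(A, Option[B])] =
  rows.flatMap { row =>
    val generated = gen(row)
    if (generated.isEmpty) Seq((row, None)) else generated.map(g => (row, Some(g)))
  }

// generateOuter(Seq("a,b", ""))(_.split(",").toSeq.filter(_.nonEmpty))
//   == Seq(("a,b", Some("a")), ("a,b", Some("b")), ("", None))
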

override def execute() = {
if (join) {
@@ -319,7 +319,27 @@ case class BroadcastNestedLoopJoin(

override def otherCopyArgs = sqlContext :: Nil

def output = left.output ++ right.output
def output = {
[inline review comments on the line above]
Contributor: Existing: add override when implementing abstract methods.
Member Author: Ok, I'll add it.

def nullabilize(output: Seq[Attribute]) = {
output.map {
case attr if !attr.nullable =>
AttributeReference(
attr.name, attr.dataType, nullable = true)(attr.exprId, attr.qualifiers)
case attr => attr
}
}

joinType match {
case LeftOuter =>
left.output ++ nullabilize(right.output)
case RightOuter =>
nullabilize(left.output) ++ right.output
case FullOuter =>
nullabilize(left.output) ++ nullabilize(right.output)
case _ =>
left.output ++ right.output
}
}

/** The Streamed Relation */
def left = streamed
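
On the review note above about adding override when implementing abstract methods, a minimal illustration with a toy trait, not the actual SparkPlan types:

trait PlanLike { def output: Seq[String] }

// Both compile; the review asks for the explicit keyword when a concrete
// class implements an abstract member.
class WithoutKeyword extends PlanLike { def output = Seq("a") }
class WithKeyword extends PlanLike { override def output = Seq("a") }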