@@ -74,6 +74,15 @@ trait CheckAnalysis extends PredicateHelper {
    }
  }

  private def getNumInputFileBlockSources(operator: LogicalPlan): Int = {
    operator match {
      case _: LeafNode => 1
Contributor: Shall we only consider file data source leaf nodes?

Member Author: We can't check that in CheckAnalysis. Both HadoopRDD and FileScanRDD have the same issue; to block both, we would need to add the check as another rule.
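For context, a minimal sketch of what gets blocked (a hypothetical repro: `spark` is a SparkSession, and the table names are made up, with `hive_tab` standing for a Hive SerDe table scanned via HadoopRDD and `ds_tab` for a data source table scanned via FileScanRDD). Each query combines two leaf relations, so both should fail analysis after this patch:

// Hypothetical repro: each output row would be associated with two input
// files at once, regardless of which RDD backs the scans.
spark.sql("SELECT input_file_name() FROM hive_tab JOIN ds_tab ON hive_tab.id = ds_tab.id")
spark.sql("SELECT input_file_name() FROM ds_tab t1 JOIN ds_tab t2 ON t1.id = t2.id")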

      // UNION ALL has multiple children, but its children are scanned separately and
      // never use InputFileBlock concurrently, so one source per child counts as a
      // single source (see the worked example after this hunk).
      case u: Union => u.children.map(getNumInputFileBlockSources).sum - u.children.length + 1
      case o => o.children.map(getNumInputFileBlockSources).sum
    }
  }

  def checkAnalysis(plan: LogicalPlan): Unit = {
    // We transform up and order the rules so as to catch the first possible failure instead
    // of the result of cascading resolution failures.
@@ -100,6 +109,10 @@ trait CheckAnalysis extends PredicateHelper {
            failAnalysis(
              s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}")

          case e @ (_: InputFileName | _: InputFileBlockLength | _: InputFileBlockStart)
              if getNumInputFileBlockSources(operator) > 1 =>
            e.failAnalysis(s"'${e.prettyName}' does not support more than one sources")
Member: nit: one sources -> one source.

          case g: Grouping =>
            failAnalysis("grouping() can only be used with GroupingSets/Cube/Rollup")
          case g: GroupingID =>
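As a worked example of the counting rule above (a standalone sketch with a toy plan type, not code from this patch): in `tab1 UNION ALL tab2 UNION ALL tab2` each union child contributes one source, so the count is 1 + 1 + 1 - 3 + 1 = 1 and the query is allowed, while in `(tab1 JOIN tab2) UNION ALL tab2` the join child already combines two sources, so the count is 2 + 1 - 2 + 1 = 2 and analysis fails:

// Toy mirror of getNumInputFileBlockSources over a minimal plan ADT
// (hypothetical types; the real rule matches Catalyst's LogicalPlan).
sealed trait Plan { def children: Seq[Plan] }
case class Leaf() extends Plan { def children: Seq[Plan] = Nil }
case class Join(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}
case class Union(children: Seq[Plan]) extends Plan

def numSources(p: Plan): Int = p match {
  case _: Leaf => 1
  // One source per union child collapses into a single source.
  case u: Union => u.children.map(numSources).sum - u.children.length + 1
  case o => o.children.map(numSources).sum
}

assert(numSources(Union(Seq(Leaf(), Leaf(), Leaf()))) == 1)       // allowed
assert(numSources(Union(Seq(Join(Leaf(), Leaf()), Leaf()))) == 2) // rejected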
@@ -530,6 +530,45 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
    )
  }

  test("input_file_name, input_file_block_start, input_file_block_length - more than one sources") {
    withTable("tab1", "tab2") {
      val data = sparkContext.parallelize(0 to 9).toDF("id")
      data.write.saveAsTable("tab1")
      data.write.saveAsTable("tab2")
      Seq("input_file_name", "input_file_block_start", "input_file_block_length").foreach { func =>
        val e = intercept[AnalysisException] {
          sql(s"SELECT *, $func() FROM tab1 JOIN tab2 ON tab1.id = tab2.id")
        }.getMessage
        assert(e.contains(s"'$func' does not support more than one sources"))
Member: ditto (one sources -> one source).

      }

      val df = sql(
        """
          |SELECT *, input_file_name()
          |FROM (SELECT * FROM tab1 UNION ALL SELECT * FROM tab2 UNION ALL SELECT * FROM tab2)
        """.stripMargin)
      assert(df.count() == 30)

      var e = intercept[AnalysisException] {
        sql(
          """
            |SELECT *, input_file_name()
            |FROM (SELECT * FROM tab1 NATURAL JOIN tab2) UNION ALL SELECT * FROM tab2
          """.stripMargin)
      }.getMessage
      assert(e.contains("'input_file_name' does not support more than one sources"))

      e = intercept[AnalysisException] {
        sql(
          """
            |SELECT *, input_file_name()
            |FROM (SELECT * FROM tab1 UNION ALL SELECT * FROM tab2) NATURAL JOIN tab2
          """.stripMargin)
      }.getMessage
      assert(e.contains("'input_file_name' does not support more than one sources"))
    }
  }

  test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") {
    withTempPath { dir =>
      val data = sparkContext.parallelize(0 to 10).toDF("id")