From 2b45129def7d190a3949c31b9341e071b63526bc Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 20 Aug 2017 08:41:54 +0800
Subject: [PATCH 1/4] Get metadata from RowDataSourceScanExec

---
 .../spark/sql/jdbc/OracleIntegrationSuite.scala   | 13 +++++++++++++
 .../org/apache/spark/sql/test/SQLTestUtils.scala  |  1 -
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 80a129a9e032..231550d3859a 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 import java.math.BigDecimal
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.{RowDataSourceScanExec, DataSourceScanExec}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
@@ -255,6 +256,18 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     val df = dfRead.filter(dfRead.col("date_type").lt(dt))
       .filter(dfRead.col("timestamp_type").lt(ts))
 
+    val parentPlan = df.queryExecution.executedPlan
+    assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+    val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+    val metadata = node.child.asInstanceOf[RowDataSourceScanExec].metadata
+    // The "PushedFilters" part should be exist in Datafrome's
+    // physical plan and the existence of right literals in
+    // "PushedFilters" is used to prove that the predicates
+    // pushing down have been effective.
+    assert(metadata.get("PushedFilters").ne(None))
+    assert(metadata("PushedFilters").contains(dt.toString))
+    assert(metadata("PushedFilters").contains(ts.toString))
+
     val row = df.collect()(0)
     assert(row.getDate(0).equals(dateVal))
     assert(row.getTimestamp(1).equals(timestampVal))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index e68db3b636bc..692379b49596 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.FilterExec
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{UninterruptibleThread, Utils}
 
 /**

From 71182ddcb16d8550bdbe2d946c451a47cea79af8 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 20 Aug 2017 08:55:41 +0800
Subject: [PATCH 2/4] Remove unused import

---
 .../org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 231550d3859a..ecfedf4d935b 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import java.math.BigDecimal
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.{RowDataSourceScanExec, DataSourceScanExec}
+import org.apache.spark.sql.execution.{WholeStageCodegenExec, RowDataSourceScanExec}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
@@ -257,8 +257,8 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
       .filter(dfRead.col("timestamp_type").lt(ts))
 
     val parentPlan = df.queryExecution.executedPlan
-    assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-    val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+    assert(parentPlan.isInstanceOf[WholeStageCodegenExec])
+    val node = parentPlan.asInstanceOf[WholeStageCodegenExec]
     val metadata = node.child.asInstanceOf[RowDataSourceScanExec].metadata
     // The "PushedFilters" part should be exist in Datafrome's
     // physical plan and the existence of right literals in

From da88a853070c369b1dae79d1357992dc8840e20a Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 20 Aug 2017 13:08:39 +0800
Subject: [PATCH 3/4] Fix typo.

---
 .../org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index ecfedf4d935b..686f1e77335d 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -260,7 +260,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     assert(parentPlan.isInstanceOf[WholeStageCodegenExec])
     val node = parentPlan.asInstanceOf[WholeStageCodegenExec]
     val metadata = node.child.asInstanceOf[RowDataSourceScanExec].metadata
-    // The "PushedFilters" part should be exist in Datafrome's
+    // The "PushedFilters" part should be exist in Dataframe's
     // physical plan and the existence of right literals in
     // "PushedFilters" is used to prove that the predicates
     // pushing down have been effective.

From b24aedf8859283d1520d5eae195d21722972591a Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Mon, 21 Aug 2017 12:50:26 +0800
Subject: [PATCH 4/4] isDefined instead of ne(None).

---
 .../org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala  | 4 ++--
 .../test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 1 +
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 686f1e77335d..1b2c1b9e800a 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -260,11 +260,11 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     assert(parentPlan.isInstanceOf[WholeStageCodegenExec])
     val node = parentPlan.asInstanceOf[WholeStageCodegenExec]
     val metadata = node.child.asInstanceOf[RowDataSourceScanExec].metadata
-    // The "PushedFilters" part should be exist in Dataframe's
+    // The "PushedFilters" part should exist in Dataframe's
     // physical plan and the existence of right literals in
     // "PushedFilters" is used to prove that the predicates
     // pushing down have been effective.
-    assert(metadata.get("PushedFilters").ne(None))
+    assert(metadata.get("PushedFilters").isDefined)
     assert(metadata("PushedFilters").contains(dt.toString))
     assert(metadata("PushedFilters").contains(ts.toString))
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 692379b49596..e68db3b636bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.FilterExec
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{UninterruptibleThread, Utils}
 
 /**
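
The verification pattern this series settles on -- casting the executed plan down to RowDataSourceScanExec and asserting that its metadata carries a "PushedFilters" entry containing the filter literals -- also works outside the Docker integration suite. What follows is a minimal standalone sketch of the same check, not part of the patches: the JDBC URL, credentials, and table name are placeholders, and the scan node is collected from the plan rather than unwrapped through WholeStageCodegenExec, since codegen wrapping is not guaranteed in every configuration. It assumes the Spark 2.2-era API used above, where RowDataSourceScanExec exposes metadata directly.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.RowDataSourceScanExec

object PushedFiltersSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PushedFiltersSketch")
      .master("local[*]")
      .getOrCreate()

    // Placeholder connection details; substitute any reachable JDBC source.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//localhost:1521/xe")
      .option("dbtable", "datetime")
      .option("user", "system")
      .option("password", "oracle")
      .load()
      .filter("date_type < DATE '2017-06-22'")

    // Collect the JDBC scan node wherever it sits in the physical plan,
    // instead of assuming parentPlan.asInstanceOf[WholeStageCodegenExec].child
    // as the integration test does.
    val scan = df.queryExecution.executedPlan.collect {
      case s: RowDataSourceScanExec => s
    }.headOption.getOrElse(sys.error("no RowDataSourceScanExec in plan"))

    // If pushdown worked, the predicate shows up in the scan's metadata under
    // "PushedFilters" rather than surviving as a separate FilterExec above it.
    assert(scan.metadata.get("PushedFilters").isDefined)
    println(scan.metadata("PushedFilters"))

    spark.stop()
  }
}

Because pushed filters render with their literal values (something like LessThan(date_type,2017-06-22)), checking contains(dt.toString) as the test above does confirms that the right predicate, not merely some predicate, reached the data source.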