Skip to content

Commit eb134b9

Browse files
committed
Push down In filter to Parquet.
1 parent 4d00ed0 commit eb134b9

File tree

2 files changed

+41
-0
lines changed

2 files changed

+41
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ private[sql] object ParquetFilters {
201201
}
202202

203203
private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
204+
case BooleanType =>
205+
(n: String, v: Set[Any]) =>
206+
FilterApi.userDefined(booleanColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Boolean]]))
204207
case IntegerType =>
205208
(n: String, v: Set[Any]) =>
206209
FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
@@ -252,6 +255,9 @@ private[sql] object ParquetFilters {
252255
case sources.IsNotNull(name) =>
253256
makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
254257

258+
case sources.In(name, values) =>
259+
makeInSet.lift(dataTypeOf(name)).map(_(name, values.toSet))
260+
255261
case sources.EqualTo(name, value) =>
256262
makeEq.lift(dataTypeOf(name)).map(_(name, value))
257263
case sources.Not(sources.EqualTo(name, value)) =>

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
112112
checkFilterPredicate('_1 === true, classOf[Eq[_]], true)
113113
checkFilterPredicate('_1 <=> true, classOf[Eq[_]], true)
114114
checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false)
115+
116+
checkFilterPredicate(('_1 in(true)).asInstanceOf[Predicate],
117+
classOf[UserDefinedByInstance[_, _]], true)
118+
checkFilterPredicate(('_1 in(false)).asInstanceOf[Predicate],
119+
classOf[UserDefinedByInstance[_, _]], false)
115120
}
116121
}
117122

@@ -138,6 +143,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
138143

139144
checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
140145
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
146+
147+
checkFilterPredicate(('_1 in(1, 2)).asInstanceOf[Predicate],
148+
classOf[UserDefinedByInstance[_, _]], Seq(Row(1), Row(2)))
149+
checkFilterPredicate(('_1 in(3, 4)).asInstanceOf[Predicate],
150+
classOf[UserDefinedByInstance[_, _]], Seq(Row(3), Row(4)))
141151
}
142152
}
143153

@@ -164,6 +174,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
164174

165175
checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
166176
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
177+
178+
checkFilterPredicate(('_1 in(1L, 2L)).asInstanceOf[Predicate],
179+
classOf[UserDefinedByInstance[_, _]], Seq(Row(1L), Row(2L)))
180+
checkFilterPredicate(('_1 in(3L, 4L)).asInstanceOf[Predicate],
181+
classOf[UserDefinedByInstance[_, _]], Seq(Row(3L), Row(4L)))
167182
}
168183
}
169184

@@ -190,6 +205,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
190205

191206
checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
192207
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
208+
209+
checkFilterPredicate(('_1 in(1.0f, 2.0f)).asInstanceOf[Predicate],
210+
classOf[UserDefinedByInstance[_, _]], Seq(Row(1.0f), Row(2.0f)))
211+
checkFilterPredicate(('_1 in(3.0f, 4.0f)).asInstanceOf[Predicate],
212+
classOf[UserDefinedByInstance[_, _]], Seq(Row(3.0f), Row(4.0f)))
193213
}
194214
}
195215

@@ -216,6 +236,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
216236

217237
checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
218238
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
239+
240+
checkFilterPredicate(('_1 in(1.0, 2.0)).asInstanceOf[Predicate],
241+
classOf[UserDefinedByInstance[_, _]], Seq(Row(1.0), Row(2.0)))
242+
checkFilterPredicate(('_1 in(3.0, 4.0)).asInstanceOf[Predicate],
243+
classOf[UserDefinedByInstance[_, _]], Seq(Row(3.0), Row(4.0)))
219244
}
220245
}
221246

@@ -244,6 +269,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
244269

245270
checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4")
246271
checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4")))
272+
273+
checkFilterPredicate(('_1 in("1", "2")).asInstanceOf[Predicate],
274+
classOf[UserDefinedByInstance[_, _]], Seq(Row("1"), Row("2")))
275+
checkFilterPredicate(('_1 in("3", "4")).asInstanceOf[Predicate],
276+
classOf[UserDefinedByInstance[_, _]], Seq(Row("3"), Row("4")))
247277
}
248278

249279
withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString * 5 + "test"))) { implicit df =>
@@ -298,6 +328,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
298328
checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
299329
checkBinaryFilterPredicate(
300330
'_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
331+
332+
checkFilterPredicate(('_1 in(1.b, 2.b)).asInstanceOf[Predicate],
333+
classOf[UserDefinedByInstance[_, _]], Seq(Row(1.b), Row(2.b)))
334+
checkFilterPredicate(('_1 in(3.b, 4.b)).asInstanceOf[Predicate],
335+
classOf[UserDefinedByInstance[_, _]], Seq(Row(3.b), Row(4.b)))
301336
}
302337
}
303338

0 commit comments

Comments
 (0)