@@ -24,7 +24,7 @@ import ai.chronon.online.SparkConversions
2424import ai .chronon .spark .Extensions ._
2525import ai .chronon .spark .JoinUtils ._
2626import org .apache .spark .sql
27- import org .apache .spark .sql .DataFrame
27+ import org .apache .spark .sql .{ Column , DataFrame }
2828import org .apache .spark .sql .functions ._
2929import org .apache .spark .util .sketch .BloomFilter
3030
@@ -48,17 +48,17 @@ import scala.util.{Failure, Success, Try}
/**
 * A combination of bootstrap-part hashes observed on the bootstrap table.
 *
 * @param hashes     the semantic hashes of the bootstrap parts making up this set
 * @param rowCount   number of bootstrap rows carrying exactly this hash combination
 * @param isCovering true when this hash combination covers all requested columns,
 *                   so backfill can be skipped for the matching rows
 */
case class CoveringSet(hashes: Seq[String], rowCount: Long, isCovering: Boolean)

object CoveringSet {

  /**
   * Tests whether the array column `colName` contains exactly the `expected` values.
   *
   * Avoids comparing against an Array literal directly because Iceberg cannot
   * support Array literals in filter pushdown; instead the equality is expressed
   * as "no elements outside `expected`" plus a matching element count.
   */
  private def arraysEqualFunc(colName: String, expected: Seq[String]): Column = {
    size(array_except(col(colName), typedLit(expected))) === 0 && size(col(colName)) === expected.size
  }

  /**
   * Builds a filter keeping only rows NOT already covered by any of the given
   * covering sets: rows whose matched-hashes column is null, or whose matched
   * hashes do not equal any covering set's hash combination.
   *
   * @param coveringSets the hash combinations that fully cover the join request
   * @return a [[Column]] predicate suitable for `DataFrame.where`
   */
  def toFilterCondition(coveringSets: Seq[CoveringSet]): Column = {
    val coveredConditions = coveringSets.map { cs => arraysEqualFunc(Constants.MatchedHashes, cs.hashes) }

    // With no covering sets, nothing is covered — the disjunction defaults to
    // false and every row is kept.
    val anyCovered = coveredConditions.reduceOption(_ || _).getOrElse(lit(false))
    col(Constants.MatchedHashes).isNull || !anyCovered
  }
}
6464
@@ -621,7 +621,7 @@ class Join(joinConf: api.Join,
621621 // this happens whether bootstrapParts is NULL for the JOIN and thus no metadata columns were created
622622 return Some (bootstrapDfWithStats)
623623 }
624- val filterExpr = CoveringSet .toFilterExpression (coveringSets)
624+ val filterExpr = CoveringSet .toFilterCondition (coveringSets)
625625 logger.info(s " Using covering set filter: $filterExpr" )
626626 val filteredDf = bootstrapDf.where(filterExpr)
627627 val filteredCount = filteredDf.count()
0 commit comments