
Conversation

@drexler-sky
Contributor

Which issue does this PR close?

Closes #.

Rationale for this change

Adds support for array_distinct.

What changes are included in this PR?

How are these changes tested?

New test case.

@codecov-commenter

codecov-commenter commented Jun 23, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 58.85%. Comparing base (f09f8af) to head (d8c0b1e).
Report is 285 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1923      +/-   ##
============================================
+ Coverage     56.12%   58.85%   +2.72%     
- Complexity      976     1141     +165     
============================================
  Files           119      130      +11     
  Lines         11743    12858    +1115     
  Branches       2251     2393     +142     
============================================
+ Hits           6591     7567     +976     
- Misses         4012     4073      +61     
- Partials       1140     1218      +78     

☔ View full report in Codecov by Sentry.

@andygrove
Member

The CI test failure is unrelated to the changes in this PR and is now fixed in the main branch.


test("array_distinct") {
Contributor

Can we add a case with nulls (more than one) in the array?
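For illustration, here is a sketch of the kind of test being requested, in the style of the other tests in this PR (the t1 table, _4 column, and checkSparkAnswerAndOperator helper are assumptions taken from snippets elsewhere in this thread):

      // Hypothetical case: duplicates plus more than one null in the array
      checkSparkAnswerAndOperator(spark.sql(
        "SELECT array_distinct(array(_4, null, _4, null, _4)) FROM t1"))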

Contributor Author

Thanks for the comment. While testing with nulls, I found that DataFusion's array_distinct doesn't behave the same as Spark's: DataFusion first sorts and then removes duplicates, while Spark preserves the original order. I therefore changed the code to implement IncompatExpr.
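To make the difference concrete, a minimal sketch (hypothetical input values; the outputs follow the behavior described above):

      // Spark's array_distinct preserves first-occurrence order:
      //   array_distinct(array(3, 1, 2, 1, 3)) => [3, 1, 2]
      // DataFusion's array_distinct sorts before deduplicating, so the
      // same input would come back as [1, 2, 3] instead.
      spark.sql("SELECT array_distinct(array(3, 1, 2, 1, 3))").show()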

Member

Could you add array_distinct to the list of supported array expressions in docs/source/user-guide/expressions.md and add a note about the compatibility issue?

Contributor Author

Done.

@drexler-sky
Contributor Author

@andygrove @parthchandra @comphead Could you please take another look? The CI failure doesn't seem to be related to this PR.

Contributor
@comphead left a comment

Thanks @drexler-sky for the nice contribution.
I think it is LGTM; one more thing: please add tests with an empty array.

DataFusion and Spark sometimes treat array functions with an empty array differently.

@drexler-sky
Contributor Author

> ...1 more thing please add tests with empty array.

I tested array_distinct with an empty array.

SELECT array_distinct(array()) FROM t1;

== Optimized Logical Plan ==
Project [[] AS array_distinct(array())#240]
+- Relation [_1#121,_2#122,_3#123,_4#124,_5#125L,_6#126,_7#127,_8#128,_9#129,_10#130,_11#131L,_12#132,_13#133,_14#134,_15#135,_16#136,_17#137,_18#138,_19#139,_20#140,_21#141,_id#142] parquet

== Physical Plan ==
*(1) Project [[] AS array_distinct(array())#240]
+- *(1) CometColumnarToRow
   +- CometScan parquet [] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2r/znvj4hhd3t1cp22pmw4m3h_40000gn/T/spark-f9..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

Spark uses an alias, [] AS array_distinct(array()), so the expression never reaches case _: ArrayDistinct => convert(CometArrayDistinct, ...)

@andygrove
Member

> I tested array_distinct with an empty array. [...] Spark uses an alias, [] AS array_distinct(array()), so it doesn't reach case _: ArrayDistinct => convert(CometArrayDistinct, ...)

In this case, Spark is replacing the array_distinct expression with a literal at planning time. To test with an empty array, you would need to force the evaluation to happen at query execution time. You can do this using a CASE WHEN expression, similar to other tests in this PR.

@drexler-sky
Contributor Author

@andygrove Thanks for the suggestion! I tried the following:

      checkSparkAnswerAndOperator(spark.sql("""
        SELECT array_distinct(
          CASE WHEN _2 = _3
            THEN array(_4)
            ELSE array()
          END
        )
        FROM t1
      """))

However, Spark still constant-folds the array_distinct(array()) in the ELSE branch down to [].

== Physical Plan ==
*(1) Project [CASE WHEN (cast(_2#1 as smallint) = _3#2) THEN array_distinct(array(_4#3)) ELSE [] END AS array_distinct(CASE WHEN (_2 = _3) THEN array(_4) ELSE array() END)#44]
+- *(1) CometColumnarToRow
   +- CometScan parquet [_2#1,_3#2,_4#3] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/2r/znvj4hhd3t1cp22pmw4m3h_40000gn/T/spark-39..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_2:tinyint,_3:smallint,_4:int>
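
One hypothetical way around this (not part of the PR) would be to derive the empty array from a column so that ConstantFolding cannot reduce it to a literal; filter with an always-false lambda is standard Spark SQL, though whether Comet supports it natively is an assumption:

      // Hypothetical sketch: filter(array(_4), x -> false) yields an empty
      // array that still depends on column _4, so it is not constant-folded
      // at planning time.
      checkSparkAnswerAndOperator(spark.sql("""
        SELECT array_distinct(
          CASE WHEN _2 = _3
            THEN array(_4)
            ELSE filter(array(_4), x -> false)
          END
        )
        FROM t1
      """))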

Contributor
@parthchandra left a comment

LGTM, pending CI.

@comphead
Contributor

@drexler-sky what if

    "spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding",

?
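
For reference, a sketch of how that config is usually applied in a test (withSQLConf comes from Spark's SQLTestUtils; its availability in this suite is an assumption):

      // Hypothetical usage: disable ConstantFolding for just this query
      withSQLConf(
        "spark.sql.optimizer.excludedRules" ->
          "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
        checkSparkAnswerAndOperator(
          spark.sql("SELECT array_distinct(array()) FROM t1"))
      }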

@drexler-sky
Contributor Author

"spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding"

I tried this, but it didn't work for me.

@drexler-sky
Contributor Author

I stepped into the code. The reason Comet falls back to Spark for the literal [] is that it goes to https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala#L865. Maybe we can log a separate issue to address the supported DataType problem for complex types.
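
For context, a simplified and entirely hypothetical sketch of the kind of check being described (the real logic lives in QueryPlanSerde.scala; the name and shape here are assumptions):

      import org.apache.spark.sql.types._

      // Hypothetical simplification: literals of complex types are not yet
      // supported for serialization, so a literal [] (an ArrayType value)
      // causes Comet to fall back to Spark.
      def isSupportedLiteralType(dt: DataType): Boolean = dt match {
        case _: ArrayType | _: MapType | _: StructType => false
        case _ => true
      }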

@andygrove
Member

andygrove commented Jun 26, 2025

> I stepped into the code. [...] Maybe we can log a separate issue to address the supported DataType problem for complex types.

That's a good find (cc @comphead)

edit: We do have issue #1929 for tracking this

@andygrove merged commit 235b69d into apache:main Jun 26, 2025
96 checks passed
@andygrove
Member

Thanks @drexler-sky @comphead @parthchandra

coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025