@@ -2864,13 +2864,13 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id I
28642864----
28652865physical_plan
2866286601)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
2867- 02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2868- 03)----CoalesceBatchesExec: target_batch_size=2
2869- 04)------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
2870- 05)--------CoalesceBatchesExec: target_batch_size=2
2871- 06)----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
2872- 07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2873- 08)--------------MemoryExec: partitions=1, partition_sizes=[1]
2867+ 02)--CoalesceBatchesExec: target_batch_size=2
2868+ 03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
2869+ 04)------CoalesceBatchesExec: target_batch_size=2
2870+ 05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
2871+ 06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2872+ 07)------------MemoryExec: partitions=1, partition_sizes=[1]
2873+ 08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2874287409)--------CoalesceBatchesExec: target_batch_size=2
2875287510)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
2876287611)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -2905,13 +2905,13 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOI
29052905----
29062906physical_plan
2907290701)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
2908- 02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2909- 03)----CoalesceBatchesExec: target_batch_size=2
2910- 04)------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
2911- 05)--------CoalesceBatchesExec: target_batch_size=2
2912- 06)----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
2913- 07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2914- 08)--------------MemoryExec: partitions=1, partition_sizes=[1]
2908+ 02)--CoalesceBatchesExec: target_batch_size=2
2909+ 03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
2910+ 04)------CoalesceBatchesExec: target_batch_size=2
2911+ 05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
2912+ 06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2913+ 07)------------MemoryExec: partitions=1, partition_sizes=[1]
2914+ 08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2915291509)--------CoalesceBatchesExec: target_batch_size=2
2916291610)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
2917291711)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -2967,10 +2967,10 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id I
29672967----
29682968physical_plan
2969296901)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
2970- 02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2971- 03)----CoalesceBatchesExec: target_batch_size=2
2972- 04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
2973- 05)--------MemoryExec: partitions=1, partition_sizes=[1]
2970+ 02)--CoalesceBatchesExec: target_batch_size=2
2971+ 03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
2972+ 04)------MemoryExec: partitions=1, partition_sizes=[1]
2973+ 05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2974297406)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
2975297507)----------MemoryExec: partitions=1, partition_sizes=[1]
29762976
@@ -3003,10 +3003,10 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOI
30033003----
30043004physical_plan
3005300501)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
3006- 02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3007- 03)----CoalesceBatchesExec: target_batch_size=2
3008- 04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
3009- 05)--------MemoryExec: partitions=1, partition_sizes=[1]
3006+ 02)--CoalesceBatchesExec: target_batch_size=2
3007+ 03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
3008+ 04)------MemoryExec: partitions=1, partition_sizes=[1]
3009+ 05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3010301006)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3011301107)----------MemoryExec: partitions=1, partition_sizes=[1]
30123012
@@ -3061,13 +3061,13 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHER
30613061----
30623062physical_plan
3063306301)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
3064- 02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3065- 03)----CoalesceBatchesExec: target_batch_size=2
3066- 04)------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0
3067- 05)--------CoalesceBatchesExec: target_batch_size=2
3068- 06)----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
3069- 07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3070- 08)--------------MemoryExec: partitions=1, partition_sizes=[1]
3064+ 02)--CoalesceBatchesExec: target_batch_size=2
3065+ 03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0
3066+ 04)------CoalesceBatchesExec: target_batch_size=2
3067+ 05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
3068+ 06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3069+ 07)------------MemoryExec: partitions=1, partition_sizes=[1]
3070+ 08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3071307109)--------CoalesceBatchesExec: target_batch_size=2
3072307210)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
3073307311)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -3083,13 +3083,13 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGH
30833083----
30843084physical_plan
3085308501)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
3086- 02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3087- 03)----CoalesceBatchesExec: target_batch_size=2
3088- 04)------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1
3089- 05)--------CoalesceBatchesExec: target_batch_size=2
3090- 06)----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
3091- 07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3092- 08)--------------MemoryExec: partitions=1, partition_sizes=[1]
3086+ 02)--CoalesceBatchesExec: target_batch_size=2
3087+ 03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1
3088+ 04)------CoalesceBatchesExec: target_batch_size=2
3089+ 05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
3090+ 06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3091+ 07)------------MemoryExec: partitions=1, partition_sizes=[1]
3092+ 08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3093309309)--------CoalesceBatchesExec: target_batch_size=2
3094309410)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
3095309511)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -3143,10 +3143,10 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHER
31433143----
31443144physical_plan
3145314501)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
3146- 02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3147- 03)----CoalesceBatchesExec: target_batch_size=2
3148- 04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0
3149- 05)--------MemoryExec: partitions=1, partition_sizes=[1]
3146+ 02)--CoalesceBatchesExec: target_batch_size=2
3147+ 03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0
3148+ 04)------MemoryExec: partitions=1, partition_sizes=[1]
3149+ 05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3150315006)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3151315107)----------MemoryExec: partitions=1, partition_sizes=[1]
31523152
@@ -3160,10 +3160,10 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGH
31603160----
31613161physical_plan
3162316201)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
3163- 02)--SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3164- 03)----CoalesceBatchesExec: target_batch_size=2
3165- 04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1
3166- 05)--------MemoryExec: partitions=1, partition_sizes=[1]
3163+ 02)--CoalesceBatchesExec: target_batch_size=2
3164+ 03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1
3165+ 04)------MemoryExec: partitions=1, partition_sizes=[1]
3166+ 05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3167316706)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
3168316807)----------MemoryExec: partitions=1, partition_sizes=[1]
31693169
@@ -4313,3 +4313,86 @@ physical_plan
4313431304)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(binary_col@0, binary_col@0)]
4314431405)--------MemoryExec: partitions=1, partition_sizes=[1]
4315431506)--------MemoryExec: partitions=1, partition_sizes=[1]
4316+
4317+ # Test hash join sort push down
4318+ # Issue: https://github.com/apache/datafusion/issues/13559
4319+ statement ok
4320+ CREATE TABLE test(a INT, b INT, c INT)
4321+
4322+ statement ok
4323+ insert into test values (1,2,3), (4,5,6), (null, 7, 8), (8, null, 9), (9, 10, null)
4324+
4325+ statement ok
4326+ set datafusion.execution.target_partitions = 2;
4327+
4328+ query TT
4329+ explain select * from test where a in (select a from test where b > 3) order by c desc nulls first;
4330+ ----
4331+ logical_plan
4332+ 01)Sort: test.c DESC NULLS FIRST
4333+ 02)--LeftSemi Join: test.a = __correlated_sq_1.a
4334+ 03)----TableScan: test projection=[a, b, c]
4335+ 04)----SubqueryAlias: __correlated_sq_1
4336+ 05)------Projection: test.a
4337+ 06)--------Filter: test.b > Int32(3)
4338+ 07)----------TableScan: test projection=[a, b]
4339+ physical_plan
4340+ 01)SortPreservingMergeExec: [c@2 DESC]
4341+ 02)--CoalesceBatchesExec: target_batch_size=3
4342+ 03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(a@0, a@0)]
4343+ 04)------CoalesceBatchesExec: target_batch_size=3
4344+ 05)--------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
4345+ 06)----------CoalesceBatchesExec: target_batch_size=3
4346+ 07)------------FilterExec: b@1 > 3, projection=[a@0]
4347+ 08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
4348+ 09)----------------MemoryExec: partitions=1, partition_sizes=[1]
4349+ 10)------SortExec: expr=[c@2 DESC], preserve_partitioning=[true]
4350+ 11)--------CoalesceBatchesExec: target_batch_size=3
4351+ 12)----------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
4352+ 13)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
4353+ 14)--------------MemoryExec: partitions=1, partition_sizes=[1]
4354+
4355+ query TT
4356+ explain select * from test where a in (select a from test where b > 3) order by c desc nulls last;
4357+ ----
4358+ logical_plan
4359+ 01)Sort: test.c DESC NULLS LAST
4360+ 02)--LeftSemi Join: test.a = __correlated_sq_1.a
4361+ 03)----TableScan: test projection=[a, b, c]
4362+ 04)----SubqueryAlias: __correlated_sq_1
4363+ 05)------Projection: test.a
4364+ 06)--------Filter: test.b > Int32(3)
4365+ 07)----------TableScan: test projection=[a, b]
4366+ physical_plan
4367+ 01)SortPreservingMergeExec: [c@2 DESC NULLS LAST]
4368+ 02)--CoalesceBatchesExec: target_batch_size=3
4369+ 03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(a@0, a@0)]
4370+ 04)------CoalesceBatchesExec: target_batch_size=3
4371+ 05)--------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
4372+ 06)----------CoalesceBatchesExec: target_batch_size=3
4373+ 07)------------FilterExec: b@1 > 3, projection=[a@0]
4374+ 08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
4375+ 09)----------------MemoryExec: partitions=1, partition_sizes=[1]
4376+ 10)------SortExec: expr=[c@2 DESC NULLS LAST], preserve_partitioning=[true]
4377+ 11)--------CoalesceBatchesExec: target_batch_size=3
4378+ 12)----------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
4379+ 13)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
4380+ 14)--------------MemoryExec: partitions=1, partition_sizes=[1]
4381+
4382+ query III
4383+ select * from test where a in (select a from test where b > 3) order by c desc nulls first;
4384+ ----
4385+ 9 10 NULL
4386+ 4 5 6
4387+
4388+ query III
4389+ select * from test where a in (select a from test where b > 3) order by c desc nulls last;
4390+ ----
4391+ 4 5 6
4392+ 9 10 NULL
4393+
4394+ statement ok
4395+ DROP TABLE test
4396+
4397+ statement ok
4398+ set datafusion.execution.target_partitions = 1;
0 commit comments