diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 52be631d94d8..8398fb8d1e83 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -392,12 +392,24 @@ object DeduplicateRelations extends Rule[LogicalPlan] { newVersion.copyTagsFrom(oldVersion) Seq((oldVersion, newVersion)) + case oldVersion @ FlatMapGroupsInArrow(_, _, output, _) + if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty => + val newVersion = oldVersion.copy(output = output.map(_.newInstance())) + newVersion.copyTagsFrom(oldVersion) + Seq((oldVersion, newVersion)) + case oldVersion @ FlatMapCoGroupsInPandas(_, _, _, output, _, _) if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty => val newVersion = oldVersion.copy(output = output.map(_.newInstance())) newVersion.copyTagsFrom(oldVersion) Seq((oldVersion, newVersion)) + case oldVersion @ FlatMapCoGroupsInArrow(_, _, _, output, _, _) + if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty => + val newVersion = oldVersion.copy(output = output.map(_.newInstance())) + newVersion.copyTagsFrom(oldVersion) + Seq((oldVersion, newVersion)) + case oldVersion @ MapInPandas(_, output, _, _, _) if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty => val newVersion = oldVersion.copy(output = output.map(_.newInstance()))