Skip to content

Conversation

@AnthonyTruchet
Copy link

@AnthonyTruchet AnthonyTruchet commented Nov 18, 2016

What changes were proposed in this pull request?

CostFun used to send a dense vector of zeroes as a closure in a
treeAggregate call. To avoid that, we replace treeAggregate by
mapPartition + treeReduce, creating a zero vector inside the mapPartition
block in-place.

How was this patch tested?

Unit test for module mllib run locally for correctness.

As for performance we run an heavy optimization on our production data (50 iterations on 128 MB weight vectors) and have seen significant decrease in terms both of runtime and container being killed by lack of off-heap memory.

CostFun used to send a dense vector of zeroes as a closure in a
treeAggregate call. To avoid that, we replace treeAggregate by
mapPartition + treeReduce, creating a zero vector inside the mapPartition
block in-place.
@AnthonyTruchet
Copy link
Author

NB Replaces #7

@tibidoh
Copy link

tibidoh commented Nov 18, 2016

I think we absolutely need a performance benchmark before pushing upstream

@AnthonyTruchet
Copy link
Author

Actually we did one locally and it was very much in favour of the new version. I'll update the PR message to state this.

@AnthonyTruchet AnthonyTruchet merged commit 011a8d7 into criteo-forks:master Nov 21, 2016
@AnthonyTruchet AnthonyTruchet deleted the ENG-17719-lbfgs-only branch November 21, 2016 12:26
2ooom pushed a commit that referenced this pull request Jul 30, 2017
…pressions

## What changes were proposed in this pull request?

This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:

```
    val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
    val df = spark.createDataFrame(rdd, inputSchema)

    // Works correctly since no nested decimal expression is involved
    // Expected result type: (26, 6) * (26, 6) = (38, 12)
    df.select($"col" * $"col").explain(true)
    df.select($"col" * $"col").printSchema()

    // Gives a wrong result since there is a nested decimal expression that should be visited first
    // Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
    df.select($"col" * $"col" * $"col").explain(true)
    df.select($"col" * $"col" * $"col").printSchema()
```

The example above gives the following output:

```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- (col * col): decimal(38,12) (nullable = true)

// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- ((col * col) * col): decimal(38,12) (nullable = true)
```

## How was this patch tested?

This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.

Author: aokolnychyi <[email protected]>

Closes apache#18583 from aokolnychyi/spark-21332.

(cherry picked from commit 0be5fb4)
Signed-off-by: gatorsmile <[email protected]>
2ooom pushed a commit that referenced this pull request Jul 30, 2017
…pressions

## What changes were proposed in this pull request?

This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:

```
    val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
    val df = spark.createDataFrame(rdd, inputSchema)

    // Works correctly since no nested decimal expression is involved
    // Expected result type: (26, 6) * (26, 6) = (38, 12)
    df.select($"col" * $"col").explain(true)
    df.select($"col" * $"col").printSchema()

    // Gives a wrong result since there is a nested decimal expression that should be visited first
    // Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
    df.select($"col" * $"col" * $"col").explain(true)
    df.select($"col" * $"col" * $"col").printSchema()
```

The example above gives the following output:

```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- (col * col): decimal(38,12) (nullable = true)

// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- ((col * col) * col): decimal(38,12) (nullable = true)
```

## How was this patch tested?

This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.

Author: aokolnychyi <[email protected]>

Closes apache#18583 from aokolnychyi/spark-21332.

(cherry picked from commit 0be5fb4)
Signed-off-by: gatorsmile <[email protected]>
2ooom pushed a commit that referenced this pull request Jul 31, 2017
…pressions

## What changes were proposed in this pull request?

This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:

```
    val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
    val df = spark.createDataFrame(rdd, inputSchema)

    // Works correctly since no nested decimal expression is involved
    // Expected result type: (26, 6) * (26, 6) = (38, 12)
    df.select($"col" * $"col").explain(true)
    df.select($"col" * $"col").printSchema()

    // Gives a wrong result since there is a nested decimal expression that should be visited first
    // Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
    df.select($"col" * $"col" * $"col").explain(true)
    df.select($"col" * $"col" * $"col").printSchema()
```

The example above gives the following output:

```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- (col * col): decimal(38,12) (nullable = true)

// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- ((col * col) * col): decimal(38,12) (nullable = true)
```

## How was this patch tested?

This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.

Author: aokolnychyi <[email protected]>

Closes apache#18583 from aokolnychyi/spark-21332.
Willymontaz pushed a commit to Willymontaz/spark that referenced this pull request Nov 8, 2018
…/`to_avro`

## What changes were proposed in this pull request?

Previously in from_avro/to_avro, we override the method `simpleString` and `sql` for the string output. However, the override only affects the alias naming:
```
Project [from_avro('col,
...
, (mode,PERMISSIVE)) AS from_avro(col, struct<col1:bigint,col2:double>, Map(mode -> PERMISSIVE))criteo-forks#11]
```
It only makes the alias name quite long: `from_avro(col, struct<col1:bigint,col2:double>, Map(mode -> PERMISSIVE))`).

We should follow `from_csv`/`from_json` here, to override the method prettyName only, and we will get a clean alias name

```
... AS from_avro(col)criteo-forks#11
```

## How was this patch tested?

Manual check

Closes apache#22890 from gengliangwang/revise_from_to_avro.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
Willymontaz pushed a commit that referenced this pull request Apr 1, 2019
…from_avro`/`to_avro`

Back port apache#22890 to branch-2.4.
It is a bug fix for this issue:
https://issues.apache.org/jira/browse/SPARK-26063

## What changes were proposed in this pull request?

Previously in from_avro/to_avro, we override the method `simpleString` and `sql` for the string output. However, the override only affects the alias naming:
```
Project [from_avro('col,
...
, (mode,PERMISSIVE)) AS from_avro(col, struct<col1:bigint,col2:double>, Map(mode -> PERMISSIVE))#11]
```
It only makes the alias name quite long: `from_avro(col, struct<col1:bigint,col2:double>, Map(mode -> PERMISSIVE))`).

We should follow `from_csv`/`from_json` here, to override the method prettyName only, and we will get a clean alias name

```
... AS from_avro(col)#11
```

## How was this patch tested?

Manual check

Closes apache#23047 from gengliangwang/backport_avro_pretty_name.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
Willymontaz pushed a commit to Willymontaz/spark that referenced this pull request Apr 2, 2019
…pressions

## What changes were proposed in this pull request?

This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:

```
    val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
    val df = spark.createDataFrame(rdd, inputSchema)

    // Works correctly since no nested decimal expression is involved
    // Expected result type: (26, 6) * (26, 6) = (38, 12)
    df.select($"col" * $"col").explain(true)
    df.select($"col" * $"col").printSchema()

    // Gives a wrong result since there is a nested decimal expression that should be visited first
    // Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
    df.select($"col" * $"col" * $"col").explain(true)
    df.select($"col" * $"col" * $"col").printSchema()
```

The example above gives the following output:

```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)criteo-forks#4]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)criteo-forks#4]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)criteo-forks#4]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)criteo-forks#4]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- (col * col): decimal(38,12) (nullable = true)

// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)criteo-forks#11]
+- LogicalRDD [col#1]

== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)criteo-forks#11]
+- LogicalRDD [col#1]

== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)criteo-forks#11]
+- LogicalRDD [col#1]

== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)criteo-forks#11]
+- Scan ExistingRDD[col#1]

// Schema
root
 |-- ((col * col) * col): decimal(38,12) (nullable = true)
```

## How was this patch tested?

This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.

Author: aokolnychyi <[email protected]>

Closes apache#18583 from aokolnychyi/spark-21332.

(cherry picked from commit 0be5fb4)
Signed-off-by: gatorsmile <[email protected]>
jetoile pushed a commit that referenced this pull request Oct 18, 2024
…plan properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix apache#45214 to 3.5

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id)                     '[#12]Join LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)                   +- '[#11]Project ['index, 'value_2]
!      :- '[#7]UnresolvedRelation [test_table_1], [], false      +- '[#10]Join Inner, '`==`('id, 'index)
!      +- '[#8]UnresolvedRelation [test_table_2], [], false         :- '[#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#46291 from zhengruifeng/connect_fix_read_join_35.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
jetoile pushed a commit that referenced this pull request Nov 4, 2025
…erfile` building

### What changes were proposed in this pull request?

This PR aims to add `libwebp-dev` to recover `spark-rm/Dockerfile` building.

### Why are the changes needed?

`Apache Spark` release docker image compilation has been broken for last 7 days due to the SparkR package compilation.
- https://github.com/apache/spark/actions/workflows/release.yml
    - https://github.com/apache/spark/actions/runs/17425825244

```
#11 559.4 No package 'libwebpmux' found
...
#11 559.4 -------------------------- [ERROR MESSAGE] ---------------------------
#11 559.4 <stdin>:1:10: fatal error: ft2build.h: No such file or directory
#11 559.4 compilation terminated.
#11 559.4 --------------------------------------------------------------------
#11 559.4 ERROR: configuration failed for package 'ragg'
```

### Does this PR introduce _any_ user-facing change?

No, this is a fix for Apache Spark release tool.

### How was this patch tested?

Manually build.

```
$ cd dev/create-release/spark-rm
$ docker build .
```

**BEFORE**

```
...
Dockerfile:83
--------------------
  82 |     # See more in SPARK-39959, roxygen2 < 7.2.1
  83 | >>> RUN Rscript -e "install.packages(c('devtools', 'knitr', 'markdown',  \
  84 | >>>     'rmarkdown', 'testthat', 'devtools', 'e1071', 'survival', 'arrow',  \
  85 | >>>     'ggplot2', 'mvtnorm', 'statmod', 'xml2'), repos='https://cloud.r-project.org/')" && \
  86 | >>>     Rscript -e "devtools::install_version('roxygen2', version='7.2.0', repos='https://cloud.r-project.org')" && \
  87 | >>>     Rscript -e "devtools::install_version('lintr', version='2.0.1', repos='https://cloud.r-project.org')" && \
  88 | >>>     Rscript -e "devtools::install_version('pkgdown', version='2.0.1', repos='https://cloud.r-project.org')" && \
  89 | >>>     Rscript -e "devtools::install_version('preferably', version='0.4', repos='https://cloud.r-project.org')"
  90 |
--------------------
ERROR: failed to build: failed to solve:
```

**AFTER**
```
...
 => [ 6/22] RUN add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/'                                                             3.8s
 => [ 7/22] RUN Rscript -e "install.packages(c('devtools', 'knitr', 'markdown',      'rmarkdown', 'testthat', 'devtools', 'e1071', 'survival', 'arrow',       892.2s
 => [ 8/22] RUN add-apt-repository ppa:pypy/ppa                                                                                                                15.3s
...
```

After merging this PR, we can validate via the daily release dry-run CI.

- https://github.com/apache/spark/actions/workflows/release.yml

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52340 from peter-toth/SPARK-53539-3.5.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants