From e5ad88e78af7df92a068997d222f8e1ab180146e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Mon, 8 Mar 2021 14:55:37 +0100 Subject: [PATCH 01/14] use multithreading in basic operations --- NEWS.md | 3 +++ src/dataframe/dataframe.jl | 33 ++++++++++++++++++++++++++------- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/NEWS.md b/NEWS.md index 0ee2d62a8e..7a419433cb 100644 --- a/NEWS.md +++ b/NEWS.md @@ -39,6 +39,9 @@ These aspects of input data frames might affect the order of rows produced in the output ([#2612](https://github.com/JuliaData/DataFrames.jl/pull/2612), [#2622][https://github.com/JuliaData/DataFrames.jl/pull/2622]) +* `DataFrame` constructor, `getindex`, `select`, `select!`, `transform`, `transform!`, + and `combine` functions now use multiple threads in selected operations + ([XXXX](XXXX)) # DataFrames v0.22 Release Notes diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index 6b19a51f83..94d39b6a61 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -190,7 +190,8 @@ struct DataFrame <: AbstractDataFrame # we write into columns as we know that it is guaranteed # that it was freshly allocated in the outer constructor - for (i, col) in enumerate(columns) + Threads.@threads for i in eachindex(columns) + col = columns[i] # check for vectors first as they are most common if col isa AbstractRange columns[i] = collect(col) @@ -502,9 +503,18 @@ end throw(BoundsError(df, (row_inds, col_inds))) end selected_columns = index(df)[col_inds] - # Computing integer indices once for all columns is faster - selected_rows = T === Bool ? findall(row_inds) : row_inds - new_columns = AbstractVector[dv[selected_rows] for dv in _columns(df)[selected_columns]] + old_columns = _columns(df)[selected_columns] + new_columns = Vector{AbstractVector}(undef, length(old_columns)) + if length(new_columns) == 1 + new_columns[1] = only(old_columns)[row_inds] + elseif length(new_columns) > 1 + # Computing integer indices once for all columns is faster + selected_rows = T === Bool ? findall(row_inds) : row_inds + Threads.@threads for i in eachindex(new_columns) + new_columns[i] = old_columns[i][selected_rows] + end + end + return DataFrame(new_columns, Index(_names(df)[selected_columns]), copycols=false) end @@ -512,9 +522,18 @@ end @boundscheck if !checkindex(Bool, axes(df, 1), row_inds) throw(BoundsError(df, (row_inds, :))) end - # Computing integer indices once for all columns is faster - selected_rows = T === Bool ? findall(row_inds) : row_inds - new_columns = AbstractVector[dv[selected_rows] for dv in _columns(df)] + old_columns = _columns(df) + new_columns = Vector{AbstractVector}(undef, length(old_columns)) + if length(new_columns) == 1 + new_columns[1] = only(old_columns)[row_inds] + elseif length(new_columns) > 1 + # Computing integer indices once for all columns is faster + selected_rows = T === Bool ? findall(row_inds) : row_inds + Threads.@threads for i in eachindex(new_columns) + new_columns[i] = old_columns[i][selected_rows] + end + end + return DataFrame(new_columns, copy(index(df)), copycols=false) end From 9e05929d21c0b8bdefc552d269ebf91dbe16bf27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Mon, 8 Mar 2021 16:36:17 +0100 Subject: [PATCH 02/14] switch to @spawn and add conditionw when threading is used --- src/dataframe/dataframe.jl | 86 ++++++++++++++++++++------------------ 1 file changed, 46 insertions(+), 40 deletions(-) diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index 94d39b6a61..49bb95acba 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -190,22 +190,19 @@ struct DataFrame <: AbstractDataFrame # we write into columns as we know that it is guaranteed # that it was freshly allocated in the outer constructor - Threads.@threads for i in eachindex(columns) - col = columns[i] - # check for vectors first as they are most common - if col isa AbstractRange - columns[i] = collect(col) - elseif col isa AbstractVector - columns[i] = copycols ? copy(col) : col - elseif col isa Union{AbstractArray{<:Any, 0}, Ref} - x = col[] - columns[i] = fill!(Tables.allocatecolumn(typeof(x), len), x) + @static if VERSION >= 1.4 + if Threads.nthreads() > 1 && len >= 1_000_000 + @sync for i in eachindex(columns) + Threads.@spawn columns[i] = _preprocess_column(columns[i], len, copycols) + end else - if col isa AbstractArray - throw(ArgumentError("adding AbstractArray other than AbstractVector " * - "as a column of a data frame is not allowed")) + for i in eachindex(columns) + columns[i] = _preprocess_column(columns[i], len, copycols) end - columns[i] = fill!(Tables.allocatecolumn(typeof(col), len), col) + end + else + for i in eachindex(columns) + columns[i] = _preprocess_column(columns[i], len, copycols) end end @@ -217,6 +214,21 @@ struct DataFrame <: AbstractDataFrame end end +function _preprocess_column(col, len, copycols) + # check for vectors first as they are most common + col isa AbstractRange && return collect(col) + col isa AbstractVector && copycols ? copy(col) : col + if col isa Union{AbstractArray{<:Any, 0}, Ref} + x = col[] + return fill!(Tables.allocatecolumn(typeof(x), len), x) + end + if col isa AbstractArray + throw(ArgumentError("adding AbstractArray other than AbstractVector " * + "as a column of a data frame is not allowed")) + end + return fill!(Tables.allocatecolumn(typeof(col), len), col) +end + DataFrame(df::DataFrame; copycols::Bool=true) = copy(df, copycols=copycols) function DataFrame(pairs::Pair{Symbol, <:Any}...; makeunique::Bool=false, @@ -505,50 +517,44 @@ end selected_columns = index(df)[col_inds] old_columns = _columns(df)[selected_columns] new_columns = Vector{AbstractVector}(undef, length(old_columns)) + if length(new_columns) == 1 new_columns[1] = only(old_columns)[row_inds] elseif length(new_columns) > 1 # Computing integer indices once for all columns is faster selected_rows = T === Bool ? findall(row_inds) : row_inds - Threads.@threads for i in eachindex(new_columns) - new_columns[i] = old_columns[i][selected_rows] + @static if VERSION >= v"1.4" + if length(selected_rows) > 1_000_000 && Threads.nthreads() > 1 + @sync for i in eachindex(new_columns) + Threads.@spawn new_columns[i] = old_columns[i][selected_rows] + end + else + for i in eachindex(new_columns) + new_columns[i] = old_columns[i][selected_rows] + end + end + else + for i in eachindex(new_columns) + new_columns[i] = old_columns[i][selected_rows] + end end end return DataFrame(new_columns, Index(_names(df)[selected_columns]), copycols=false) end -@inline function Base.getindex(df::DataFrame, row_inds::AbstractVector{T}, ::Colon) where T - @boundscheck if !checkindex(Bool, axes(df, 1), row_inds) - throw(BoundsError(df, (row_inds, :))) - end - old_columns = _columns(df) - new_columns = Vector{AbstractVector}(undef, length(old_columns)) - if length(new_columns) == 1 - new_columns[1] = only(old_columns)[row_inds] - elseif length(new_columns) > 1 - # Computing integer indices once for all columns is faster - selected_rows = T === Bool ? findall(row_inds) : row_inds - Threads.@threads for i in eachindex(new_columns) - new_columns[i] = old_columns[i][selected_rows] - end - end - - return DataFrame(new_columns, copy(index(df)), copycols=false) -end +@inline Base.getindex(df::DataFrame, row_inds::AbstractVector{T}, ::Colon) where T = + df[row_inds, index(df)[:]] -@inline Base.getindex(df::DataFrame, row_inds::Not, - col_inds::MultiColumnIndex) = +@inline Base.getindex(df::DataFrame, row_inds::Not, col_inds::MultiColumnIndex) = df[axes(df, 1)[row_inds], col_inds] # df[:, MultiColumnIndex] => DataFrame -Base.getindex(df::DataFrame, row_ind::Colon, - col_inds::MultiColumnIndex) = +Base.getindex(df::DataFrame, row_ind::Colon, col_inds::MultiColumnIndex) = select(df, col_inds, copycols=true) # df[!, MultiColumnIndex] => DataFrame -Base.getindex(df::DataFrame, row_ind::typeof(!), - col_inds::MultiColumnIndex) = +Base.getindex(df::DataFrame, row_ind::typeof(!), col_inds::MultiColumnIndex) = select(df, col_inds, copycols=false) ############################################################################## From 3d4cdbccb40761f512a118adda8fd183422b6c7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Mon, 8 Mar 2021 16:56:57 +0100 Subject: [PATCH 03/14] fix typos --- src/dataframe/dataframe.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index 49bb95acba..6cc732388c 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -190,7 +190,7 @@ struct DataFrame <: AbstractDataFrame # we write into columns as we know that it is guaranteed # that it was freshly allocated in the outer constructor - @static if VERSION >= 1.4 + @static if VERSION >= v"1.4" if Threads.nthreads() > 1 && len >= 1_000_000 @sync for i in eachindex(columns) Threads.@spawn columns[i] = _preprocess_column(columns[i], len, copycols) @@ -217,7 +217,7 @@ end function _preprocess_column(col, len, copycols) # check for vectors first as they are most common col isa AbstractRange && return collect(col) - col isa AbstractVector && copycols ? copy(col) : col + col isa AbstractVector && return copycols ? copy(col) : col if col isa Union{AbstractArray{<:Any, 0}, Ref} x = col[] return fill!(Tables.allocatecolumn(typeof(x), len), x) From 47263d9dad38a878de99dae8edee55d6fb421684 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Mon, 8 Mar 2021 17:07:10 +0100 Subject: [PATCH 04/14] add @async --- src/dataframe/dataframe.jl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index 6cc732388c..fa30894ad3 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -193,7 +193,8 @@ struct DataFrame <: AbstractDataFrame @static if VERSION >= v"1.4" if Threads.nthreads() > 1 && len >= 1_000_000 @sync for i in eachindex(columns) - Threads.@spawn columns[i] = _preprocess_column(columns[i], len, copycols) + @async Threads.@spawn columns[i] = _preprocess_column(columns[i], + len, copycols) end else for i in eachindex(columns) @@ -526,7 +527,7 @@ end @static if VERSION >= v"1.4" if length(selected_rows) > 1_000_000 && Threads.nthreads() > 1 @sync for i in eachindex(new_columns) - Threads.@spawn new_columns[i] = old_columns[i][selected_rows] + @async Threads.@spawn new_columns[i] = old_columns[i][selected_rows] end else for i in eachindex(new_columns) From dbad5501117335cdc335b7571716be313ed72436 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Mon, 8 Mar 2021 17:31:26 +0100 Subject: [PATCH 05/14] add threading tests --- test/constructors.jl | 7 +++++++ test/indexing.jl | 11 +++++++++++ 2 files changed, 18 insertions(+) diff --git a/test/constructors.jl b/test/constructors.jl index ca56b2b3e3..320109a4e9 100644 --- a/test/constructors.jl +++ b/test/constructors.jl @@ -365,4 +365,11 @@ end @test_throws ArgumentError DataFrame([Int, Float64], ["a", "b"], 2) end +@testset "threading correcness tests" begin + for x in (10, 2*10^6), y in 1:4 + df = DataFrame(rand(x, y), :auto) + @test df == copy(df) + end +end + end # module diff --git a/test/indexing.jl b/test/indexing.jl index ca798c3edc..34279211b1 100644 --- a/test/indexing.jl +++ b/test/indexing.jl @@ -2009,4 +2009,15 @@ if VERSION >= v"1.5" include("indexing_offset.jl") end +@testset "threading correcness tests" begin + for x in (10, 2*10^6), y in 1:4 + mat = rand(x, y) + df = DataFrame(mat, :auto) + for rowrange in [:, 1:nrow(df)-5, collect(1:nrow(df)-5), axes(df, 1) .< nrow(df)-5], + colrange in [:, axes(df, 2), collect(axes(df, 2)), 1:ncol(df) - 1] + @test DataFrame(mat[rowrange, colrange], :auto) == df[rowrange, colrange] + end + end +end + end # module From d1e134bc2f5bbf7fc9033d25faf96bd94bbd4aea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Mon, 8 Mar 2021 18:13:03 +0100 Subject: [PATCH 06/14] Apply suggestions from code review Co-authored-by: Milan Bouchet-Valat --- src/dataframe/dataframe.jl | 2 +- test/constructors.jl | 2 +- test/indexing.jl | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index fa30894ad3..2ba8a0fc28 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -215,7 +215,7 @@ struct DataFrame <: AbstractDataFrame end end -function _preprocess_column(col, len, copycols) +function _preprocess_column(col::AbstractVector, len::Integer, copycols::Bool) # check for vectors first as they are most common col isa AbstractRange && return collect(col) col isa AbstractVector && return copycols ? copy(col) : col diff --git a/test/constructors.jl b/test/constructors.jl index 320109a4e9..9a4b575eb5 100644 --- a/test/constructors.jl +++ b/test/constructors.jl @@ -365,7 +365,7 @@ end @test_throws ArgumentError DataFrame([Int, Float64], ["a", "b"], 2) end -@testset "threading correcness tests" begin +@testset "threading correctness tests" begin for x in (10, 2*10^6), y in 1:4 df = DataFrame(rand(x, y), :auto) @test df == copy(df) diff --git a/test/indexing.jl b/test/indexing.jl index 34279211b1..1aedf9ea29 100644 --- a/test/indexing.jl +++ b/test/indexing.jl @@ -2009,7 +2009,7 @@ if VERSION >= v"1.5" include("indexing_offset.jl") end -@testset "threading correcness tests" begin +@testset "threading correctness tests" begin for x in (10, 2*10^6), y in 1:4 mat = rand(x, y) df = DataFrame(mat, :auto) From 3dd9c2eb658ca9c6052c9933f19abfbb5560f459 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Mon, 8 Mar 2021 18:21:13 +0100 Subject: [PATCH 07/14] fixes after code review --- NEWS.md | 8 +++++--- src/dataframe/dataframe.jl | 21 +++++++++++---------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/NEWS.md b/NEWS.md index 7a419433cb..71d678c737 100644 --- a/NEWS.md +++ b/NEWS.md @@ -39,9 +39,11 @@ These aspects of input data frames might affect the order of rows produced in the output ([#2612](https://github.com/JuliaData/DataFrames.jl/pull/2612), [#2622][https://github.com/JuliaData/DataFrames.jl/pull/2622]) -* `DataFrame` constructor, `getindex`, `select`, `select!`, `transform`, `transform!`, - and `combine` functions now use multiple threads in selected operations - ([XXXX](XXXX)) +* `DataFrame` constructor, `copy`, `getindex`, `select`, `select!`, `transform`, + `transform!`, and `combine` functions now use multiple threads in selected operations + ([#2647](https://github.com/JuliaData/DataFrames.jl/pull/2647)) + ([#2588](https://github.com/JuliaData/DataFrames.jl/pull/2588)) + ([#2574](https://github.com/JuliaData/DataFrames.jl/pull/2574)) # DataFrames v0.22 Release Notes diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index 2ba8a0fc28..6c262d6da8 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -191,10 +191,9 @@ struct DataFrame <: AbstractDataFrame # we write into columns as we know that it is guaranteed # that it was freshly allocated in the outer constructor @static if VERSION >= v"1.4" - if Threads.nthreads() > 1 && len >= 1_000_000 + if copycols && len >= 1_000_000 && length(columns) > 1 && Threads.nthreads() > 1 @sync for i in eachindex(columns) - @async Threads.@spawn columns[i] = _preprocess_column(columns[i], - len, copycols) + Threads.@spawn columns[i] = _preprocess_column(columns[i], len, copycols) end else for i in eachindex(columns) @@ -217,17 +216,19 @@ end function _preprocess_column(col::AbstractVector, len::Integer, copycols::Bool) # check for vectors first as they are most common - col isa AbstractRange && return collect(col) - col isa AbstractVector && return copycols ? copy(col) : col - if col isa Union{AbstractArray{<:Any, 0}, Ref} + if col isa AbstractRange + return collect(col) + elseif col isa AbstractVector + return copycols ? copy(col) : col + elseif col isa Union{AbstractArray{<:Any, 0}, Ref} x = col[] return fill!(Tables.allocatecolumn(typeof(x), len), x) - end - if col isa AbstractArray + elseif col isa AbstractArray throw(ArgumentError("adding AbstractArray other than AbstractVector " * "as a column of a data frame is not allowed")) + else + return fill!(Tables.allocatecolumn(typeof(col), len), col) end - return fill!(Tables.allocatecolumn(typeof(col), len), col) end DataFrame(df::DataFrame; copycols::Bool=true) = copy(df, copycols=copycols) @@ -527,7 +528,7 @@ end @static if VERSION >= v"1.4" if length(selected_rows) > 1_000_000 && Threads.nthreads() > 1 @sync for i in eachindex(new_columns) - @async Threads.@spawn new_columns[i] = old_columns[i][selected_rows] + Threads.@spawn new_columns[i] = old_columns[i][selected_rows] end else for i in eachindex(new_columns) From 67a45b8f51f704a48e729062adef81ed6e586c83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Mon, 8 Mar 2021 19:02:30 +0100 Subject: [PATCH 08/14] Update src/dataframe/dataframe.jl --- src/dataframe/dataframe.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index 6c262d6da8..4a230d50d0 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -214,7 +214,7 @@ struct DataFrame <: AbstractDataFrame end end -function _preprocess_column(col::AbstractVector, len::Integer, copycols::Bool) +function _preprocess_column(col::Any, len::Integer, copycols::Bool) # check for vectors first as they are most common if col isa AbstractRange return collect(col) From c9ef02074ac3f94bd22c39fb5709b2263e2723d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Sun, 14 Mar 2021 20:13:27 +0100 Subject: [PATCH 09/14] Update src/dataframe/dataframe.jl Co-authored-by: Milan Bouchet-Valat --- src/dataframe/dataframe.jl | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index 4a230d50d0..792aad0cc0 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -215,7 +215,6 @@ struct DataFrame <: AbstractDataFrame end function _preprocess_column(col::Any, len::Integer, copycols::Bool) - # check for vectors first as they are most common if col isa AbstractRange return collect(col) elseif col isa AbstractVector From 456c40cd8b1b8ada00fabede61a90b2fc5b2c0d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Tue, 16 Mar 2021 12:05:43 +0100 Subject: [PATCH 10/14] Update NEWS.md --- NEWS.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/NEWS.md b/NEWS.md index 2a923e04da..5193f2c08d 100644 --- a/NEWS.md +++ b/NEWS.md @@ -49,8 +49,8 @@ ([#2622][https://github.com/JuliaData/DataFrames.jl/pull/2622]) * `DataFrame` constructor, `copy`, `getindex`, `select`, `select!`, `transform`, `transform!`, and `combine` functions now use multiple threads in selected operations - ([#2647](https://github.com/JuliaData/DataFrames.jl/pull/2647)) - ([#2588](https://github.com/JuliaData/DataFrames.jl/pull/2588)) + ([#2647](https://github.com/JuliaData/DataFrames.jl/pull/2647)), + ([#2588](https://github.com/JuliaData/DataFrames.jl/pull/2588)), ([#2574](https://github.com/JuliaData/DataFrames.jl/pull/2574)) # DataFrames v0.22 Release Notes From 5d15fb20ff2167a76019ffe6a28b9caeec17e583 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Tue, 16 Mar 2021 17:54:46 +0100 Subject: [PATCH 11/14] add performance benchmarks --- .../constructor_and_indexing_performance.jl | 38 +++++++ benchmarks/constructor_and_indexing/run.sh | 3 + benchmarks/joins/join_performance.jl | 101 ++++++++++++++++++ benchmarks/joins/run.sh | 12 +++ benchmarks/{ => joins}/runtests.jl | 0 src/dataframe/dataframe.jl | 2 +- 6 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 benchmarks/constructor_and_indexing/constructor_and_indexing_performance.jl create mode 100644 benchmarks/constructor_and_indexing/run.sh create mode 100644 benchmarks/joins/join_performance.jl create mode 100644 benchmarks/joins/run.sh rename benchmarks/{ => joins}/runtests.jl (100%) diff --git a/benchmarks/constructor_and_indexing/constructor_and_indexing_performance.jl b/benchmarks/constructor_and_indexing/constructor_and_indexing_performance.jl new file mode 100644 index 0000000000..920297ed60 --- /dev/null +++ b/benchmarks/constructor_and_indexing/constructor_and_indexing_performance.jl @@ -0,0 +1,38 @@ +using BenchmarkTools +using DataFrames +using PooledArrays +using Random + +@show Threads.nthreads() + +Random.seed!(1234) +ref_dfi = DataFrame(rand(1:10^4, 10^7, 4), :auto) +ref_dfs = string.(ref_dfi) +ref_dfp = mapcols(PooledArray, ref_dfs) + +res = DataFrame(rows=Int[],cols=Int[], type=String[], op=String[], time=Float64[]) + +for x in (10, 10^6-1, 10^6, 10^7), y in 1:4 + dfi = ref_dfi[1:x, 1:y] + dfs = ref_dfs[1:x, 1:y] + dfp = ref_dfp[1:x, 1:y] + + @show (x, y) # ping that the process is alive + push!(res, (x, y, "integer", "copy", @belapsed DataFrame($dfi))) + push!(res, (x, y, "string", "copy", @belapsed DataFrame($dfs))) + push!(res, (x, y, "pooled", "copy", @belapsed DataFrame($dfp))) + push!(res, (x, y, "integer", ":", @belapsed $dfi[:, :])) + push!(res, (x, y, "string", ":", @belapsed $dfs[:, :])) + push!(res, (x, y, "pooled", ":", @belapsed $dfp[:, :])) + push!(res, (x, y, "integer", "1:end-5", @belapsed $dfi[1:end-5, :])) + push!(res, (x, y, "string", "1:end-5", @belapsed $dfs[1:end-5, :])) + push!(res, (x, y, "pooled", "1:end-5", @belapsed $dfp[1:end-5, :])) + push!(res, (x, y, "integer", "1:5", @belapsed $dfi[1:5, :])) + push!(res, (x, y, "string", "1:5", @belapsed $dfs[1:1:5, :])) + push!(res, (x, y, "pooled", "1:5", @belapsed $dfp[1:1:5, :])) +end + +res.time *= 1_000 + +@show Threads.nthreads() +@show unstack(res, [:cols, :type, :op], :rows, :time) diff --git a/benchmarks/constructor_and_indexing/run.sh b/benchmarks/constructor_and_indexing/run.sh new file mode 100644 index 0000000000..afd2db25b7 --- /dev/null +++ b/benchmarks/constructor_and_indexing/run.sh @@ -0,0 +1,3 @@ +julia -t 1 constructor_and_indexing_performance.jl +julia -t 2 constructor_and_indexing_performance.jl +julia -t 4 constructor_and_indexing_performance.jl diff --git a/benchmarks/joins/join_performance.jl b/benchmarks/joins/join_performance.jl new file mode 100644 index 0000000000..af3b8a59ff --- /dev/null +++ b/benchmarks/joins/join_performance.jl @@ -0,0 +1,101 @@ +using CategoricalArrays +using DataFrames +using PooledArrays +using Random + +fullgc() = (GC.gc(true); GC.gc(true); GC.gc(true); GC.gc(true)) + +@assert length(ARGS) == 7 +@assert ARGS[3] in ["int", "pool", "cat", "str"] +@assert ARGS[4] in ["uniq", "dup", "manydup"] +@assert ARGS[5] in ["sort", "rand"] +@assert ARGS[6] in ["1", "2"] +@assert ARGS[7] in ["inner", "left", "right", "outer", "semi", "anti"] + +@info ARGS + +llen = parse(Int, ARGS[1]) +rlen = parse(Int, ARGS[2]) +@assert llen > 1000 +@assert rlen > 2000 + +pad = maximum(length.(string.((llen, rlen)))) + +if ARGS[3] == "int" + if ARGS[4] == "uniq" + col1 = [1:llen;] + col2 = [1:rlen;] + elseif ARGS[4] == "dup" + col1 = repeat(1:llen ÷ 2, inner=2) + col2 = repeat(1:rlen ÷ 2, inner=2) + else + @assert ARGS[4] == "manydup" + col1 = repeat(1:llen ÷ 20, inner=20) + col2 = repeat(1:rlen ÷ 20, inner=20) + end +elseif ARGS[3] == "pool" + if ARGS[4] == "dup" + col1 = PooledArray(repeat(string.(1:llen ÷ 2, pad=pad), inner=2)) + col2 = PooledArray(repeat(string.(1:rlen ÷ 2, pad=pad), inner=2)) + else + @assert ARGS[4] == "manydup" + col1 = PooledArray(repeat(string.(1:llen ÷ 20, pad=pad), inner=20)) + col2 = PooledArray(repeat(string.(1:rlen ÷ 20, pad=pad), inner=20)) + end +elseif ARGS[3] == "cat" + if ARGS[4] == "dup" + col1 = categorical(repeat(string.(1:llen ÷ 2, pad=pad), inner=2)) + col2 = categorical(repeat(string.(1:rlen ÷ 2, pad=pad), inner=2)) + else + @assert ARGS[4] == "manydup" + col1 = categorical(repeat(string.(1:llen ÷ 20, pad=pad), inner=20)) + col2 = categorical(repeat(string.(1:rlen ÷ 20, pad=pad), inner=20)) + end +else + @assert ARGS[3] == "str" + if ARGS[4] == "uniq" + col1 = string.(1:llen, pad=pad) + col2 = string.(1:rlen, pad=pad) + elseif ARGS[4] == "dup" + col1 = repeat(string.(1:llen ÷ 2, pad=pad), inner=2) + col2 = repeat(string.(1:rlen ÷ 2, pad=pad), inner=2) + else + @assert ARGS[4] == "manydup" + col1 = repeat(string.(1:llen ÷ 20, pad=pad), inner=20) + col2 = repeat(string.(1:rlen ÷ 20, pad=pad), inner=20) + end +end + +Random.seed!(1234) + +if ARGS[5] == "rand" + shuffle!(col1) + shuffle!(col2) +else + @assert ARGS[5] == "sort" +end + +const joinfun = Dict("inner" => innerjoin, "left" => leftjoin, + "right" => rightjoin, "outer" => outerjoin, + "semi" => semijoin, "anti" => antijoin)[ARGS[7]] + +if ARGS[6] == "1" + df1 = DataFrame(id1 = col1) + df2 = DataFrame(id1 = col2) + joinfun(df1[1:1000, :], df2[1:2000, :], on=:id1) + joinfun(df2[1:2000, :], df1[1:1000, :], on=:id1) + fullgc() + @time joinfun(df1, df2, on=:id1) + fullgc() + @time joinfun(df2, df1, on=:id1) +else + @assert ARGS[6] == "2" + df1 = DataFrame(id1 = col1, id2 = col1) + df2 = DataFrame(id1 = col2, id2 = col2) + joinfun(df1[1:1000, :], df2[1:2000, :], on=[:id1, :id2]) + joinfun(df2[1:2000, :], df1[1:1000, :], on=[:id1, :id2]) + fullgc() + @time joinfun(df1, df2, on=[:id1, :id2]) + fullgc() + @time joinfun(df2, df1, on=[:id1, :id2]) +end diff --git a/benchmarks/joins/run.sh b/benchmarks/joins/run.sh new file mode 100644 index 0000000000..0cd0c3811c --- /dev/null +++ b/benchmarks/joins/run.sh @@ -0,0 +1,12 @@ +julia runtests.jl 100000 50000000 inner +julia runtests.jl 5000000 10000000 inner +julia runtests.jl 100000 50000000 left +julia runtests.jl 5000000 10000000 left +julia runtests.jl 100000 50000000 right +julia runtests.jl 5000000 10000000 right +julia runtests.jl 100000 50000000 outer +julia runtests.jl 5000000 10000000 outer +julia runtests.jl 100000 50000000 semi +julia runtests.jl 5000000 10000000 semi +julia runtests.jl 100000 50000000 anti +julia runtests.jl 5000000 10000000 anti diff --git a/benchmarks/runtests.jl b/benchmarks/joins/runtests.jl similarity index 100% rename from benchmarks/runtests.jl rename to benchmarks/joins/runtests.jl diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index 847ca93ec0..5029c1d1d1 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -525,7 +525,7 @@ end # Computing integer indices once for all columns is faster selected_rows = T === Bool ? findall(row_inds) : row_inds @static if VERSION >= v"1.4" - if length(selected_rows) > 1_000_000 && Threads.nthreads() > 1 + if length(selected_rows) >= 1_000_000 && Threads.nthreads() > 1 @sync for i in eachindex(new_columns) Threads.@spawn new_columns[i] = old_columns[i][selected_rows] end From b11638df683f21e9a2e328f19728e2994e5b418e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Tue, 16 Mar 2021 20:23:19 +0100 Subject: [PATCH 12/14] improve performance of getindex and construction for small data frames --- src/dataframe/dataframe.jl | 59 ++++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index 5029c1d1d1..eb7facdf8d 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -516,36 +516,57 @@ end throw(BoundsError(df, (row_inds, col_inds))) end selected_columns = index(df)[col_inds] - old_columns = _columns(df)[selected_columns] - new_columns = Vector{AbstractVector}(undef, length(old_columns)) - if length(new_columns) == 1 - new_columns[1] = only(old_columns)[row_inds] - elseif length(new_columns) > 1 + u = _names(df)[selected_columns] + lookup = Dict{Symbol, Int}(zip(u, 1:length(u))) + idx = Index(lookup, u) + + if length(selected_columns) == 1 + return DataFrame(AbstractVector[_columns(df)[selected_columns[1]][row_inds]], idx, copycols=false) + else # Computing integer indices once for all columns is faster selected_rows = T === Bool ? findall(row_inds) : row_inds @static if VERSION >= v"1.4" if length(selected_rows) >= 1_000_000 && Threads.nthreads() > 1 + new_columns = Vector{AbstractVector}(undef, length(selected_columns)) @sync for i in eachindex(new_columns) - Threads.@spawn new_columns[i] = old_columns[i][selected_rows] + Threads.@spawn new_columns[i] = _columns(df)[selected_columns[i]][selected_rows] end + return DataFrame(new_columns, idx, copycols=false) else - for i in eachindex(new_columns) - new_columns[i] = old_columns[i][selected_rows] - end + return DataFrame(AbstractVector[dv[selected_rows] for dv in view(_columns(df), selected_columns)], idx, copycols=false) end else - for i in eachindex(new_columns) - new_columns[i] = old_columns[i][selected_rows] - end + return DataFrame(AbstractVector[dv[selected_rows] for dv in view(_columns(df), selected_columns)], idx, copycols=false) end end - - return DataFrame(new_columns, Index(_names(df)[selected_columns]), copycols=false) end -@inline Base.getindex(df::DataFrame, row_inds::AbstractVector{T}, ::Colon) where T = - df[row_inds, index(df)[:]] +@inline function Base.getindex(df::DataFrame, row_inds::AbstractVector{T}, ::Colon) where T + @boundscheck if !checkindex(Bool, axes(df, 1), row_inds) + throw(BoundsError(df, (row_inds, :))) + end + + if ncol(df) == 1 + return DataFrame(AbstractVector[_columns(df)[1][row_inds]], copy(index(df)), copycols=false) + else + # Computing integer indices once for all columns is faster + selected_rows = T === Bool ? findall(row_inds) : row_inds + @static if VERSION >= v"1.4" + if length(selected_rows) >= 1_000_000 && Threads.nthreads() > 1 + new_columns = Vector{AbstractVector}(undef, ncol(df)) + @sync for i in eachindex(new_columns) + Threads.@spawn new_columns[i] = _columns(df)[i][selected_rows] + end + return DataFrame(new_columns, copy(index(df)), copycols=false) + else + return DataFrame(AbstractVector[dv[selected_rows] for dv in _columns(df)], copy(index(df)), copycols=false) + end + else + return DataFrame(AbstractVector[dv[selected_rows] for dv in _columns(df)], copy(index(df)), copycols=false) + end + end +end @inline Base.getindex(df::DataFrame, row_inds::Not, col_inds::MultiColumnIndex) = df[axes(df, 1)[row_inds], col_inds] @@ -901,11 +922,7 @@ copies of column vectors in `df`. If `copycols=false`, return a new `DataFrame` sharing column vectors with `df`. """ function Base.copy(df::DataFrame; copycols::Bool=true) - if copycols - df[:, :] - else - DataFrame(_columns(df), _names(df), copycols=false) - end + return DataFrame(copy(_columns(df)), copy(index(df)), copycols=copycols) end """ From 62e0a51c78c03bb1c6aa1718f1d68cd8f55da25e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Wed, 17 Mar 2021 07:58:46 +0100 Subject: [PATCH 13/14] minor code cleanup --- src/dataframe/dataframe.jl | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index eb7facdf8d..aab2521ee8 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -522,7 +522,8 @@ end idx = Index(lookup, u) if length(selected_columns) == 1 - return DataFrame(AbstractVector[_columns(df)[selected_columns[1]][row_inds]], idx, copycols=false) + return DataFrame(AbstractVector[_columns(df)[selected_columns[1]][row_inds]], + idx, copycols=false) else # Computing integer indices once for all columns is faster selected_rows = T === Bool ? findall(row_inds) : row_inds @@ -534,10 +535,12 @@ end end return DataFrame(new_columns, idx, copycols=false) else - return DataFrame(AbstractVector[dv[selected_rows] for dv in view(_columns(df), selected_columns)], idx, copycols=false) + return DataFrame(AbstractVector[dv[selected_rows] for dv in view(_columns(df), selected_columns)], + idx, copycols=false) end else - return DataFrame(AbstractVector[dv[selected_rows] for dv in view(_columns(df), selected_columns)], idx, copycols=false) + return DataFrame(AbstractVector[dv[selected_rows] for dv in view(_columns(df), selected_columns)], + idx, copycols=false) end end end @@ -547,8 +550,10 @@ end throw(BoundsError(df, (row_inds, :))) end + idx = copy(index(df)) + if ncol(df) == 1 - return DataFrame(AbstractVector[_columns(df)[1][row_inds]], copy(index(df)), copycols=false) + return DataFrame(AbstractVector[_columns(df)[1][row_inds]], idx, copycols=false) else # Computing integer indices once for all columns is faster selected_rows = T === Bool ? findall(row_inds) : row_inds @@ -558,12 +563,14 @@ end @sync for i in eachindex(new_columns) Threads.@spawn new_columns[i] = _columns(df)[i][selected_rows] end - return DataFrame(new_columns, copy(index(df)), copycols=false) + return DataFrame(new_columns, idx, copycols=false) else - return DataFrame(AbstractVector[dv[selected_rows] for dv in _columns(df)], copy(index(df)), copycols=false) + return DataFrame(AbstractVector[dv[selected_rows] for dv in _columns(df)], + idx, copycols=false) end else - return DataFrame(AbstractVector[dv[selected_rows] for dv in _columns(df)], copy(index(df)), copycols=false) + return DataFrame(AbstractVector[dv[selected_rows] for dv in _columns(df)], + idx, copycols=false) end end end From 5646db904dbe09df92182c30741a0b103e96b2bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogumi=C5=82=20Kami=C5=84ski?= Date: Wed, 17 Mar 2021 18:46:58 +0100 Subject: [PATCH 14/14] final cleanup --- src/dataframe/dataframe.jl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index aab2521ee8..2416b866de 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -519,6 +519,8 @@ end u = _names(df)[selected_columns] lookup = Dict{Symbol, Int}(zip(u, 1:length(u))) + # use this constructor to avoid checking twice if column names are not + # duplicate as index(df)[col_inds] already checks this idx = Index(lookup, u) if length(selected_columns) == 1 @@ -535,11 +537,11 @@ end end return DataFrame(new_columns, idx, copycols=false) else - return DataFrame(AbstractVector[dv[selected_rows] for dv in view(_columns(df), selected_columns)], + return DataFrame(AbstractVector[_columns(df)[i][selected_rows] for i in selected_columns], idx, copycols=false) end else - return DataFrame(AbstractVector[dv[selected_rows] for dv in view(_columns(df), selected_columns)], + return DataFrame(AbstractVector[_columns(df)[i][selected_rows] for i in selected_columns], idx, copycols=false) end end @@ -549,7 +551,6 @@ end @boundscheck if !checkindex(Bool, axes(df, 1), row_inds) throw(BoundsError(df, (row_inds, :))) end - idx = copy(index(df)) if ncol(df) == 1