diff --git a/NEWS.md b/NEWS.md index de4a36b8a0..5193f2c08d 100644 --- a/NEWS.md +++ b/NEWS.md @@ -46,7 +46,12 @@ values in `on` columns. These aspects of input data frames might affect the order of rows produced in the output ([#2612](https://github.com/JuliaData/DataFrames.jl/pull/2612), - [#2622][https://github.com/JuliaData/DataFrames.jl/pull/2622]) + ([#2622][https://github.com/JuliaData/DataFrames.jl/pull/2622]) +* `DataFrame` constructor, `copy`, `getindex`, `select`, `select!`, `transform`, + `transform!`, and `combine` functions now use multiple threads in selected operations + ([#2647](https://github.com/JuliaData/DataFrames.jl/pull/2647)), + ([#2588](https://github.com/JuliaData/DataFrames.jl/pull/2588)), + ([#2574](https://github.com/JuliaData/DataFrames.jl/pull/2574)) # DataFrames v0.22 Release Notes diff --git a/benchmarks/constructor_and_indexing/constructor_and_indexing_performance.jl b/benchmarks/constructor_and_indexing/constructor_and_indexing_performance.jl new file mode 100644 index 0000000000..920297ed60 --- /dev/null +++ b/benchmarks/constructor_and_indexing/constructor_and_indexing_performance.jl @@ -0,0 +1,38 @@ +using BenchmarkTools +using DataFrames +using PooledArrays +using Random + +@show Threads.nthreads() + +Random.seed!(1234) +ref_dfi = DataFrame(rand(1:10^4, 10^7, 4), :auto) +ref_dfs = string.(ref_dfi) +ref_dfp = mapcols(PooledArray, ref_dfs) + +res = DataFrame(rows=Int[],cols=Int[], type=String[], op=String[], time=Float64[]) + +for x in (10, 10^6-1, 10^6, 10^7), y in 1:4 + dfi = ref_dfi[1:x, 1:y] + dfs = ref_dfs[1:x, 1:y] + dfp = ref_dfp[1:x, 1:y] + + @show (x, y) # ping that the process is alive + push!(res, (x, y, "integer", "copy", @belapsed DataFrame($dfi))) + push!(res, (x, y, "string", "copy", @belapsed DataFrame($dfs))) + push!(res, (x, y, "pooled", "copy", @belapsed DataFrame($dfp))) + push!(res, (x, y, "integer", ":", @belapsed $dfi[:, :])) + push!(res, (x, y, "string", ":", @belapsed $dfs[:, :])) + push!(res, (x, y, "pooled", ":", @belapsed $dfp[:, :])) + push!(res, (x, y, "integer", "1:end-5", @belapsed $dfi[1:end-5, :])) + push!(res, (x, y, "string", "1:end-5", @belapsed $dfs[1:end-5, :])) + push!(res, (x, y, "pooled", "1:end-5", @belapsed $dfp[1:end-5, :])) + push!(res, (x, y, "integer", "1:5", @belapsed $dfi[1:5, :])) + push!(res, (x, y, "string", "1:5", @belapsed $dfs[1:1:5, :])) + push!(res, (x, y, "pooled", "1:5", @belapsed $dfp[1:1:5, :])) +end + +res.time *= 1_000 + +@show Threads.nthreads() +@show unstack(res, [:cols, :type, :op], :rows, :time) diff --git a/benchmarks/constructor_and_indexing/run.sh b/benchmarks/constructor_and_indexing/run.sh new file mode 100644 index 0000000000..afd2db25b7 --- /dev/null +++ b/benchmarks/constructor_and_indexing/run.sh @@ -0,0 +1,3 @@ +julia -t 1 constructor_and_indexing_performance.jl +julia -t 2 constructor_and_indexing_performance.jl +julia -t 4 constructor_and_indexing_performance.jl diff --git a/benchmarks/joins/join_performance.jl b/benchmarks/joins/join_performance.jl new file mode 100644 index 0000000000..af3b8a59ff --- /dev/null +++ b/benchmarks/joins/join_performance.jl @@ -0,0 +1,101 @@ +using CategoricalArrays +using DataFrames +using PooledArrays +using Random + +fullgc() = (GC.gc(true); GC.gc(true); GC.gc(true); GC.gc(true)) + +@assert length(ARGS) == 7 +@assert ARGS[3] in ["int", "pool", "cat", "str"] +@assert ARGS[4] in ["uniq", "dup", "manydup"] +@assert ARGS[5] in ["sort", "rand"] +@assert ARGS[6] in ["1", "2"] +@assert ARGS[7] in ["inner", "left", "right", "outer", "semi", "anti"] + +@info ARGS + +llen = parse(Int, ARGS[1]) +rlen = parse(Int, ARGS[2]) +@assert llen > 1000 +@assert rlen > 2000 + +pad = maximum(length.(string.((llen, rlen)))) + +if ARGS[3] == "int" + if ARGS[4] == "uniq" + col1 = [1:llen;] + col2 = [1:rlen;] + elseif ARGS[4] == "dup" + col1 = repeat(1:llen ÷ 2, inner=2) + col2 = repeat(1:rlen ÷ 2, inner=2) + else + @assert ARGS[4] == "manydup" + col1 = repeat(1:llen ÷ 20, inner=20) + col2 = repeat(1:rlen ÷ 20, inner=20) + end +elseif ARGS[3] == "pool" + if ARGS[4] == "dup" + col1 = PooledArray(repeat(string.(1:llen ÷ 2, pad=pad), inner=2)) + col2 = PooledArray(repeat(string.(1:rlen ÷ 2, pad=pad), inner=2)) + else + @assert ARGS[4] == "manydup" + col1 = PooledArray(repeat(string.(1:llen ÷ 20, pad=pad), inner=20)) + col2 = PooledArray(repeat(string.(1:rlen ÷ 20, pad=pad), inner=20)) + end +elseif ARGS[3] == "cat" + if ARGS[4] == "dup" + col1 = categorical(repeat(string.(1:llen ÷ 2, pad=pad), inner=2)) + col2 = categorical(repeat(string.(1:rlen ÷ 2, pad=pad), inner=2)) + else + @assert ARGS[4] == "manydup" + col1 = categorical(repeat(string.(1:llen ÷ 20, pad=pad), inner=20)) + col2 = categorical(repeat(string.(1:rlen ÷ 20, pad=pad), inner=20)) + end +else + @assert ARGS[3] == "str" + if ARGS[4] == "uniq" + col1 = string.(1:llen, pad=pad) + col2 = string.(1:rlen, pad=pad) + elseif ARGS[4] == "dup" + col1 = repeat(string.(1:llen ÷ 2, pad=pad), inner=2) + col2 = repeat(string.(1:rlen ÷ 2, pad=pad), inner=2) + else + @assert ARGS[4] == "manydup" + col1 = repeat(string.(1:llen ÷ 20, pad=pad), inner=20) + col2 = repeat(string.(1:rlen ÷ 20, pad=pad), inner=20) + end +end + +Random.seed!(1234) + +if ARGS[5] == "rand" + shuffle!(col1) + shuffle!(col2) +else + @assert ARGS[5] == "sort" +end + +const joinfun = Dict("inner" => innerjoin, "left" => leftjoin, + "right" => rightjoin, "outer" => outerjoin, + "semi" => semijoin, "anti" => antijoin)[ARGS[7]] + +if ARGS[6] == "1" + df1 = DataFrame(id1 = col1) + df2 = DataFrame(id1 = col2) + joinfun(df1[1:1000, :], df2[1:2000, :], on=:id1) + joinfun(df2[1:2000, :], df1[1:1000, :], on=:id1) + fullgc() + @time joinfun(df1, df2, on=:id1) + fullgc() + @time joinfun(df2, df1, on=:id1) +else + @assert ARGS[6] == "2" + df1 = DataFrame(id1 = col1, id2 = col1) + df2 = DataFrame(id1 = col2, id2 = col2) + joinfun(df1[1:1000, :], df2[1:2000, :], on=[:id1, :id2]) + joinfun(df2[1:2000, :], df1[1:1000, :], on=[:id1, :id2]) + fullgc() + @time joinfun(df1, df2, on=[:id1, :id2]) + fullgc() + @time joinfun(df2, df1, on=[:id1, :id2]) +end diff --git a/benchmarks/joins/run.sh b/benchmarks/joins/run.sh new file mode 100644 index 0000000000..0cd0c3811c --- /dev/null +++ b/benchmarks/joins/run.sh @@ -0,0 +1,12 @@ +julia runtests.jl 100000 50000000 inner +julia runtests.jl 5000000 10000000 inner +julia runtests.jl 100000 50000000 left +julia runtests.jl 5000000 10000000 left +julia runtests.jl 100000 50000000 right +julia runtests.jl 5000000 10000000 right +julia runtests.jl 100000 50000000 outer +julia runtests.jl 5000000 10000000 outer +julia runtests.jl 100000 50000000 semi +julia runtests.jl 5000000 10000000 semi +julia runtests.jl 100000 50000000 anti +julia runtests.jl 5000000 10000000 anti diff --git a/benchmarks/runtests.jl b/benchmarks/joins/runtests.jl similarity index 100% rename from benchmarks/runtests.jl rename to benchmarks/joins/runtests.jl diff --git a/src/dataframe/dataframe.jl b/src/dataframe/dataframe.jl index 023494d9ba..2416b866de 100644 --- a/src/dataframe/dataframe.jl +++ b/src/dataframe/dataframe.jl @@ -190,21 +190,19 @@ struct DataFrame <: AbstractDataFrame # we write into columns as we know that it is guaranteed # that it was freshly allocated in the outer constructor - for (i, col) in enumerate(columns) - # check for vectors first as they are most common - if col isa AbstractRange - columns[i] = collect(col) - elseif col isa AbstractVector - columns[i] = copycols ? copy(col) : col - elseif col isa Union{AbstractArray{<:Any, 0}, Ref} - x = col[] - columns[i] = fill!(Tables.allocatecolumn(typeof(x), len), x) + @static if VERSION >= v"1.4" + if copycols && len >= 1_000_000 && length(columns) > 1 && Threads.nthreads() > 1 + @sync for i in eachindex(columns) + Threads.@spawn columns[i] = _preprocess_column(columns[i], len, copycols) + end else - if col isa AbstractArray - throw(ArgumentError("adding AbstractArray other than AbstractVector " * - "as a column of a data frame is not allowed")) + for i in eachindex(columns) + columns[i] = _preprocess_column(columns[i], len, copycols) end - columns[i] = fill!(Tables.allocatecolumn(typeof(col), len), col) + end + else + for i in eachindex(columns) + columns[i] = _preprocess_column(columns[i], len, copycols) end end @@ -216,6 +214,22 @@ struct DataFrame <: AbstractDataFrame end end +function _preprocess_column(col::Any, len::Integer, copycols::Bool) + if col isa AbstractRange + return collect(col) + elseif col isa AbstractVector + return copycols ? copy(col) : col + elseif col isa Union{AbstractArray{<:Any, 0}, Ref} + x = col[] + return fill!(Tables.allocatecolumn(typeof(x), len), x) + elseif col isa AbstractArray + throw(ArgumentError("adding AbstractArray other than AbstractVector " * + "as a column of a data frame is not allowed")) + else + return fill!(Tables.allocatecolumn(typeof(col), len), col) + end +end + DataFrame(df::DataFrame; copycols::Bool=true) = copy(df, copycols=copycols) function DataFrame(pairs::Pair{Symbol, <:Any}...; makeunique::Bool=false, @@ -502,34 +516,75 @@ end throw(BoundsError(df, (row_inds, col_inds))) end selected_columns = index(df)[col_inds] - # Computing integer indices once for all columns is faster - selected_rows = T === Bool ? findall(row_inds) : row_inds - new_columns = AbstractVector[dv[selected_rows] for dv in _columns(df)[selected_columns]] - return DataFrame(new_columns, Index(_names(df)[selected_columns]), copycols=false) + + u = _names(df)[selected_columns] + lookup = Dict{Symbol, Int}(zip(u, 1:length(u))) + # use this constructor to avoid checking twice if column names are not + # duplicate as index(df)[col_inds] already checks this + idx = Index(lookup, u) + + if length(selected_columns) == 1 + return DataFrame(AbstractVector[_columns(df)[selected_columns[1]][row_inds]], + idx, copycols=false) + else + # Computing integer indices once for all columns is faster + selected_rows = T === Bool ? findall(row_inds) : row_inds + @static if VERSION >= v"1.4" + if length(selected_rows) >= 1_000_000 && Threads.nthreads() > 1 + new_columns = Vector{AbstractVector}(undef, length(selected_columns)) + @sync for i in eachindex(new_columns) + Threads.@spawn new_columns[i] = _columns(df)[selected_columns[i]][selected_rows] + end + return DataFrame(new_columns, idx, copycols=false) + else + return DataFrame(AbstractVector[_columns(df)[i][selected_rows] for i in selected_columns], + idx, copycols=false) + end + else + return DataFrame(AbstractVector[_columns(df)[i][selected_rows] for i in selected_columns], + idx, copycols=false) + end + end end @inline function Base.getindex(df::DataFrame, row_inds::AbstractVector{T}, ::Colon) where T @boundscheck if !checkindex(Bool, axes(df, 1), row_inds) throw(BoundsError(df, (row_inds, :))) end - # Computing integer indices once for all columns is faster - selected_rows = T === Bool ? findall(row_inds) : row_inds - new_columns = AbstractVector[dv[selected_rows] for dv in _columns(df)] - return DataFrame(new_columns, copy(index(df)), copycols=false) + idx = copy(index(df)) + + if ncol(df) == 1 + return DataFrame(AbstractVector[_columns(df)[1][row_inds]], idx, copycols=false) + else + # Computing integer indices once for all columns is faster + selected_rows = T === Bool ? findall(row_inds) : row_inds + @static if VERSION >= v"1.4" + if length(selected_rows) >= 1_000_000 && Threads.nthreads() > 1 + new_columns = Vector{AbstractVector}(undef, ncol(df)) + @sync for i in eachindex(new_columns) + Threads.@spawn new_columns[i] = _columns(df)[i][selected_rows] + end + return DataFrame(new_columns, idx, copycols=false) + else + return DataFrame(AbstractVector[dv[selected_rows] for dv in _columns(df)], + idx, copycols=false) + end + else + return DataFrame(AbstractVector[dv[selected_rows] for dv in _columns(df)], + idx, copycols=false) + end + end end -@inline Base.getindex(df::DataFrame, row_inds::Not, - col_inds::MultiColumnIndex) = +@inline Base.getindex(df::DataFrame, row_inds::Not, col_inds::MultiColumnIndex) = df[axes(df, 1)[row_inds], col_inds] # df[:, MultiColumnIndex] => DataFrame -Base.getindex(df::DataFrame, row_ind::Colon, - col_inds::MultiColumnIndex) = +Base.getindex(df::DataFrame, row_ind::Colon, col_inds::MultiColumnIndex) = select(df, col_inds, copycols=true) # df[!, MultiColumnIndex] => DataFrame -Base.getindex(df::DataFrame, row_ind::typeof(!), - col_inds::MultiColumnIndex) = +Base.getindex(df::DataFrame, row_ind::typeof(!), col_inds::MultiColumnIndex) = select(df, col_inds, copycols=false) ############################################################################## @@ -875,11 +930,7 @@ copies of column vectors in `df`. If `copycols=false`, return a new `DataFrame` sharing column vectors with `df`. """ function Base.copy(df::DataFrame; copycols::Bool=true) - if copycols - df[:, :] - else - DataFrame(_columns(df), _names(df), copycols=false) - end + return DataFrame(copy(_columns(df)), copy(index(df)), copycols=copycols) end """ diff --git a/test/constructors.jl b/test/constructors.jl index ca56b2b3e3..9a4b575eb5 100644 --- a/test/constructors.jl +++ b/test/constructors.jl @@ -365,4 +365,11 @@ end @test_throws ArgumentError DataFrame([Int, Float64], ["a", "b"], 2) end +@testset "threading correctness tests" begin + for x in (10, 2*10^6), y in 1:4 + df = DataFrame(rand(x, y), :auto) + @test df == copy(df) + end +end + end # module diff --git a/test/indexing.jl b/test/indexing.jl index ca798c3edc..1aedf9ea29 100644 --- a/test/indexing.jl +++ b/test/indexing.jl @@ -2009,4 +2009,15 @@ if VERSION >= v"1.5" include("indexing_offset.jl") end +@testset "threading correctness tests" begin + for x in (10, 2*10^6), y in 1:4 + mat = rand(x, y) + df = DataFrame(mat, :auto) + for rowrange in [:, 1:nrow(df)-5, collect(1:nrow(df)-5), axes(df, 1) .< nrow(df)-5], + colrange in [:, axes(df, 2), collect(axes(df, 2)), 1:ncol(df) - 1] + @test DataFrame(mat[rowrange, colrange], :auto) == df[rowrange, colrange] + end + end +end + end # module