diff --git a/NEWS.md b/NEWS.md index ca22b338d4..d2bf472b53 100644 --- a/NEWS.md +++ b/NEWS.md @@ -39,6 +39,9 @@ * Add `resize!`, `keepat!`, `pop!`, `popfirst!`, and `popat!`, make `deleteat!` signature more precise ([#3047](https://github.com/JuliaData/DataFrames.jl/pull/3047)) +* New `threads` argument allows disabling multithreading in + `combine`, `select`, `select!`, `transform`, `transform!`, `subset` and `subset!` + ([#3030](https://github.com/JuliaData/DataFrames.jl/pull/3030)) ## Previously announced breaking changes diff --git a/docs/src/lib/functions.md b/docs/src/lib/functions.md index a8af39a8f3..27e684e681 100644 --- a/docs/src/lib/functions.md +++ b/docs/src/lib/functions.md @@ -4,10 +4,13 @@ CurrentModule = DataFrames # Functions -## Multi-threading support +## Multithreading support -Selected operations in DataFrames.jl automatically use multiple threads when available. -It is task-based and implemented using the `@spawn` macro from Julia Base. +By default, selected operations in DataFrames.jl automatically use multiple threads +when available. It is task-based and implemented using the `@spawn` macro from Julia Base. +Functions that take user-defined functions and may run them in parallel +accept a `threads` keyword argument which allows disabling multithreading +when the provided function requires serial execution or is not thread-safe. 
This is a list of operations that currently make use of multi-threading: - `DataFrame` constructor with `copycols=true`; also recursively all functions diff --git a/docs/src/lib/internals.md b/docs/src/lib/internals.md index c87868c9e2..16b91b7b81 100644 --- a/docs/src/lib/internals.md +++ b/docs/src/lib/internals.md @@ -16,6 +16,8 @@ getmaxwidths ourshow ourstrwidth @spawn_for_chunks +@spawn_or_run_task +@spawn_or_run default_table_transformation isreadonly ``` diff --git a/src/abstractdataframe/reshape.jl b/src/abstractdataframe/reshape.jl index 95ff68e4c9..1b45f3ca81 100644 --- a/src/abstractdataframe/reshape.jl +++ b/src/abstractdataframe/reshape.jl @@ -200,15 +200,15 @@ end unstack(df::AbstractDataFrame, rowkeys, colkey, value; renamecols::Function=identity, allowmissing::Bool=false, allowduplicates::Bool=false, valuestransform=nothing, - fill=missing) + fill=missing, threads::Bool=true) unstack(df::AbstractDataFrame, colkey, value; renamecols::Function=identity, allowmissing::Bool=false, allowduplicates::Bool=false, valuestransform=nothing, - fill=missing) + fill=missing, threads::Bool=true) unstack(df::AbstractDataFrame; renamecols::Function=identity, allowmissing::Bool=false, allowduplicates::Bool=false, valuestransform=nothing, - fill=missing) + fill=missing, threads::Bool=true) Unstack data frame `df`, i.e. convert it from long to wide format. @@ -244,6 +244,10 @@ Row and column keys will be ordered in the order of their first appearance. default is `missing`. If the `value` column is a `CategoricalVector` and `fill` is not `missing` then in order to keep unstacked value columns also `CategoricalVector` the `fill` must be passed as `CategoricalValue` + - `threads`: whether `valuestransform` may be run in separate tasks which + can execute in parallel (possibly being applied to multiple groups at the same time). + Whether or not tasks are actually spawned and their number are determined automatically. 
+ Set to `false` if `valuestransform` requires serial execution or is not thread-safe. # Examples @@ -396,7 +400,8 @@ julia> unstack(df, :cols, :values, valuestransform=sum) function unstack(df::AbstractDataFrame, rowkeys, colkey::ColumnIndex, values::ColumnIndex; renamecols::Function=identity, allowmissing::Bool=false, allowduplicates::Bool=false, - valuestransform=nothing, fill=missing) + valuestransform=nothing, fill=missing, + threads::Bool=true) # first make sure that rowkeys are unique and # normalize all selectors as a strings # if some of the selectors are wrong we will get an early error here @@ -428,7 +433,8 @@ function unstack(df::AbstractDataFrame, rowkeys, colkey::ColumnIndex, # Ref that will get unwrapped by combine agg_fun = Ref∘valuestransform end - df_op = combine(gdf, values => agg_fun => values_out) + df_op = combine(gdf, values => agg_fun => values_out, + threads=threads) group_rows = find_group_row(gdf) if !issorted(group_rows) @@ -452,19 +458,23 @@ end function unstack(df::AbstractDataFrame, colkey::ColumnIndex, values::ColumnIndex; renamecols::Function=identity, allowmissing::Bool=false, allowduplicates::Bool=false, - valuestransform=nothing, fill=missing) + valuestransform=nothing, fill=missing, + threads::Bool=true) colkey_int = index(df)[colkey] value_int = index(df)[values] return unstack(df, Not(colkey_int, value_int), colkey_int, value_int, renamecols=renamecols, allowmissing=allowmissing, - allowduplicates=allowduplicates, valuestransform=valuestransform, fill=fill) + allowduplicates=allowduplicates, valuestransform=valuestransform, + fill=fill, threads=threads) end unstack(df::AbstractDataFrame; renamecols::Function=identity, allowmissing::Bool=false, allowduplicates::Bool=false, - valuestransform=nothing, fill=missing) = + valuestransform=nothing, fill=missing, + threads::Bool=true) = unstack(df, :variable, :value, renamecols=renamecols, allowmissing=allowmissing, - allowduplicates=allowduplicates, valuestransform=valuestransform, 
fill=fill) + allowduplicates=allowduplicates, valuestransform=valuestransform, + fill=fill, threads=threads) # we take into account the fact that idx, starts and ends are computed lazily # so we rather directly reference the gdf.groups diff --git a/src/abstractdataframe/selection.jl b/src/abstractdataframe/selection.jl index 7db9d14634..6bbc126f59 100755 --- a/src/abstractdataframe/selection.jl +++ b/src/abstractdataframe/selection.jl @@ -875,10 +875,14 @@ function select_transform!((nc,)::Ref{Any}, df::AbstractDataFrame, newdf::DataFr end """ - select!(df::AbstractDataFrame, args...; renamecols::Bool=true) - select!(args::Base.Callable, df::DataFrame; renamecols::Bool=true) - select!(gd::GroupedDataFrame, args...; ungroup::Bool=true, renamecols::Bool=true) - select!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) + select!(df::AbstractDataFrame, args...; + renamecols::Bool=true, threads::Bool=true) + select!(args::Base.Callable, df::DataFrame; + renamecols::Bool=true, threads::Bool=true) + select!(gd::GroupedDataFrame, args...; ungroup::Bool=true, + renamecols::Bool=true, threads::Bool=true) + select!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, + renamecols::Bool=true, threads::Bool=true) Mutate `df` or `gd` in place to retain only columns or transformations specified by `args...` and return it. The result is guaranteed to have the same number of rows as `df` or @@ -906,27 +910,40 @@ $TRANSFORMATION_COMMON_RULES column names should include the name of transformation functions or not. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `threads::Bool=true` : whether transformations may be run in separate tasks which + can execute in parallel (possibly being applied to multiple rows or groups at the same time). + Whether or not tasks are actually spawned and their number are determined automatically. 
+ Set to `false` if some transformations require serial execution or are not thread-safe. See [`select`](@ref) for examples. """ -select!(df::DataFrame, @nospecialize(args...); renamecols::Bool=true) = - _replace_columns!(df, select(df, args..., copycols=false, renamecols=renamecols)) - -select!(df::SubDataFrame, @nospecialize(args...); renamecols::Bool=true) = - _replace_columns!(df, select(df, args..., copycols=true, renamecols=renamecols)) - -function select!(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; renamecols::Bool=true) +select!(df::DataFrame, @nospecialize(args...); + renamecols::Bool=true, threads::Bool=true) = + _replace_columns!(df, select(df, args..., copycols=false, + renamecols=renamecols, threads=threads)) + +select!(df::SubDataFrame, @nospecialize(args...); + renamecols::Bool=true, threads::Bool=true) = + _replace_columns!(df, select(df, args..., copycols=true, + renamecols=renamecols, threads=threads)) + +function select!(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; + renamecols::Bool=true, threads::Bool=true) if arg isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a data frame")) end - return select!(df, arg) + return select!(df, arg, threads=threads) end """ - transform!(df::AbstractDataFrame, args...; renamecols::Bool=true) - transform!(args::Callable, df::AbstractDataFrame; renamecols::Bool=true) - transform!(gd::GroupedDataFrame, args...; ungroup::Bool=true, renamecols::Bool=true) - transform!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) + transform!(df::AbstractDataFrame, args...; + renamecols::Bool=true, threads::Bool=true) + transform!(args::Callable, df::AbstractDataFrame; + renamecols::Bool=true, threads::Bool=true) + transform!(gd::GroupedDataFrame, args...; + ungroup::Bool=true, renamecols::Bool=true, threads::Bool=true) + transform!(f::Base.Callable, gd::GroupedDataFrame; + ungroup::Bool=true, renamecols::Bool=true, 
threads::Bool=true) Mutate `df` or `gd` in place to add columns specified by `args...` and return it. The result is guaranteed to have the same number of rows as `df`. @@ -940,26 +957,36 @@ $TRANSFORMATION_COMMON_RULES column names should include the name of transformation functions or not. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `threads::Bool=true` : whether transformations may be run in separate tasks which + can execute in parallel (possibly being applied to multiple rows or groups at the same time). + Whether or not tasks are actually spawned and their number are determined automatically. + Set to `false` if some transformations require serial execution or are not thread-safe. See [`select`](@ref) for examples. """ -transform!(df::AbstractDataFrame, @nospecialize(args...); renamecols::Bool=true) = - select!(df, :, args..., renamecols=renamecols) +transform!(df::AbstractDataFrame, @nospecialize(args...); + renamecols::Bool=true, threads::Bool=true) = + select!(df, :, args..., renamecols=renamecols, threads=threads) -function transform!(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; renamecols::Bool=true) +function transform!(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; + renamecols::Bool=true, threads::Bool=true) if arg isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a data frame")) end - return transform!(df, arg) + return transform!(df, arg, threads=threads) end """ - select(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true) - select(args::Callable, df::DataFrame; renamecols::Bool=true) - select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true, - ungroup::Bool=true, renamecols::Bool=true) - select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + select(df::AbstractDataFrame, 
args...; + copycols::Bool=true, renamecols::Bool=true, threads::Bool=true) + select(args::Callable, df::DataFrame; + renamecols::Bool=true, threads::Bool=true) + select(gd::GroupedDataFrame, args...; + copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, threads::Bool=true) + select(f::Base.Callable, gd::GroupedDataFrame; + copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, threads::Bool=true) Create a new data frame that contains columns from `df` or `gd` specified by `args` and return it. The result is guaranteed to have the same number of rows @@ -977,6 +1004,11 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `threads::Bool=true` : whether transformations may be run in separate tasks which + can execute in parallel (possibly being applied to multiple rows or groups at the same time). + Whether or not tasks are actually spawned and their number are determined automatically. + Set to `false` if some transformations require serial execution or are not thread-safe. 
+ # Examples ```jldoctest @@ -1230,24 +1262,30 @@ julia> select(gd, nrow, proprow, groupindices, eachindex) 8 │ 2 3 0.375 2 3 ``` """ -select(df::AbstractDataFrame, @nospecialize(args...); copycols::Bool=true, renamecols::Bool=true) = +select(df::AbstractDataFrame, @nospecialize(args...); + copycols::Bool=true, renamecols::Bool=true, threads::Bool=true) = manipulate(df, map(x -> broadcast_pair(df, x), args)..., copycols=copycols, keeprows=true, renamecols=renamecols) -function select(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; renamecols::Bool=true) +function select(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; + renamecols::Bool=true, threads::Bool=true) if arg isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a data frame")) end - return select(df, arg) + return select(df, arg, threads=threads) end """ - transform(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true) - transform(f::Callable, df::DataFrame; renamecols::Bool=true) - transform(gd::GroupedDataFrame, args...; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) - transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + transform(df::AbstractDataFrame, args...; + copycols::Bool=true, renamecols::Bool=true, threads::Bool=true) + transform(f::Callable, df::DataFrame; + renamecols::Bool=true, threads::Bool=true) + transform(gd::GroupedDataFrame, args...; + copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, threads::Bool=true) + transform(f::Base.Callable, gd::GroupedDataFrame; + copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, threads::Bool=true) Create a new data frame that contains columns from `df` or `gd` plus columns specified by `args` and return it. 
The result is guaranteed to have the same @@ -1264,6 +1302,11 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `threads::Bool=true` : whether transformations may be run in separate tasks which + can execute in parallel (possibly being applied to multiple rows or groups at the same time). + Whether or not tasks are actually spawned and their number are determined automatically. + Set to `false` if some transformations require serial execution or are not thread-safe. + Note that when the first argument is a `GroupedDataFrame`, `keepkeys=false` is needed to be able to return a different value for the grouping column: @@ -1297,23 +1340,30 @@ ERROR: ArgumentError: column :x in returned data frame is not equal to grouping See [`select`](@ref) for more examples. """ -transform(df::AbstractDataFrame, @nospecialize(args...); copycols::Bool=true, renamecols::Bool=true) = - select(df, :, args..., copycols=copycols, renamecols=renamecols) +transform(df::AbstractDataFrame, @nospecialize(args...); + copycols::Bool=true, renamecols::Bool=true, threads::Bool=true) = + select(df, :, args..., copycols=copycols, + renamecols=renamecols, threads=threads) -function transform(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; renamecols::Bool=true) +function transform(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; + renamecols::Bool=true, threads::Bool=true) if arg isa Colon throw(ArgumentError("First argument to must be a transformation if the second argument is a data frame")) end - return transform(df, arg) + return transform(df, arg, threads=threads) end """ - combine(df::AbstractDataFrame, args...; renamecols::Bool=true) - combine(f::Callable, df::AbstractDataFrame; renamecols::Bool=true) + combine(df::AbstractDataFrame, args...; + renamecols::Bool=true, threads::Bool=true) + combine(f::Callable, df::AbstractDataFrame; + renamecols::Bool=true, 
threads::Bool=true) combine(gd::GroupedDataFrame, args...; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, threads::Bool=true) combine(f::Base.Callable, gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, threads::Bool=true) Create a new data frame that contains columns from `df` or `gd` specified by `args` and return it. The result can have any number of rows that is determined @@ -1328,6 +1378,11 @@ $TRANSFORMATION_COMMON_RULES data frame. - `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data frame or a `GroupedDataFrame`. +- `threads::Bool=true` : whether transformations may be run in separate tasks which + can execute in parallel (possibly being applied to multiple rows or groups at the same time). + Whether or not tasks are actually spawned and their number are determined automatically. + Set to `false` if some transformations require serial execution or are not thread-safe. + # Examples ```jldoctest @@ -1575,18 +1630,21 @@ julia> combine(gd, :, AsTable(Not(:a)) => sum, renamecols=false) 8 │ 4 1 8 9 ``` """ -combine(df::AbstractDataFrame, @nospecialize(args...); renamecols::Bool=true) = +combine(df::AbstractDataFrame, @nospecialize(args...); + renamecols::Bool=true, threads::Bool=true) = manipulate(df, map(x -> broadcast_pair(df, x), args)..., copycols=true, keeprows=false, renamecols=renamecols) -function combine(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; renamecols::Bool=true) +function combine(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; + renamecols::Bool=true, threads::Bool=true) if arg isa Colon throw(ArgumentError("First argument to select! 
must be a transformation if the second argument is a data frame")) end - return combine(df, arg) + return combine(df, arg, threads=threads) end -combine(@nospecialize(f::Pair), gd::AbstractDataFrame; renamecols::Bool=true) = +combine(@nospecialize(f::Pair), gd::AbstractDataFrame; + renamecols::Bool=true, threads::Bool=true) = throw(ArgumentError("First argument must be a transformation if the second argument is a data frame. " * "You can pass a `Pair` as the second argument of the transformation. If you want the return " * "value to be processed as having multiple columns add `=> AsTable` suffix to the pair.")) @@ -1601,7 +1659,7 @@ function manipulate(df::DataFrame, @nospecialize(cs...); copycols::Bool, keeprow end end return _manipulate(df, Any[normalize_selection(index(df), make_pair_concrete(c), renamecols) for c in cs_vec], - copycols, keeprows) + copycols, keeprows) end function _manipulate(df::AbstractDataFrame, normalized_cs::Vector{Any}, copycols::Bool, keeprows::Bool) diff --git a/src/abstractdataframe/subset.jl b/src/abstractdataframe/subset.jl index 83d1dd9fa7..fef62ba50e 100644 --- a/src/abstractdataframe/subset.jl +++ b/src/abstractdataframe/subset.jl @@ -75,7 +75,7 @@ function assert_bool_vec(@nospecialize(fun)) end function _get_subset_conditions(df::Union{AbstractDataFrame, GroupedDataFrame}, - (args,)::Ref{Any}, skipmissing::Bool) + (args,)::Ref{Any}, skipmissing::Bool, threads::Bool) cs_vec = [] for v in map(x -> broadcast_pair(df isa GroupedDataFrame ? 
parent(df) : df, x), args) if v isa AbstractVecOrMat{<:Pair} @@ -106,10 +106,12 @@ function _get_subset_conditions(df::Union{AbstractDataFrame, GroupedDataFrame}, @assert !isempty(conditions) if df isa AbstractDataFrame - df_conditions = select(df, conditions..., copycols=!(df isa DataFrame)) + df_conditions = select(df, conditions..., + copycols=!(df isa DataFrame), threads=threads) else df_conditions = select(df, conditions..., - copycols=!(parent(df) isa DataFrame), keepkeys=false) + copycols=!(parent(df) isa DataFrame), keepkeys=false, + threads=threads) end @assert ncol(df_conditions) == length(conditions) @@ -151,9 +153,11 @@ function _get_subset_conditions(df::Union{AbstractDataFrame, GroupedDataFrame}, end """ - subset(df::AbstractDataFrame, args...; skipmissing::Bool=false, view::Bool=false) - subset(gdf::GroupedDataFrame, args...; skipmissing::Bool=false, view::Bool=false, - ungroup::Bool=true) + subset(df::AbstractDataFrame, args...; + skipmissing::Bool=false, view::Bool=false, threads::Bool=true) + subset(gdf::GroupedDataFrame, args...; + skipmissing::Bool=false, view::Bool=false, + ungroup::Bool=true, threads::Bool=true) Return a copy of data frame `df` or parent of `gdf` containing only rows for which all values produced by transformation(s) `args` for a given row are @@ -180,6 +184,11 @@ If `view=true` a `SubDataFrame` view is returned instead of a `DataFrame`. If `ungroup=false` the resulting data frame is re-grouped based on the same grouping columns as `gdf` and a `GroupedDataFrame` is returned. +If `threads=true` (the default) transformations may be run in separate tasks which +can execute in parallel (possibly being applied to multiple rows or groups at the same time). +Whether or not tasks are actually spawned and their number are determined automatically. +Set to `false` if some transformations require serial execution or are not thread-safe. 
+ If a `GroupedDataFrame` is passed then it must include all groups present in the `parent` data frame, like in [`select!`](@ref). @@ -260,18 +269,19 @@ julia> subset(groupby(df, :y), :v => x -> minimum(x) > 5) ``` """ function subset(df::AbstractDataFrame, @nospecialize(args...); - skipmissing::Bool=false, view::Bool=false) + skipmissing::Bool=false, view::Bool=false, threads::Bool=true) if isempty(args) row_selector = axes(df, 1) else - row_selector = _get_subset_conditions(df, Ref{Any}(args), skipmissing) + row_selector = _get_subset_conditions(df, Ref{Any}(args), + skipmissing, threads) end return view ? Base.view(df, row_selector, :) : df[row_selector, :] end function subset(gdf::GroupedDataFrame, @nospecialize(args...); skipmissing::Bool=false, view::Bool=false, - ungroup::Bool=true) + ungroup::Bool=true, threads::Bool=true) df = parent(gdf) if isempty(args) if nrow(parent(gdf)) > 0 && minimum(gdf.groups) == 0 @@ -281,7 +291,8 @@ function subset(gdf::GroupedDataFrame, @nospecialize(args...); end row_selector = axes(df, 1) else - row_selector = _get_subset_conditions(gdf, Ref{Any}(args), skipmissing) + row_selector = _get_subset_conditions(gdf, Ref{Any}(args), + skipmissing, threads) end res = view ? Base.view(df, row_selector, :) : df[row_selector, :] # TODO: in some cases it might be faster to groupby gdf.groups[row_selector] @@ -289,9 +300,10 @@ function subset(gdf::GroupedDataFrame, @nospecialize(args...); end """ - subset!(df::AbstractDataFrame, args...; skipmissing::Bool=false) - subset!(gdf::GroupedDataFrame{DataFrame}, args..., skipmissing::Bool=false, - ungroup::Bool=true) + subset!(df::AbstractDataFrame, args...; + skipmissing::Bool=false, threads::Bool=true) + subset!(gdf::GroupedDataFrame{DataFrame}, args...; + skipmissing::Bool=false, ungroup::Bool=true, threads::Bool=true) Update data frame `df` or the parent of `gdf` in place to contain only rows for which all values produced by transformation(s) `args` for a given row is `true`. 
@@ -316,6 +328,11 @@ described for [`select`](@ref) with the restriction that: If `ungroup=false` the passed `GroupedDataFrame` `gdf` is updated (preserving the order of its groups) and returned. +If `threads=true` (the default) transformations may be run in separate tasks which +can execute in parallel (possibly being applied to multiple rows or groups at the same time). +Whether or not tasks are actually spawned and their number are determined automatically. +Set to `false` if some transformations require serial execution or are not thread-safe. + If `GroupedDataFrame` is subsetted then it must include all groups present in the `parent` data frame, like in [`select!`](@ref). In this case the passed `GroupedDataFrame` is updated to have correct groups after its parent is @@ -416,14 +433,15 @@ julia> df 2 │ 4 false false missing 12 ``` """ -function subset!(df::AbstractDataFrame, @nospecialize(args...); skipmissing::Bool=false) +function subset!(df::AbstractDataFrame, @nospecialize(args...); + skipmissing::Bool=false, threads::Bool=true) isempty(args) && return df - row_selector = _get_subset_conditions(df, Ref{Any}(args), skipmissing) + row_selector = _get_subset_conditions(df, Ref{Any}(args), skipmissing, threads) return deleteat!(df, findall(!, row_selector)) end function subset!(gdf::GroupedDataFrame, @nospecialize(args...); skipmissing::Bool=false, - ungroup::Bool=true) + ungroup::Bool=true, threads::Bool=true) df = parent(gdf) if isempty(args) if nrow(parent(gdf)) > 0 && minimum(gdf.groups) == 0 @@ -436,7 +454,7 @@ function subset!(gdf::GroupedDataFrame, @nospecialize(args...); skipmissing::Boo ngroups = length(gdf) groups = gdf.groups lazy_lock = gdf.lazy_lock - row_selector = _get_subset_conditions(gdf, Ref{Any}(args), skipmissing) + row_selector = _get_subset_conditions(gdf, Ref{Any}(args), skipmissing, threads) res = deleteat!(df, findall(!, row_selector)) if nrow(res) == length(groups) # we have not removed any rows return ungroup ? 
res : gdf diff --git a/src/groupeddataframe/complextransforms.jl b/src/groupeddataframe/complextransforms.jl index e7b51b0e7e..df4f519523 100644 --- a/src/groupeddataframe/complextransforms.jl +++ b/src/groupeddataframe/complextransforms.jl @@ -5,7 +5,7 @@ _ncol(df::AbstractDataFrame) = ncol(df) _ncol(x::Union{NamedTuple, DataFrameRow}) = length(x) function _combine_multicol((firstres,)::Ref{Any}, wfun::Ref{Any}, gd::GroupedDataFrame, - wincols::Ref{Any}) + wincols::Ref{Any}, threads::Bool) @assert only(wfun) isa Base.Callable @assert only(wincols) isa Union{Nothing, AbstractVector, Tuple, NamedTuple} firstmulticol = firstres isa MULTI_COLS_TYPE @@ -17,13 +17,14 @@ function _combine_multicol((firstres,)::Ref{Any}, wfun::Ref{Any}, gd::GroupedDat idx_agg = NOTHING_IDX_AGG end return _combine_with_first(Ref{Any}(wrap(firstres)), wfun, gd, wincols, - firstmulticol, idx_agg) + firstmulticol, idx_agg, threads) end function _combine_with_first((first,)::Ref{Any}, (f,)::Ref{Any}, gd::GroupedDataFrame, (incols,)::Ref{Any}, - firstmulticol::Bool, idx_agg::Vector{Int}) + firstmulticol::Bool, idx_agg::Vector{Int}, + threads::Bool) @assert first isa Union{NamedTuple, DataFrameRow, AbstractDataFrame} @assert f isa Base.Callable @assert incols isa Union{Nothing, AbstractVector, Tuple, NamedTuple} @@ -76,7 +77,8 @@ function _combine_with_first((first,)::Ref{Any}, gd, Ref{Any}(incols), Ref{Any}(targetcolnames), - firstmulticol) + firstmulticol, + threads) end return idx, outcols, collect(Symbol, finalcolnames) end @@ -238,7 +240,8 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any}, gd::GroupedDataFrame, (incols,)::Ref{Any}, (colnames,)::Ref{Any}, - firstmulticol::Bool) + firstmulticol::Bool, + threads::Bool) @assert firstrow isa Union{NamedTuple, DataFrameRow} @assert outcols isa NTuple{N, AbstractVector} where N @assert f isa Base.Callable @@ -261,7 +264,7 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any}, # Create up to one task per thread # This has lower 
overhead than creating one task per group, # but is optimal only if operations take roughly the same time for all groups - if VERSION >= v"1.4" && isthreadsafe(outcols, incols) + if VERSION >= v"1.4" && threads && isthreadsafe(outcols, incols) basesize = max(1, cld(len - 1, Threads.nthreads())) partitions = Iterators.partition(2:len, basesize) else @@ -273,11 +276,11 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any}, tasks = Vector{Task}(undef, length(partitions)) for (tid, idx) in enumerate(partitions) tasks[tid] = - @spawn _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx), - outcols, outcolsref, - type_widened, widen_type_lock, - f, gd, starts, ends, incols, colnames, - firstcoltype(firstmulticol)) + @spawn_or_run_task threads _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx), + outcols, outcolsref, + type_widened, widen_type_lock, + f, gd, starts, ends, incols, colnames, + firstcoltype(firstmulticol)) end # Workaround JuliaLang/julia#38931: diff --git a/src/groupeddataframe/splitapplycombine.jl b/src/groupeddataframe/splitapplycombine.jl index e2c478c6ea..5e69ad6a5c 100644 --- a/src/groupeddataframe/splitapplycombine.jl +++ b/src/groupeddataframe/splitapplycombine.jl @@ -25,7 +25,8 @@ end function _combine_prepare(gd::GroupedDataFrame, (cs,)::Ref{Any}; keepkeys::Bool, ungroup::Bool, copycols::Bool, - keeprows::Bool, renamecols::Bool) + keeprows::Bool, renamecols::Bool, + threads::Bool) for cei in cs if !(cei isa AbstractMatrix && isempty(cei)) @assert cei isa Union{Pair, Base.Callable, ColumnIndex, @@ -48,13 +49,14 @@ function _combine_prepare(gd::GroupedDataFrame, end end return _combine_prepare_norm(gd, cs_vec, keepkeys, ungroup, copycols, - keeprows, renamecols) + keeprows, renamecols, threads) end function _combine_prepare_norm(gd::GroupedDataFrame, cs_vec::Vector{Any}, keepkeys::Bool, ungroup::Bool, copycols::Bool, - keeprows::Bool, renamecols::Bool) + keeprows::Bool, renamecols::Bool, + threads::Bool) if 
any(x -> x isa Pair && first(x) isa Tuple, cs_vec) x = cs_vec[findfirst(x -> first(x) isa Tuple, cs_vec)] # an explicit error is thrown as this was allowed in the past @@ -81,7 +83,8 @@ function _combine_prepare_norm(gd::GroupedDataFrame, # if optional_transform[i] is true then the transformation will be skipped # if earlier column with a column with the same name was created - idx, valscat = _combine(gd, cs_norm, optional_transform, copycols, keeprows, renamecols) + idx, valscat = _combine(gd, cs_norm, optional_transform, copycols, keeprows, + renamecols, threads) !keepkeys && ungroup && return valscat @@ -357,11 +360,13 @@ function _combine_process_callable(wcs_i::Ref{Any}, gd::GroupedDataFrame, seen_cols::Dict{Symbol, Tuple{Bool, Int}}, trans_res::Vector{TransformationResult}, - idx_agg::Ref{Vector{Int}}) + idx_agg::Ref{Vector{Int}}, + threads::Bool) cs_i = only(wcs_i) @assert cs_i isa Base.Callable firstres = length(gd) > 0 ? cs_i(gd[1]) : cs_i(similar(parentdf, 0)) - idx, outcols, nms = _combine_multicol(Ref{Any}(firstres), wcs_i, gd, Ref{Any}(nothing)) + idx, outcols, nms = _combine_multicol(Ref{Any}(firstres), wcs_i, gd, + Ref{Any}(nothing), threads) if !(firstres isa Union{AbstractVecOrMat, AbstractDataFrame, NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}) @@ -410,7 +415,8 @@ function _combine_process_pair_symbol(optional_i::Bool, firstmulticol::Bool, (firstres,)::Ref{Any}, wfun::Ref{Any}, - wincols::Ref{Any}) + wincols::Ref{Any}, + threads::Bool) @assert only(wfun) isa Base.Callable @assert only(wincols) isa Union{Tuple, NamedTuple} @@ -434,7 +440,8 @@ function _combine_process_pair_symbol(optional_i::Bool, # NOTHING_IDX_AGG to signal that idx has to be computed in _combine_with_first idx, outcols, _ = _combine_with_first(Ref{Any}(wrap(firstres)), wfun, gd, wincols, firstmulticol, - firstres isa AbstractVector ? NOTHING_IDX_AGG : idx_agg[]) + firstres isa AbstractVector ? 
NOTHING_IDX_AGG : idx_agg[], + threads) @assert length(outcols) == 1 outcol = outcols[1] @@ -466,13 +473,14 @@ function _combine_process_pair_astable(optional_i::Bool, firstmulticol::Bool, (firstres,)::Ref{Any}, wfun::Ref{Any}, - wincols::Ref{Any}) + wincols::Ref{Any}, + threads::Bool) fun = only(wfun) @assert fun isa Base.Callable @assert only(wincols) isa Union{Tuple, NamedTuple} if firstres isa AbstractVector idx, outcol_vec, _ = _combine_with_first(Ref{Any}(wrap(firstres)), wfun, gd, wincols, - firstmulticol, NOTHING_IDX_AGG) + firstmulticol, NOTHING_IDX_AGG, threads) @assert length(outcol_vec) == 1 res = outcol_vec[1] @assert length(res) > 0 @@ -494,7 +502,8 @@ function _combine_process_pair_astable(optional_i::Bool, oldfun = fun fun = (x...) -> Tables.columntable(oldfun(x...)) end - idx, outcols, nms = _combine_multicol(Ref{Any}(firstres), Ref{Any}(fun), gd, wincols) + idx, outcols, nms = _combine_multicol(Ref{Any}(firstres), Ref{Any}(fun), gd, + wincols, threads) if !(firstres isa Union{AbstractVecOrMat, AbstractDataFrame, NamedTuple{<:Any, <:Tuple{Vararg{AbstractVector}}}}) @@ -562,7 +571,8 @@ function _combine_process_pair((cs_i,)::Ref{Any}, gd::GroupedDataFrame, seen_cols::Dict{Symbol, Tuple{Bool, Int}}, trans_res::Vector{TransformationResult}, - idx_agg::Ref{Vector{Int}}) + idx_agg::Ref{Vector{Int}}, + threads::Bool) @assert cs_i isa Pair source_cols, (fun, out_col_name) = cs_i @@ -586,12 +596,12 @@ function _combine_process_pair((cs_i,)::Ref{Any}, if out_col_name isa Symbol return _combine_process_pair_symbol(optional_i, gd, seen_cols, trans_res, idx_agg, out_col_name, firstmulticol, Ref{Any}(firstres), - Ref{Any}(fun), Ref{Any}(incols)) + Ref{Any}(fun), Ref{Any}(incols), threads) end if out_col_name == AsTable || out_col_name isa AbstractVector{Symbol} return _combine_process_pair_astable(optional_i, gd, seen_cols, trans_res, idx_agg, out_col_name, firstmulticol, Ref{Any}(firstres), - Ref{Any}(fun), Ref{Any}(incols)) + Ref{Any}(fun), Ref{Any}(incols), 
threads) end throw(ArgumentError("unsupported target column name specifier $out_col_name")) end @@ -615,7 +625,8 @@ end function _combine(gd::GroupedDataFrame, cs_norm::Vector{Any}, optional_transform::Vector{Bool}, - copycols::Bool, keeprows::Bool, renamecols::Bool) + copycols::Bool, keeprows::Bool, renamecols::Bool, + threads::Bool) if isempty(cs_norm) if keeprows && nrow(parent(gd)) > 0 && minimum(gd.groups) == 0 throw(ArgumentError("select and transform do not support " * @@ -666,8 +677,7 @@ function _combine(gd::GroupedDataFrame, for i in eachindex(cs_norm, optional_transform, tasks) cs_i = cs_norm[i] optional_i = optional_transform[i] - - tasks[i] = @spawn if length(gd) > 0 && isagg(cs_i, gd) + tasks[i] = @spawn_or_run_task threads if length(gd) > 0 && isagg(cs_i, gd) _combine_process_agg(Ref{Any}(cs_i), optional_i, parentdf, gd, seen_cols, trans_res, idx_agg[]) elseif keeprows && cs_i isa Pair && first(last(cs_i)) === identity && @@ -678,7 +688,7 @@ function _combine(gd::GroupedDataFrame, seen_cols, trans_res, idx_keeprows, copycols) elseif cs_i isa Base.Callable _combine_process_callable(Ref{Any}(cs_i), optional_i, parentdf, gd, - seen_cols, trans_res, idx_agg) + seen_cols, trans_res, idx_agg, threads) else @assert cs_i isa Pair if first(cs_i) isa Vector{Int} && isempty(first(cs_i)) && @@ -691,7 +701,7 @@ function _combine(gd::GroupedDataFrame, seen_cols, trans_res, idx_agg[]) else _combine_process_pair(Ref{Any}(cs_i), optional_i, parentdf, gd, - seen_cols, trans_res, idx_agg) + seen_cols, trans_res, idx_agg, threads) end end end @@ -761,8 +771,8 @@ function _combine(gd::GroupedDataFrame, @sync for i in eachindex(trans_res) let i=i - @spawn reorder_cols!(trans_res, i, trans_res[i].col, trans_res[i].col_idx, - keeprows, idx_keeprows, gd) + @spawn_or_run threads reorder_cols!(trans_res, i, trans_res[i].col, trans_res[i].col_idx, + keeprows, idx_keeprows, gd) end end @@ -794,15 +804,18 @@ function reorder_cols!(trans_res::Vector{TransformationResult}, 
i::Integer, end function combine(@nospecialize(f::Base.Callable), gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + threads::Bool=true) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame")) end - return combine(gd, f, keepkeys=keepkeys, ungroup=ungroup, renamecols=renamecols) + return combine(gd, f, keepkeys=keepkeys, ungroup=ungroup, renamecols=renamecols, + threads=threads) end combine(@nospecialize(f::Pair), gd::GroupedDataFrame; - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) = + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + threads::Bool=true) = throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame. " * "You can pass a `Pair` as the second argument of the transformation. If you want the return " * "value to be processed as having multiple columns add `=> AsTable` suffix to the pair.")) @@ -810,68 +823,82 @@ combine(@nospecialize(f::Pair), gd::GroupedDataFrame; combine(gd::GroupedDataFrame, @nospecialize(args::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex, AbstractVecOrMat}...); - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) = + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + threads::Bool=true) = _combine_prepare(gd, Ref{Any}(map(x -> broadcast_pair(parent(gd), x), args)), keepkeys=keepkeys, ungroup=ungroup, - copycols=true, keeprows=false, renamecols=renamecols) + copycols=true, keeprows=false, renamecols=renamecols, + threads=threads) function select(@nospecialize(f::Base.Callable), gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + threads::Bool=true) if f isa Colon throw(ArgumentError("First argument must be a transformation 
if the second argument is a grouped data frame")) end - return select(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup) + return select(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup, + threads=threads) end select(gd::GroupedDataFrame, @nospecialize(args::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex, AbstractVecOrMat}...); - copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) = + copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + threads::Bool=true) = _combine_prepare(gd, Ref{Any}(map(x -> broadcast_pair(parent(gd), x), args)), copycols=copycols, keepkeys=keepkeys, - ungroup=ungroup, keeprows=true, renamecols=renamecols) + ungroup=ungroup, keeprows=true, renamecols=renamecols, + threads=threads) function transform(@nospecialize(f::Base.Callable), gd::GroupedDataFrame; copycols::Bool=true, - keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true, + threads::Bool=true) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return transform(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup) + return transform(gd, f, copycols=copycols, keepkeys=keepkeys, ungroup=ungroup, + threads=threads) end function transform(gd::GroupedDataFrame, @nospecialize(args::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex, AbstractVecOrMat}...); - copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) + copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true, + renamecols::Bool=true, + threads::Bool=true) res = select(gd, :, args..., copycols=copycols, keepkeys=keepkeys, - ungroup=ungroup, renamecols=renamecols) + ungroup=ungroup, renamecols=renamecols, threads=threads) # res can be a GroupedDataFrame based on DataFrame or a DataFrame, # so parent always gives a data frame - 
select!(parent(res), propertynames(parent(gd)), :) + select!(parent(res), propertynames(parent(gd)), :, threads=threads) return res end -function select!(@nospecialize(f::Base.Callable), gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) +function select!(@nospecialize(f::Base.Callable), gd::GroupedDataFrame; + ungroup::Bool=true, renamecols::Bool=true, threads::Bool=true) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end - return select!(gd, f, ungroup=ungroup) + return select!(gd, f, ungroup=ungroup, threads=threads) end function select!(gd::GroupedDataFrame, @nospecialize(args::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex, AbstractVecOrMat}...); - ungroup::Bool=true, renamecols::Bool=true) + ungroup::Bool=true, renamecols::Bool=true, threads::Bool=true) df = parent(gd) if df isa DataFrame - newdf = select(gd, args..., copycols=false, renamecols=renamecols) + newdf = select(gd, args..., copycols=false, renamecols=renamecols, + threads=threads) else @assert df isa SubDataFrame - newdf = select(gd, args..., copycols=true, renamecols=renamecols) + newdf = select(gd, args..., copycols=true, renamecols=renamecols, + threads=threads) end _replace_columns!(df, newdf) return ungroup ? 
df : gd end -function transform!(@nospecialize(f::Base.Callable), gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true) +function transform!(@nospecialize(f::Base.Callable), gd::GroupedDataFrame; + ungroup::Bool=true, renamecols::Bool=true, threads::Bool=true) if f isa Colon throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame")) end @@ -881,15 +908,17 @@ end function transform!(gd::GroupedDataFrame, @nospecialize(args::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex, AbstractVecOrMat}...); - ungroup::Bool=true, renamecols::Bool=true) + ungroup::Bool=true, renamecols::Bool=true, threads::Bool=true) df = parent(gd) if df isa DataFrame - newdf = select(gd, :, args..., copycols=false, renamecols=renamecols) + newdf = select(gd, :, args..., copycols=false, renamecols=renamecols, + threads=threads) else @assert df isa SubDataFrame - newdf = select(gd, :, args..., copycols=true, renamecols=renamecols) + newdf = select(gd, :, args..., copycols=true, renamecols=renamecols, + threads=threads) end - select!(newdf, propertynames(df), :) + select!(newdf, propertynames(df), :, threads=threads) _replace_columns!(df, newdf) return ungroup ? df : gd end diff --git a/src/other/utils.jl b/src/other/utils.jl index 65fa4ae793..c836e97e31 100644 --- a/src/other/utils.jl +++ b/src/other/utils.jl @@ -217,7 +217,7 @@ Parallelize a `for` loop by spawning separate tasks iterating each over a chunk of at least `basesize` elements in `range`. -A number of task higher than `Threads.nthreads()` may be spawned, +A number of tasks higher than `Threads.nthreads()` may be spawned, since that can allow for a more efficient load balancing in case some threads are busy (nested parallelism). 
""" @@ -231,6 +231,100 @@ macro spawn_for_chunks(basesize, ex) return _spawn_for_chunks_helper(ex.args[1], ex.args[2], basesize) end +""" + @spawn_or_run_task threads expr + +Equivalent to `Threads.@spawn` if `threads === true`, +otherwise run `expr` and return a `Task` that returns its value. +""" +macro spawn_or_run_task end + +""" + @spawn_or_run threads expr + +Equivalent to `Threads.@spawn` if `threads === true`, +otherwise run `expr`. +""" +macro spawn_or_run end + +if VERSION >= v"1.4" + macro spawn_or_run_task(threads, expr) + letargs = Base._lift_one_interp!(expr) + + thunk = esc(:(()->($expr))) + var = esc(Base.sync_varname) + quote + let $(letargs...) + if $(esc(threads)) + local task = Task($thunk) + task.sticky = false + else + # Run expr immediately + res = $thunk() + # Return a Task that returns the value of expr + local task = Task(() -> res) + task.sticky = true + end + if $(Expr(:islocal, var)) + @static if VERSION >= v"1.5.0" + put!($var, task) + else + push!($var, task) + end + end + schedule(task) + task + end + end + end + + macro spawn_or_run(threads, expr) + letargs = Base._lift_one_interp!(expr) + + thunk = esc(:(()->($expr))) + var = esc(Base.sync_varname) + quote + let $(letargs...) 
+ if $(esc(threads)) + local task = Task($thunk) + task.sticky = false + if $(Expr(:islocal, var)) + @static if VERSION >= v"1.5.0" + put!($var, task) + else + push!($var, task) + end + end + schedule(task) + else + $thunk() + end + nothing + end + end + end +else + # Based on the definition of @async in Base + macro spawn_or_run_task(threads, expr) + thunk = esc(:(()->($expr))) + var = esc(Base.sync_varname) + quote + # Run expr immediately + res = $thunk() + # Return a Task that returns the value of expr + local task = Task(() -> res) + if $(Expr(:isdefined, var)) + push!($var, task) + end + schedule(task) + end + end + + macro spawn_or_run(threads, expr) + esc(:($expr; nothing)) + end +end + function _nt_like_hash(v, h::UInt) length(v) == 0 && return hash(NamedTuple(), h) diff --git a/test/multithreading.jl b/test/multithreading.jl new file mode 100644 index 0000000000..c0b8079263 --- /dev/null +++ b/test/multithreading.jl @@ -0,0 +1,274 @@ +module TestMultithreading + +using Test, DataFrames + + +@testset "pre-Julia 1.3 @spawn replacement" begin + t = @sync DataFrames.@spawn begin + sleep(1) + true + end + @test fetch(t) === true +end + +@testset "split_indices" begin + for len in 1:100, basesize in 1:10 + x = DataFrames.split_indices(len, basesize) + + @test length(x) == max(1, div(len, basesize)) + @test reduce(vcat, x) == 1:len + vmin, vmax = extrema(length(v) for v in x) + @test vmin + 1 == vmax || vmin == vmax + @test len < basesize || vmin >= basesize + end + + @test_throws AssertionError DataFrames.split_indices(0, 10) + @test_throws AssertionError DataFrames.split_indices(10, 0) + + # Check overflow on 32-bit + len = typemax(Int32) + basesize = 100_000_000 + x = collect(DataFrames.split_indices(len, basesize)) + @test length(x) == div(len, basesize) + @test x[1][1] === 1 + @test x[end][end] === Int(len) + vmin, vmax = extrema(length(v) for v in x) + @test vmin + 1 == vmax || vmin == vmax + @test len < basesize || vmin >= basesize +end + +@testset 
"split_to_chunks" begin + for lg in 1:100, nt in 1:11 + if lg < nt + @test_throws AssertionError DataFrames.split_to_chunks(lg, nt) + continue + end + x = collect(DataFrames.split_to_chunks(lg, nt)) + @test reduce(vcat, x) == 1:lg + @test sum(length, x) == lg + @test first(x[1]) == 1 + @test last(x[end]) == lg + @test length(x) == nt + for i in 1:nt-1 + @test first(x[i+1])-last(x[i]) == 1 + end + end + + @test_throws AssertionError DataFrames.split_to_chunks(0, 10) + @test_throws AssertionError DataFrames.split_to_chunks(10, 0) + @test_throws AssertionError DataFrames.split_to_chunks(10, 11) +end + +@testset "@spawn_or_run_task and @spawn_or_run" begin + for threads in (true, false) + t = DataFrames.@spawn_or_run_task threads 1 + @test fetch(t) === 1 + + x = Ref(false) + @sync begin + t = DataFrames.@spawn_or_run_task threads begin + sleep(0.1) + x[] = true + end + end + @test x[] + + x = Ref(false) + @sync begin + res = DataFrames.@spawn_or_run threads begin + sleep(0.1) + x[] = true + end + @test res === nothing + end + @test x[] + end +end + +@testset "disabling multithreading via keyword argument" begin + refdf = DataFrame(x=1:1000, y=rand(1:4, 1000)) + + # On DataFrame + df = copy(refdf) + n = Ref(0) + @test combine(df, [] => (() -> n[] += 1) => :n1, + [] => (() -> n[] += 1) => :n2, + threads=false) == + DataFrame(n1=1, n2=2) + n = Ref(0) + @test combine(df, [] => ByRow(() -> n[] += 1) => :n1, + [] => ByRow(() -> n[] += 1) => :n2, + threads=false) == + DataFrame(n1=1:1000, n2=1001:2000) + + df = copy(refdf) + m = Ref(0) + n = Ref(0) + @test select(df, [] => (() -> m[] += 1) => :n1, + [] => (() -> m[] += 1) => :n2, + threads=false) == + select!(df, [] => (() -> n[] += 1) => :n1, + [] => (() -> n[] += 1) => :n2, + threads=false) == + DataFrame(n1=fill(1, 1000), n2=fill(2, 1000)) + df = copy(refdf) + m = Ref(0) + n = Ref(0) + @test select(df, [] => ByRow(() -> m[] += 1) => :n1, + [] => ByRow(() -> m[] += 1) => :n2, + threads=false) == + select!(df, [] => ByRow(() 
-> n[] += 1) => :n1, + [] => ByRow(() -> n[] += 1) => :n2, + threads=false) == + DataFrame(n1=1:1000, n2=1001:2000) + + df = copy(refdf) + m = Ref(0) + n = Ref(0) + @test transform(df, [] => (() -> m[] += 1) => :n1, + [] => (() -> m[] += 1) => :n2, + threads=false) == + transform!(df, [] => (() -> n[] += 1) => :n1, + [] => (() -> n[] += 1) => :n2, + threads=false) == + [refdf DataFrame(n1=fill(1, 1000), n2=fill(2, 1000))] + df = copy(refdf) + m = Ref(0) + n = Ref(0) + @test transform(df, [] => ByRow(() -> m[] += 1) => :n1, + [] => ByRow(() -> m[] += 1) => :n2, + threads=false) == + transform(df, [] => ByRow(() -> n[] += 1) => :n1, + [] => ByRow(() -> n[] += 1) => :n2, + threads=false) == + [refdf DataFrame(n1=1:1000, n2=1001:2000)] + + df = copy(refdf) + m = Ref(0) + n = Ref(0) + @test df[1:100,:] == + subset(df, [] => ByRow(() -> (m[] += 1; m[] <= 100)), + [] => ByRow(() -> (m[] += 1; m[] <= 1100)), + threads=false) == + subset(df, [] => ByRow(() -> (n[] += 1; n[] <= 100)), + [] => ByRow(() -> (n[] += 1; n[] <= 1100)), + threads=false) + + # On GroupedDataFrame + df = copy(refdf) + gd = groupby(df, :y) + n = Ref(0) + @test combine(gd, [] => (() -> n[] += 1) => :n1, + [] => (() -> n[] += 1) => :n2, + threads=false) == + DataFrame(y=1:4, n1=1:4, n2=5:8) + if VERSION >= v"1.4" && Threads.nthreads() > 1 + @test combine(gd, [] => (() -> Threads.threadid()) => :id1, + [] => (() -> Threads.threadid()) => :id2, + threads=true) != + DataFrame(y=1:4, id1=1, id2=1) + end + + df = copy(refdf) + gd = groupby(df, :y) + m = Ref(0) + n = Ref(0) + @test select(gd, [] => (() -> m[] += 1) => :n1, + [] => (() -> m[] += 1) => :n2, + threads=false) == + select!(gd, [] => (() -> n[] += 1) => :n1, + [] => (() -> n[] += 1) => :n2, + threads=false) == + select(leftjoin(refdf, DataFrame(y=1:4, n1=1:4, n2=5:8), on=:y), :y, :n1, :n2) + if VERSION >= v"1.4" && Threads.nthreads() > 1 + df = copy(refdf) + gd = groupby(df, :y) + @test select(gd, [] => (() -> Threads.threadid()) => :id1, + [] => 
(() -> Threads.threadid()) => :id2, + threads=true) != + DataFrame(y=refdf.y, id1=1, id2=1) + @test select!(gd, [] => (() -> Threads.threadid()) => :id1, + [] => (() -> Threads.threadid()) => :id2, + threads=true) != + DataFrame(y=refdf.y, id1=1, id2=1) + end + + df = copy(refdf) + gd = groupby(df, :y) + m = Ref(0) + n = Ref(0) + @test transform(gd, [] => (() -> m[] += 1) => :n1, + [] => (() -> m[] += 1) => :n2, + threads=false) == + transform!(gd, [] => (() -> n[] += 1) => :n1, + [] => (() -> n[] += 1) => :n2, + threads=false) == + leftjoin(refdf, DataFrame(y=1:4, n1=1:4, n2=5:8), on=:y) + if VERSION >= v"1.4" && Threads.nthreads() > 1 + df = copy(refdf) + gd = groupby(df, :y) + @test transform(gd, [] => (() -> Threads.threadid()) => :id1, + [] => (() -> Threads.threadid()) => :id2, + threads=true) != + [refdf DataFrame(id1=fill(1, nrow(refdf)), id2=1)] + @test transform!(gd, [] => (() -> Threads.threadid()) => :id1, + [] => (() -> Threads.threadid()) => :id2, + threads=true) != + [refdf DataFrame(id1=fill(1, nrow(refdf)), id2=1)] + end + + df = copy(refdf) + gd = groupby(df, :y) + m = Ref(0) + n = Ref(0) + @test df[in.(df.y, Ref((keys(gd)[1].y, keys(gd)[2].y))),:] == + subset(gd, [:y] => (y -> (m[] += 1; fill(m[] <= 2, length(y)))), + [:y] => (y -> (m[] += 1; fill(m[] <= 6, length(y)))), + threads=false) == + subset!(gd, [:y] => (y -> (n[] += 1; fill(n[] <= 2, length(y)))), + [:y] => (y -> (n[] += 1; fill(n[] <= 6, length(y)))), + threads=false) + + # unstack + df = DataFrame(id=[1, 1, 1, 2, 2, 3, 3], + variable=[:a, :a, :b, :a, :b, :a, :b], + value=1:7) + l = Ref(0) + m = Ref(0) + n = Ref(0) + unstack(df, + allowduplicates=true, valuestransform=x -> (l[] += 1), + threads=false) == + DataFrame(id=1:3, a=[1, 3, 5], b=[2, 4, 6]) == + unstack(df, :variable, :value, + allowduplicates=true, valuestransform=x -> (m[] += 1), + threads=false) == + DataFrame(id=1:3, a=[1, 3, 5], b=[2, 4, 6]) == + unstack(df, :id, :variable, :value, + allowduplicates=true, 
valuestransform=x -> (n[] += 1), + threads=false) == + DataFrame(id=1:3, a=[1, 3, 5], b=[2, 4, 6]) + + # describe + df = DataFrame(x=1:10, y=2:11) + n = Ref(0) + @test describe(df, cols=All() .=> (x -> (n[] += 1))) == + describe(DataFrame(x_function=1, y_function=2)) + n = Ref(0) + @test describe(df, cols=All() .=> ByRow(x -> (n[] += 1))) == + describe(DataFrame(x_function=1:10, y_function=11:20)) + + # nonunique + df = DataFrame(x=1:10, y=2:11) + n = Ref(0) + @test nonunique(df, [:x => (x -> (@assert(n[] == 0); n[] += 1)), + :y => (y -> (@assert(n[] == 1); n[] += 1))]) == + [false, true, true, true, true, true, true, true, true, true] + n = Ref(0) + @test nonunique(df, [:x => ByRow(x -> (n[] == 2 ? n[] = 1 : n[] += 1)), + :y => ByRow(x -> (n[] == 4 ? n[] = 1 : n[] += 1))]) == + [false, false, false, false, true, true, true, true, true, true] + +end + +end # module \ No newline at end of file diff --git a/test/runtests.jl b/test/runtests.jl index 4ade9e1b6f..08a5fa1fdb 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -41,6 +41,7 @@ my_tests = ["utils.jl", "indexing.jl", "broadcasting.jl", "string.jl", + "multithreading.jl", "precompile.jl", "deprecated.jl"] diff --git a/test/utils.jl b/test/utils.jl index 8cc7c5e9bb..9ad18aa38c 100644 --- a/test/utils.jl +++ b/test/utils.jl @@ -93,62 +93,6 @@ end :sum_skipmissing_div12 end -@testset "pre-Julia 1.3 @spawn replacement" begin - t = @sync DataFrames.@spawn begin - sleep(1) - true - end - @test fetch(t) === true -end - -@testset "split_indices" begin - for len in 1:100, basesize in 1:10 - x = DataFrames.split_indices(len, basesize) - - @test length(x) == max(1, div(len, basesize)) - @test reduce(vcat, x) == 1:len - vmin, vmax = extrema(length(v) for v in x) - @test vmin + 1 == vmax || vmin == vmax - @test len < basesize || vmin >= basesize - end - - @test_throws AssertionError DataFrames.split_indices(0, 10) - @test_throws AssertionError DataFrames.split_indices(10, 0) - - # Check overflow on 32-bit - len = 
typemax(Int32) - basesize = 100_000_000 - x = collect(DataFrames.split_indices(len, basesize)) - @test length(x) == div(len, basesize) - @test x[1][1] === 1 - @test x[end][end] === Int(len) - vmin, vmax = extrema(length(v) for v in x) - @test vmin + 1 == vmax || vmin == vmax - @test len < basesize || vmin >= basesize -end - -@testset "split_to_chunks" begin - for lg in 1:100, nt in 1:11 - if lg < nt - @test_throws AssertionError DataFrames.split_to_chunks(lg, nt) - continue - end - x = collect(DataFrames.split_to_chunks(lg, nt)) - @test reduce(vcat, x) == 1:lg - @test sum(length, x) == lg - @test first(x[1]) == 1 - @test last(x[end]) == lg - @test length(x) == nt - for i in 1:nt-1 - @test first(x[i+1])-last(x[i]) == 1 - end - end - - @test_throws AssertionError DataFrames.split_to_chunks(0, 10) - @test_throws AssertionError DataFrames.split_to_chunks(10, 0) - @test_throws AssertionError DataFrames.split_to_chunks(10, 11) -end - @testset "_findall(B::BitVector)" begin Random.seed!(1234) BD = Dict(