-
Notifications
You must be signed in to change notification settings - Fork 374
Add a keyword argument to disable multithreading #3030
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
a5b1089
a31bf16
37604e6
afd9b8f
c24618a
f218189
39300fc
77e3475
7e3321f
fd665cc
b488196
10e262f
c21a1c8
aa391cd
4b5163c
d4dfb0f
27028e0
2c6488e
2d8d5f5
ce4a2cf
4bbbb27
e575338
1382ffe
f63fe97
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -6,8 +6,15 @@ CurrentModule = DataFrames | |||||
|
|
||||||
| ## Multi-threading support | ||||||
|
|
||||||
| Selected operations in DataFrames.jl automatically use multiple threads when available. | ||||||
| It is task-based and implemented using the `@spawn` macro from Julia Base. | ||||||
| By default, selected operations in DataFrames.jl automatically use multiple threads | ||||||
| when available. It is task-based and implemented using the `@spawn` macro from Julia Base. | ||||||
| Multithreading can be disabled when running a particular block of code using the | ||||||
| [`DataFrames.singlethreaded() do... end`](@ref) syntax. It can also be disabled | ||||||
| globally by calling [`DataFrames.setmultithreading(false)`](@ref). | ||||||
|
||||||
| globally by calling [`DataFrames.setmultithreading(true)`](@ref). | |
| globally by calling [`DataFrames.setmultithreading(false)`](@ref). |
DataFrames.setmultithreading(true) sounds like enabling multithreading, not disabling it. Maybe I'm misunderstanding the sentence though.
nalimilan marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -261,7 +261,7 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any}, | |
| # Create up to one task per thread | ||
| # This has lower overhead than creating one task per group, | ||
| # but is optimal only if operations take roughly the same time for all groups | ||
| if VERSION >= v"1.4" && isthreadsafe(outcols, incols) | ||
| if VERSION >= v"1.4" && ismultithreaded() && isthreadsafe(outcols, incols) | ||
| basesize = max(1, cld(len - 1, Threads.nthreads())) | ||
| partitions = Iterators.partition(2:len, basesize) | ||
| else | ||
|
|
@@ -273,11 +273,11 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any}, | |
| tasks = Vector{Task}(undef, length(partitions)) | ||
| for (tid, idx) in enumerate(partitions) | ||
| tasks[tid] = | ||
| @spawn _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx), | ||
| outcols, outcolsref, | ||
| type_widened, widen_type_lock, | ||
| f, gd, starts, ends, incols, colnames, | ||
| firstcoltype(firstmulticol)) | ||
| @spawn_or_async _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx), | ||
|
||
| outcols, outcolsref, | ||
| type_widened, widen_type_lock, | ||
| f, gd, starts, ends, incols, colnames, | ||
| firstcoltype(firstmulticol)) | ||
| end | ||
|
|
||
| # Workaround JuliaLang/julia#38931: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -131,6 +131,115 @@ end | |
|
|
||
| funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner)) | ||
|
|
||
|
|
||
| # Global flag holding `true` while multithreading is disabled via `setmultithreading(false)` | ||
| const SINGLETHREADING = Threads.Atomic{Bool}(false) | ||
| # Number of currently active requests for single-threaded execution: | ||
| # incremented by `singlethreaded` while its function runs, and by | ||
| # `setmultithreading(false)` until multithreading is enabled again | ||
| const SINGLETHREADING_DEPTH = Threads.Atomic{Int}(0) | ||
|
|
||
| # Return `true` when no request for single-threaded execution is active | ||
| ismultithreaded() = SINGLETHREADING_DEPTH[] == 0 | ||
|
|
||
| """ | ||
| DataFrames.singlethreaded(f) | ||
|
|
||
| Run function `f` while disabling multithreading in all DataFrames.jl operations. | ||
| This is useful in particular to run functions which are not thread-safe, or when | ||
| distribution of work across threads is managed separately. | ||
|
||
|
|
||
| *See also*: [`DataFrames.setmultithreading`](@ref) to disable multithreading globally | ||
|
|
||
| !!! note | ||
|
|
||
| This function is considered as experimental and may change or be removed once | ||
| a cross-package mechanism for multithreading configuration is developed. | ||
|
|
||
| Currently, it disables multithreading for any DataFrames.jl | ||
| operations which may be run while `f` is running (e.g. if tasks using data | ||
| frames have been spawned on multiple threads). | ||
| This may change in the future. | ||
|
|
||
| # Examples | ||
| ```jldoctest | ||
| julia> df = DataFrame(x=repeat(1:5, inner=2), y=1:10); | ||
|
|
||
| julia> gd = groupby(df, :x); | ||
|
|
||
| julia> counter = 0; | ||
|
|
||
| julia> f(x) = (sleep(0.1); global counter += 1); # Thread-unsafe function | ||
|
|
||
| julia> DataFrames.singlethreaded() do | ||
| combine(gd, :y => f) | ||
| end | ||
| 5×2 DataFrame | ||
| Row │ x y_f | ||
| │ Int64 Int64 | ||
| ─────┼────────────── | ||
| 1 │ 1 1 | ||
| 2 │ 2 2 | ||
| 3 │ 3 3 | ||
| 4 │ 4 4 | ||
| 5 │ 5 5 | ||
| ``` | ||
| """ | ||
| function singlethreaded(f) | ||
| # Bump the global depth counter so that `ismultithreaded()` returns `false` | ||
| # for as long as `f` is running; using a counter lets nested calls stack | ||
| Threads.atomic_add!(SINGLETHREADING_DEPTH, 1) | ||
| try | ||
| return f() | ||
| finally | ||
| # Always undo the increment, even if `f` throws | ||
| Threads.atomic_sub!(SINGLETHREADING_DEPTH, 1) | ||
| end | ||
| end | ||
|
|
||
| """ | ||
| DataFrames.setmultithreading(enable::Bool) | ||
|
|
||
| Enable or disable multithreading permanently in all DataFrames.jl operations. | ||
| This is useful in particular to run functions which are not thread-safe, or when | ||
| distribution of work across threads is managed separately. | ||
|
|
||
| *See also*: [`DataFrames.singlethreaded`](@ref) to disable multithreading only | ||
| for a specific code block | ||
|
|
||
| !!! note | ||
|
|
||
| This function is considered as experimental and may change or be removed once | ||
| a cross-package mechanism for multithreading configuration is developed. | ||
|
|
||
| # Examples | ||
| ```jldoctest | ||
| julia> df = DataFrame(x=repeat(1:5, inner=2), y=1:10); | ||
|
|
||
| julia> gd = groupby(df, :x); | ||
|
|
||
| julia> counter = 0; | ||
|
|
||
| julia> f(x) = (sleep(0.1); global counter += 1); # Thread-unsafe function | ||
|
|
||
| julia> DataFrames.setmultithreading(false); | ||
|
|
||
| julia> combine(gd, :y => f) | ||
| 5×2 DataFrame | ||
| Row │ x y_f | ||
| │ Int64 Int64 | ||
| ─────┼────────────── | ||
| 1 │ 1 1 | ||
| 2 │ 2 2 | ||
| 3 │ 3 3 | ||
| 4 │ 4 4 | ||
| 5 │ 5 5 | ||
|
|
||
| julia> DataFrames.setmultithreading(true); | ||
| ``` | ||
| """ | ||
| function setmultithreading(enable::Bool) | ||
| # Atomically store the new state and fetch the previous one; | ||
| # SINGLETHREADING holds `true` when multithreading is disabled | ||
| old_state = Threads.atomic_xchg!(SINGLETHREADING, !enable) | ||
| # Adjust the depth counter only on an actual state change, so that | ||
| # repeated calls with the same argument do not accumulate | ||
| if !enable && !old_state | ||
| Threads.atomic_add!(SINGLETHREADING_DEPTH, 1) | ||
| elseif enable && old_state | ||
| Threads.atomic_sub!(SINGLETHREADING_DEPTH, 1) | ||
| end | ||
| # The requested state is returned, not the previous one | ||
| return enable | ||
| end | ||
|
|
||
| # Compute chunks of indices, each with at least `basesize` entries | ||
| # This method ensures balanced sizes by avoiding a small last chunk | ||
| function split_indices(len::Integer, basesize::Integer) | ||
|
|
@@ -159,7 +268,7 @@ if VERSION >= v"1.4" | |
|
|
||
| nt = Threads.nthreads() | ||
| len = length(x) | ||
| if nt > 1 && len > basesize | ||
| if ismultithreaded() && nt > 1 && len > basesize | ||
| tasks = [Threads.@spawn begin | ||
| for i in p | ||
| local $(esc(lidx)) = @inbounds x[i] | ||
|
|
@@ -215,6 +324,89 @@ macro spawn_for_chunks(basesize, ex) | |
| return _spawn_for_chunks_helper(ex.args[1], ex.args[2], basesize) | ||
| end | ||
|
|
||
| """ | ||
| @spawn_or_async expr | ||
|
|
||
| Equivalent to `Threads.@spawn` if [`DataFrames.ismultithreaded() === true`](@ref) | ||
| and to `@async` otherwise. | ||
| """ | ||
| spawn_or_async | ||
nalimilan marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| """ | ||
| @spawn_or_run expr | ||
|
|
||
| Equivalent to `Threads.@spawn` if [`DataFrames.ismultithreaded() === true`](@ref), | ||
| otherwise simply runs `expr`. | ||
| """ | ||
| spawn_or_run | ||
|
|
||
| if VERSION >= v"1.4" | ||
| # Julia >= 1.4 variant: create and schedule a task running `expr`, and return the task. | ||
| # Whether the task is sticky is decided at run time from `DataFrames.ismultithreaded()`. | ||
| macro spawn_or_async(expr) | ||
| # NOTE(review): relies on the internal `Base._lift_one_interp!`; presumably it | ||
| # rewrites `$`-interpolated values in `expr` into `let` bindings - confirm | ||
| letargs = Base._lift_one_interp!(expr) | ||
|
|
||
| # Wrap the expression in a zero-argument closure, escaped so that it is | ||
| # evaluated in the caller's scope | ||
| thunk = esc(:(()->($expr))) | ||
| # NOTE(review): presumably the variable used by `@sync` to collect tasks - confirm | ||
| var = esc(Base.sync_varname) | ||
| quote | ||
bkamins marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| let $(letargs...) | ||
| local task = Task($thunk) | ||
| # Sticky (as with `@async`) when multithreading is disabled, | ||
| # non-sticky (as with `Threads.@spawn`) otherwise | ||
| task.sticky = !DataFrames.ismultithreaded() | ||
| # Register the task only if the sync variable is a local in the caller's scope | ||
| # (presumably meaning we are inside an enclosing `@sync` block) | ||
| if $(Expr(:islocal, var)) | ||
| # NOTE(review): the container is filled with `put!` on 1.5+ and `push!` | ||
| # before; presumably its type changed between versions - confirm | ||
| @static if VERSION >= v"1.5.0" | ||
| put!($var, task) | ||
| else | ||
| push!($var, task) | ||
| end | ||
| end | ||
| schedule(task) | ||
| task | ||
| end | ||
| end | ||
| end | ||
|
|
||
| # Julia >= 1.4 variant: when `DataFrames.ismultithreaded()` holds, schedule a | ||
| # non-sticky task running `expr`; otherwise run `expr` directly in the current task. | ||
| # The generated code evaluates to `nothing` in both cases. | ||
| macro spawn_or_run(expr) | ||
| # NOTE(review): relies on the internal `Base._lift_one_interp!`; presumably it | ||
| # rewrites `$`-interpolated values in `expr` into `let` bindings - confirm | ||
| letargs = Base._lift_one_interp!(expr) | ||
|
|
||
| # Wrap the expression in a zero-argument closure, escaped so that it is | ||
| # evaluated in the caller's scope | ||
| thunk = esc(:(()->($expr))) | ||
| # NOTE(review): presumably the variable used by `@sync` to collect tasks - confirm | ||
| var = esc(Base.sync_varname) | ||
| quote | ||
| let $(letargs...) | ||
| # The choice between spawning and running inline is made at run time | ||
| if DataFrames.ismultithreaded() | ||
| local task = Task($thunk) | ||
| task.sticky = false | ||
| # Register the task only if the sync variable is a local in the caller's scope | ||
| # (presumably meaning we are inside an enclosing `@sync` block) | ||
| if $(Expr(:islocal, var)) | ||
| @static if VERSION >= v"1.5.0" | ||
| put!($var, task) | ||
| else | ||
| push!($var, task) | ||
| end | ||
| end | ||
| schedule(task) | ||
| else | ||
| # Multithreading disabled: call the closure synchronously, no task is created | ||
| $thunk() | ||
| end | ||
| # Neither the task nor the value of `expr` is returned | ||
| nothing | ||
| end | ||
| end | ||
| end | ||
| else | ||
| # This is the definition of @async in Base | ||
| # Fallback for Julia versions before 1.4: always create and schedule a plain task | ||
| # for `expr`, without consulting `ismultithreaded()` | ||
| macro spawn_or_async(expr) | ||
| # Zero-argument closure around `expr`, evaluated in the caller's scope | ||
| thunk = esc(:(()->($expr))) | ||
| # NOTE(review): presumably the variable used by `@sync` to collect tasks - confirm | ||
| var = esc(Base.sync_varname) | ||
| quote | ||
| local task = Task($thunk) | ||
| # Register the task only when the sync variable is defined in the caller's scope | ||
| if $(Expr(:isdefined, var)) | ||
| push!($var, task) | ||
| end | ||
| # Last expression of the generated code, hence its value | ||
| schedule(task) | ||
| end | ||
| end | ||
|
|
||
| # Fallback for Julia versions before 1.4: no task is created; `expr` is evaluated | ||
| # in place in the caller's scope and the result is discarded in favour of `nothing` | ||
| macro spawn_or_run(expr) | ||
| esc(:($expr; nothing)) | ||
| end | ||
| end | ||
|
|
||
| function _nt_like_hash(v, h::UInt) | ||
| length(v) == 0 && return hash(NamedTuple(), h) | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.