Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions base/initdefs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ const atexit_hooks = Callable[
() -> Filesystem.temp_cleanup_purge(force=true)
]
const _atexit_hooks_lock = ReentrantLock()
global _atexit_hooks_finished::Bool = false
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vtjnash: Oh, wait, this needs to be volatile, right? Since it's used inside a while loop across multiple threads?

So this should be an atomic, since we don't have the ability to do volatile without atomic, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

volatile is only meaningful for special memory-mapped hardware registers, and x86 doesn't have any of those

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that true? Isn't it also needed to prevent global variables from getting pulled into either a register or a stack-local variable and thus breaking coordination across threads?

I think we've seen this in julia code before, that unless you mark the variable as an Atomic it ends up inlined as a local into the function body.

Here's an example:

julia> mutable struct Foo
           x::Int
       end

julia> function f(z::Foo)
                  while z.x > 0
                  end
                  return z.x
       end
f (generic function with 1 method)

julia> @code_native f(Foo(0))
        .section        __TEXT,__text,regular,pure_instructions
        .build_version macos, 13, 0
        .globl  _julia_f_331                    ; -- Begin function julia_f_331
        .p2align        2
_julia_f_331:                           ; @julia_f_331
; ┌ @ REPL[11]:1 within `f`
        .cfi_startproc
; %bb.0:                                ; %top
; │ @ REPL[11] within `f`
        ;DEBUG_VALUE: f:z <- [DW_OP_deref] $x0
        ;DEBUG_VALUE: f:z <- [DW_OP_deref] 0
        ldr     x0, [x0]
; │ @ REPL[11]:2 within `f`
        cmp     x0, #1                          ; =1
        b.lt    LBB0_2
LBB0_1:                                 ; %L1
                                        ; =>This Inner Loop Header: Depth=1
        b       LBB0_1
LBB0_2:                                 ; %L5
; │ @ REPL[11]:4 within `f`
        ret
        .cfi_endproc
; └
                                        ; -- End function
.subsections_via_symbols

And you can see that this breaks coordination across tasks:

julia> Threads.nthreads()
4

julia> global t::Foo = Foo(1)
Foo(1)

julia> Threads.@spawn @info f(t)
Task (runnable) @0x000000015b4bd5a0

julia> t.x = -1
-1

julia>

julia> t.x
-1

Whereas by marking this variable as an Atomic, it fixes the inlining of the variable, and its value is fetched on every iteration of the loop:

julia> mutable struct Foo2
           @atomic x::Int
       end

julia> function f(z::Foo2)
                  while @atomic(z.x) > 0
                  end
                  return @atomic z.x
       end
f (generic function with 2 methods)

julia> @code_native f(Foo2(0))
        .section        __TEXT,__text,regular,pure_instructions
        .build_version macos, 13, 0
        .globl  _julia_f_350                    ; -- Begin function julia_f_350
        .p2align        2
_julia_f_350:                           ; @julia_f_350
; ┌ @ REPL[30]:1 within `f`
        .cfi_startproc
; %bb.0:                                ; %top
; │ @ REPL[30] within `f`
        ;DEBUG_VALUE: f:z <- [DW_OP_deref] $x0
        ;DEBUG_VALUE: f:z <- [DW_OP_deref] 0
LBB0_1:                                 ; %L1
                                        ; =>This Inner Loop Header: Depth=1
; │ @ REPL[30]:2 within `f`
; │┌ @ Base.jl:50 within `getproperty`
        ldar    x8, [x0]
; │└
        cmp     x8, #0                          ; =0
        b.gt    LBB0_1
; %bb.2:                                ; %L5
; │ @ REPL[30]:4 within `f`
; │┌ @ Base.jl:50 within `getproperty`
        ldar    x0, [x0]
; │└
        ret
        .cfi_endproc
; └
                                        ; -- End function
.subsections_via_symbols

julia>

And now this issue is fixed as well:

julia> Threads.nthreads()
4

julia> global t2::Foo2 = Foo2(1)
Foo2(1)

julia> Threads.@spawn @info f(t2)
Task (runnable) @0x000000017234a290

julia> @atomic t2.x = -1
[ Info: -1
-1

^^ So I do think there is an issue in the code as written. I am pretty sure that this issue is part of what is encompassed by the term "volatility", but using whatever name we want to use for it, it's an issue, yeah?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although in this situation, it's also fixed by using a lock, which I guess introduces the needed barriers... 🤔

julia> mutable struct Foo
           x::Int
           lock::ReentrantLock
       end

julia> function f(z::Foo)
                  while @lock z.lock (z.x > 0)
                  end
                  return @lock z.lock z.x
       end
f (generic function with 1 method)

julia> global t::Foo = Foo(1, ReentrantLock())
Foo(1, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0
)), (2, 4333405504, 4499986432)))

julia> Threads.@spawn @info f(t)
Task (runnable) @0x0000000281dbf0f0

julia> @lock t.lock t.x = -1
[ Info: -1
-1

So are you saying that the lock like that should be sufficient in all cases, and we don't also need to mark the variable atomic, to get it to become volatile?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what Jameson meant is that the notion of "volatile" is irrelevant here. The notion that matters is "atomicity" (https://en.cppreference.com/w/c/language/atomic).
You can get it either by marking the Bool `@atomic` or by protecting it with a lock. Before C11, volatile and atomic were often conflated.

From https://en.cppreference.com/w/c/language/volatile

Note that volatile variables are not suitable for communication between threads; they do not offer atomicity, synchronization, or memory ordering. A read from a volatile variable that is modified by another thread without synchronization or concurrent modification from two unsynchronized threads is undefined behavior due to a data race.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aha. Thank you both! That's clear now, then. 👍 Thanks for educating me!

So, a julia ReentrantLock is enough to also provide the LLVM / C concept of atomicity? We can guarantee that the updates to the value will be reflected across threads?

Now that we're discussing it, the answer seems to obviously be yes, otherwise how would anything work... OKAY thanks this is helpful.


"""
atexit(f)
Expand All @@ -374,12 +375,40 @@ exit code `n` (instead of the original exit code). If more than one exit hook
calls `exit(n)`, then Julia will exit with the exit code corresponding to the
last called exit hook that calls `exit(n)`. (Because exit hooks are called in
LIFO order, "last called" is equivalent to "first registered".)

Note: Once all exit hooks have been called, no more exit hooks can be registered,
and any call to `atexit(f)` after all hooks have completed will throw an exception.
This situation may occur if you are registering exit hooks from background Tasks that
may still be executing concurrently during shutdown.
"""
atexit(f::Function) = Base.@lock _atexit_hooks_lock (pushfirst!(atexit_hooks, f); nothing)
function atexit(f::Function)
    # Registration and the exiting flag are guarded by the same lock, so a hook
    # can never be added after _atexit() has decided it is done draining hooks
    # (such a hook would silently never run).
    Base.@lock _atexit_hooks_lock begin
        if _atexit_hooks_finished
            error("cannot register new atexit hook; already exiting.")
        end
        pushfirst!(atexit_hooks, f)  # LIFO: newest hook runs first
        return nothing
    end
end

function _atexit(exitcode::Cint)
while !isempty(atexit_hooks)
f = popfirst!(atexit_hooks)
# Don't hold the lock around the iteration, just in case any other thread executing in
# parallel tries to register a new atexit hook while this is running. We don't want to
# block that thread from proceeding, and we can allow it to register its hook which we
# will immediately run here.
while true
local f
Base.@lock _atexit_hooks_lock begin
# If this is the last iteration, atomically disable atexit hooks to prevent
# someone from registering a hook that will never be run.
# (We do this inside the loop, so that it is atomic: no one can have registered
# a hook that never gets run, and we run all the hooks we know about until
# the vector is empty.)
if isempty(atexit_hooks)
global _atexit_hooks_finished = true
break
end

f = popfirst!(atexit_hooks)
end
try
if hasmethod(f, (Cint,))
f(exitcode)
Expand Down
99 changes: 97 additions & 2 deletions test/atexit.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ using Test

@testset "atexit.jl" begin
function _atexit_tests_gen_cmd_eval(expr::String)
    # Build a command that evaluates `expr` in a fresh julia process via `-e`.
    # The diff paste had retained both the old (`-e` only) and new (`-t2 -e`)
    # command lines inside one backtick literal, which would have spliced the
    # julia invocation into the argument list twice; only the new line belongs.
    # We run the atexit tests with 2 threads, for the parallelism tests at the end.
    cmd_eval = ```
        $(Base.julia_cmd()) -t2 -e $(expr)
    ```
    return cmd_eval
end
function _atexit_tests_gen_cmd_script(temp_dir::String, expr::String)
    # Write `expr` to a temp script under `temp_dir` and build a command that
    # runs it in a fresh julia process. As with the `-e` variant, the diff paste
    # had kept both the old and new invocation lines; only the `-t2` line belongs.
    script, io = mktemp(temp_dir)
    println(io, expr)
    close(io)
    # We run the atexit tests with 2 threads, for the parallelism tests at the end.
    cmd_script = ```
        $(Base.julia_cmd()) -t2 $(script)
    ```
    return cmd_script
end
Expand Down Expand Up @@ -172,5 +174,98 @@ using Test
@test p_script.exitcode == expected_exit_code
end
end
@testset "test calling atexit() in parallel with running atexit hooks." begin
# These tests cover 3 parallelism cases, as described by the following comments.
# Each entry maps a julia program (run in its own process) to the exit code it
# is expected to produce.
julia_expr_list = Dict(
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# 1. registering a hook from inside a hook
"""
atexit() do
atexit() do
exit(11)
end
end
# This will attempt to exit 0, but the execution of the atexit hook will
# register another hook, which will exit 11.
exit(0)
""" => 11,
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# 2. registering a hook from another thread while hooks are running
"""
c = Channel()
# This hook must execute _last_. (Execution is LIFO.)
atexit() do
put!(c, nothing)
put!(c, nothing)
end
atexit() do
# This will run in a concurrent task, testing that we can register atexit
# hooks from another task while running atexit hooks.
Threads.@spawn begin
Core.println("INSIDE")
take!(c) # block on c
Core.println("go")
atexit() do
Core.println("exit11")
exit(11)
end
take!(c) # keep the _atexit() loop alive until we've added another item.
Core.println("done")
end
end
exit(0)
""" => 11,
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# 3. attempting to register a hook after all hooks have finished (disallowed)
"""
const atexit_has_finished = Threads.Atomic{Bool}(false)
atexit() do
Threads.@spawn begin
# Block until the atexit hooks have all finished. We use a manual "spin
# lock" because task switch is disallowed inside the finalizer, below.
while !atexit_has_finished[] end
Core.println("done")
try
# By the time this runs, all the atexit hooks will be done.
# So this will throw.
atexit() do
exit(11)
end
catch
# Meaning we _actually_ exit 22.
exit(22)
end
end
end
# Finalizers run after the atexit hooks, so this blocks exit until the spawned
# task above gets a chance to run.
x = []
finalizer(x) do x
Core.println("FINALIZER")
# Allow the spawned task to finish
atexit_has_finished[] = true
Core.println("ready")
# Then spin forever to prevent exit.
while atexit_has_finished[] end
Core.println("exiting")
end
exit(0)
""" => 22,
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
)
# Each program is run twice, once through `julia -e` and once from a script
# file, covering both process entry paths.
for julia_expr in keys(julia_expr_list)
cmd_eval = _atexit_tests_gen_cmd_eval(julia_expr)
cmd_script = _atexit_tests_gen_cmd_script(atexit_temp_dir, julia_expr)
expected_exit_code = julia_expr_list[julia_expr]
# All expected exit codes are nonzero, so a plain run() must throw; then the
# processes are rerun with wait=false so the exact exit code can be inspected.
@test_throws(ProcessFailedException, run(cmd_eval))
@test_throws(ProcessFailedException, run(cmd_script))
p_eval = run(cmd_eval; wait = false)
p_script = run(cmd_script; wait = false)
wait(p_eval)
wait(p_script)
@test p_eval.exitcode == expected_exit_code
@test p_script.exitcode == expected_exit_code
end
end
rm(atexit_temp_dir; force = true, recursive = true)
end