diff --git a/base/lock.jl b/base/lock.jl
index 9057a39294229..afcc2c8d5d075 100644
--- a/base/lock.jl
+++ b/base/lock.jl
@@ -84,6 +84,7 @@ end
         #@assert rl.reentrancy_cnt === 0
         rl.reentrancy_cnt = 0x0000_0001
         @atomic :release rl.locked_by = ct
+        # TODO: returning without enable_finalizers() ??
         return true
     end
     GC.enable_finalizers()
@@ -109,6 +110,8 @@ Each `lock` must be matched by an [`unlock`](@ref).
                     # it was unlocked, so try to lock it ourself
                     _trylock(rl, current_task()) && break
                 else # it was locked, so now wait for the release to notify us
+                    # TODO: verify it is impossible for havelock == 0x00 before
+                    # waiting here.
                     wait(c)
                 end
             end
@@ -135,16 +138,13 @@ internal counter and return immediately.
        rl.reentrancy_cnt = n
        if n == 0x0000_0000
            @atomic :monotonic rl.locked_by = nothing
-            if (@atomicswap :release rl.havelock = 0x00) == 0x02
-                (@noinline function notifywaiters(rl)
-                    cond_wait = rl.cond_wait
-                    lock(cond_wait)
-                    try
-                        notify(cond_wait)
-                    finally
-                        unlock(cond_wait)
-                    end
-                end)(rl)
+            @atomic :release rl.havelock = 0x00
+            cond_wait = rl.cond_wait
+            lock(cond_wait)
+            try
+                notify(cond_wait, all=false)
+            finally
+                unlock(cond_wait)
            end
            return true
        end
diff --git a/base/partr.jl b/base/partr.jl
index 0393308802281..9b55ca9d0f0ae 100644
--- a/base/partr.jl
+++ b/base/partr.jl
@@ -94,12 +94,16 @@ end
 
 
 function multiq_insert(task::Task, priority::UInt16)
+    ccall(:jl_tv_multiq_p_inc, Cvoid, ())
+
+    # tpid = task pool id
     tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), task)
     heap_p = multiq_size(tpid)
     tp = tpid + 1
 
     task.priority = priority
 
+    # TODO: task pushed to a randomly chosen thread
     rn = cong(heap_p, cong_unbias[tp])
     tpheaps = heaps[tp]
     while !trylock(tpheaps[rn].lock)
@@ -174,6 +178,7 @@ function multiq_deletemin()
            prio1 = heap.tasks[1].priority
        end
        @atomic :monotonic heap.priority = prio1
+        ccall(:jl_tv_multiq_m_inc, Cvoid, ())
        unlock(heap.lock)
 
    return task
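The unlock() rewrite in base/lock.jl above switches from notify-all (done only
when havelock was observed contended, 0x02) to an unconditional notify-one
handoff: each woken waiter re-runs _trylock from the loop in lock(). A minimal
standalone sketch of that handoff, assuming a simplified non-reentrant lock
(ToyLock and its fields are illustrative only, not Base's ReentrantLock):

    mutable struct ToyLock
        @atomic havelock::UInt8        # 0x00 = free, 0x01 = held
        const cond_wait::Threads.Condition
        ToyLock() = new(0x00, Threads.Condition())
    end

    function Base.lock(l::ToyLock)
        while true
            # try to acquire; on failure, sleep until an unlock notifies us
            (@atomicreplace :acquire l.havelock 0x00 => 0x01).success && return
            lock(l.cond_wait)
            try
                # re-check under the condition's lock: if havelock became 0x00
                # after the failed CAS, the notify already happened and waiting
                # now would be a lost wakeup
                (@atomic :acquire l.havelock) == 0x00 || wait(l.cond_wait)
            finally
                unlock(l.cond_wait)
            end
        end
    end

    function Base.unlock(l::ToyLock)
        @atomic :release l.havelock = 0x00
        lock(l.cond_wait)
        try
            notify(l.cond_wait, all=false)   # wake a single waiter, as above
        finally
            unlock(l.cond_wait)
        end
    end

The re-check before wait() is what the "verify it is impossible for havelock ==
0x00 before waiting here" TODO is after: Base's slow path holds cond_wait's
lock around both the havelock check and the wait(c), and the new unlock() only
notifies while holding that same lock, which should close the window.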
diff --git a/base/task.jl b/base/task.jl
index d7b144150301c..73e861a1454a1 100644
--- a/base/task.jl
+++ b/base/task.jl
@@ -635,6 +635,7 @@ function task_done_hook(t::Task)
     # Clear sigatomic before waiting
     sigatomic_end()
     try
+        ccall(:jl_tv_tasks_waiting_m_inc, Cvoid, ()) ## Kludge
         wait() # this will not return
     catch e
         # If an InterruptException happens while blocked in the event loop, try handing
@@ -801,6 +802,7 @@ function schedule(t::Task, @nospecialize(arg); error=false)
         t.queue === nothing || Base.error("schedule: Task not runnable")
         setfield!(t, :result, arg)
     end
+    # TODO: how do we ensure that the same task is not enqueued multiple times?
     enq_work(t)
     return t
 end
@@ -834,6 +836,10 @@ immediately yields to `t` before calling the scheduler.
 function yield(t::Task, @nospecialize(x=nothing))
     (t._state === task_state_runnable && t.queue === nothing) || error("yield: Task not runnable")
     t.result = x
+    # ccall(:jl_tv_tasks_running_m_inc, Cvoid, ())
+    # if hash(time()) % 1000 == 0
+    #     print("TASK-YIELD:\n$(sprint(Base.show_backtrace, Base.stacktrace()))\n\n\n")
+    # end
     enq_work(current_task())
     set_next_task(t)
     return try_yieldto(ensure_rescheduled)
@@ -855,6 +861,10 @@ function yieldto(t::Task, @nospecialize(x=nothing))
     elseif t._state === task_state_failed
         throw(t.result)
     end
+    # ccall(:jl_tv_tasks_running_m_inc, Cvoid, ())
+    # if hash(time()) % 1000 == 0
+    #     print("TASK-YIELD:\n$(sprint(Base.show_backtrace, Base.stacktrace()))\n\n\n")
+    # end
     t.result = x
     set_next_task(t)
     return try_yieldto(identity)
@@ -881,6 +891,10 @@ end
 
 # yield to a task, throwing an exception in it
 function throwto(t::Task, @nospecialize exc)
+    # ccall(:jl_tv_tasks_running_m_inc, Cvoid, ())
+    # if hash(time()) % 1000 == 0
+    #     print("TASK-YIELD:\n$(sprint(Base.show_backtrace, Base.stacktrace()))\n\n\n")
+    # end
     t.result = exc
     t._isexception = true
     set_next_task(t)
@@ -933,12 +947,20 @@ checktaskempty = Partr.multiq_check_empty
 end
 
 function wait()
+    ccall(:jl_tv_tasks_waiting_p_inc, Cvoid, ())
+    # if hash(time()) % 1000 == 0
+    #     print("TASK-YIELD:\n$(sprint(Base.show_backtrace, Base.stacktrace()))\n\n\n")
+    # end
     GC.safepoint()
     W = workqueue_for(Threads.threadid())
     poptask(W)
     result = try_yieldto(ensure_rescheduled)
+    # TODO: how does this call to process_events() interact with locks / conditions?
+    # First thing a task does after waking is to process events?
+    # Will there be contention on the libuv lock?
     process_events()
     # return when we come out of the queue
+    ccall(:jl_tv_tasks_waiting_m_inc, Cvoid, ())
     return result
 end
 
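The jl_tv_tasks_waiting_p/_m pair above brackets wait(): _p is bumped on entry
and _m when the task comes back out of the queue (or, via the task_done_hook
kludge, when it never will). Both counters are monotonic, so the point-in-time
number of parked tasks is their difference. A reader-side sketch, assuming the
metric ids 7 and 8 from the jl_tv_getmetric switch in src/threading.c below
(tasks_waiting_now is just an illustrative name):

    tasks_waiting_now() =
        ccall(:jl_tv_getmetric, Cint, (Cint,), 7) -   # jl_tv_tasks_waiting_p
        ccall(:jl_tv_getmetric, Cint, (Cint,), 8)     # jl_tv_tasks_waiting_m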
diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl
index b4c5c22c5cf8e..8812846e1d114 100644
--- a/base/threadingconstructs.jl
+++ b/base/threadingconstructs.jl
@@ -324,6 +324,7 @@ macro spawn(args...)
         let $(letargs...)
             local task = Task($thunk)
             task.sticky = false
+            # TODO: return value from jl_set_task_threadpoolid not checked
             ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, $tpid)
             if $(Expr(:islocal, var))
                 put!($var, task)
diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc
index 081e9e42a694d..3f543c9140deb 100644
--- a/src/jl_exported_funcs.inc
+++ b/src/jl_exported_funcs.inc
@@ -475,6 +475,11 @@
     XX(jl_try_substrtof) \
     XX(jl_tty_set_mode) \
     XX(jl_tupletype_fill) \
+    XX(jl_tv_multiq_p_inc) \
+    XX(jl_tv_multiq_m_inc) \
+    XX(jl_tv_tasks_waiting_p_inc) \
+    XX(jl_tv_tasks_waiting_m_inc) \
+    XX(jl_tv_getmetric) \
     XX(jl_typeassert) \
     XX(jl_typeinf_begin) \
     XX(jl_typeinf_end) \
diff --git a/src/julia_internal.h b/src/julia_internal.h
index 814d66751c86f..5877b7d1c2e81 100644
--- a/src/julia_internal.h
+++ b/src/julia_internal.h
@@ -18,6 +18,15 @@
 #define sleep(x) Sleep(1000*x)
 #endif
 
+extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_waiting_p;
+extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_waiting_m;
+extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_p;
+extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_m;
+extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_p;
+extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_m;
+extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_waiting_p;
+extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_waiting_m;
+
 #ifdef __cplusplus
 extern "C" {
 #endif
diff --git a/src/partr.c b/src/partr.c
index c0c2e8907db92..34e96a70d24f1 100644
--- a/src/partr.c
+++ b/src/partr.c
@@ -30,6 +30,7 @@ static const int16_t sleeping = 1;
 // invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping.
 // invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
 // information: Observing thread not-sleeping is sufficient to ensure the target thread will subsequently inspect its local queue.
+// ^^^ ??? TODO
 // information: Observing thread is-sleeping says it may be necessary to notify it at least once to wakeup. It may already be awake however for a variety of reasons.
 // information: These observations require sequentially-consistent fences to be inserted between each of those operational phases.
 // [^store_buffering_1]: These fences are used to avoid the cycle 2b -> 1a -> 1b -> 2a -> 2b where
@@ -187,6 +188,9 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
         *start_cycles = jl_hrtime();
         return 0;
     }
+    // TODO: jl_hrtime() is a wall clock timestamp. This OS thread is not guaranteed to
+    // run continuously; there might be a context switch, and this thread could resume
+    // well after sleep_threshold has elapsed?
     uint64_t elapsed_cycles = jl_hrtime() - (*start_cycles);
     if (elapsed_cycles >= sleep_threshold) {
         *start_cycles = 0;
@@ -195,12 +199,15 @@
     return 0;
 }
 
-
+// this doesn't guarantee that on return the thread is waking or awake.
+// there is a race condition here where the other thread goes to sleep just
+// after this thread checks its state and sees !(jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping)
 static int wake_thread(int16_t tid)
 {
     jl_ptls_t other = jl_all_tls_states[tid];
     int8_t state = sleeping;
 
+    // TODO: use of condition variable here doesn't adhere to required discipline?
     if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) {
         if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) {
             JL_PROBE_RT_SLEEP_CHECK_WAKE(other, state);
@@ -255,6 +262,8 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
     }
     // check if the other threads might be sleeping
     if (tid == -1) {
+        // TODO: every thread woken up when something added to multi-queue??
+
         // something added to the multi-queue: notify all threads
         // in the future, we might want to instead wake some fraction of threads,
         // and let each of those wake additional threads if they find work
@@ -281,7 +290,7 @@ static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q)
     jl_task_t *task = (jl_task_t*)jl_apply_generic(trypoptask, &q, 1);
     if (jl_typeis(task, jl_task_type)) {
         int self = jl_atomic_load_relaxed(&jl_current_task->tid);
-        jl_set_task_tid(task, self);
+        jl_set_task_tid(task, self); // TODO: return value not checked
         return task;
     }
     return NULL;
@@ -302,6 +311,7 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
     return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping;
 }
 
+// TODO: what is _threadedregion?
 extern _Atomic(unsigned) _threadedregion;
 
 JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty)
@@ -420,7 +430,9 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
                 int8_t gc_state = jl_gc_safe_enter(ptls);
                 uv_mutex_lock(&sleep_locks[ptls->tid]);
                 while (may_sleep(ptls)) {
+                    jl_atomic_fetch_add_relaxed(&jl_tv_threads_waiting_p, 1);
                     uv_cond_wait(&wake_signals[ptls->tid], &sleep_locks[ptls->tid]);
+                    jl_atomic_fetch_add_relaxed(&jl_tv_threads_waiting_m, 1);
                     // TODO: help with gc work here, if applicable
                 }
                 assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
diff --git a/src/partr.pdf b/src/partr.pdf
new file mode 100644
index 0000000000000..5476ccd3c6c0e
Binary files /dev/null and b/src/partr.pdf differ
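On the "use of condition variable here doesn't adhere to required discipline?"
TODO above: the textbook discipline is that the predicate is written and read
under the same lock that guards wait/notify, so a signal cannot fire between
the sleeper's check and its wait. A minimal Julia sketch of that discipline
for comparison (illustrative only; sleeper/waker are made-up names, and the
runtime instead publishes sleep_check_state with relaxed atomics plus the
seq-cst fences described in the comments at the top of partr.c):

    const cond = Threads.Condition()
    ready = false    # the predicate; only touched while holding cond's lock

    function sleeper()
        lock(cond)
        try
            while !ready       # re-check on every wakeup, spurious or stale
                wait(cond)
            end
        finally
            unlock(cond)
        end
    end

    function waker()
        lock(cond)
        try
            global ready = true   # write the predicate under the lock...
            notify(cond)          # ...then notify while still holding it
        finally
            unlock(cond)
        end
    end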
diff --git a/src/signals-unix.c b/src/signals-unix.c
index a9a7dec745543..629b38161a983 100644
--- a/src/signals-unix.c
+++ b/src/signals-unix.c
@@ -876,6 +876,12 @@ static void *signal_listener(void *arg)
 
                 // notify thread to resume
                 jl_thread_resume(i, sig);
+
+                if (!critical)
+                {
+                    // Kludge: only sample a single thread, to get an unbiased sample
+                    break;
+                }
             }
             jl_unlock_profile();
         }
diff --git a/src/task.c b/src/task.c
index a0577132eca8c..fbcb42f3c469c 100644
--- a/src/task.c
+++ b/src/task.c
@@ -228,6 +228,7 @@ static _Atomic(jl_function_t*) task_done_hook_func JL_GLOBALLY_ROOTED = NULL;
 
 void JL_NORETURN jl_finish_task(jl_task_t *t)
 {
+    jl_atomic_fetch_add_relaxed(&jl_tv_tasks_m, 1);
     jl_task_t *ct = jl_current_task;
     JL_PROBE_RT_FINISH_TASK(ct);
     JL_SIGATOMIC_BEGIN();
@@ -518,6 +519,7 @@ static void ctx_switch(jl_task_t *lastt)
 
 JL_DLLEXPORT void jl_switch(void)
 {
+    //jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_p, 1);
     jl_task_t *ct = jl_current_task;
     jl_ptls_t ptls = ct->ptls;
     jl_task_t *t = ptls->next_task;
@@ -820,6 +822,7 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion
 #ifdef _COMPILER_TSAN_ENABLED_
     t->ctx.tsan_state = __tsan_create_fiber(0);
 #endif
+    jl_atomic_fetch_add_relaxed(&jl_tv_tasks_p, 1);
     return t;
 }
 
@@ -842,6 +845,7 @@ JL_DLLEXPORT jl_value_t *jl_get_root_task(void)
     return (jl_value_t*)ct->ptls->root_task;
 }
 
+// TODO: this function has no callers?
JL_DLLEXPORT void jl_task_wait()
 {
     static jl_function_t *wait_func = NULL;
@@ -914,6 +918,7 @@ CFI_NORETURN
     jl_atomic_store_release(&pt->tid, -1);
 #endif
 
+    //jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_p, 1);
     ct->started = 1;
     JL_PROBE_RT_START_TASK(ct);
     if (jl_atomic_load_relaxed(&ct->_isexception)) {
@@ -939,6 +944,7 @@ CFI_NORETURN
         skip_pop_exception:;
     }
     ct->result = res;
+    //jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_m, 1);
     jl_gc_wb(ct, ct->result);
     jl_finish_task(ct);
     jl_gc_debug_critical_error();
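jl_tv_tasks_p (bumped in jl_new_task) and jl_tv_tasks_m (bumped in
jl_finish_task) are another monotonic pair: their difference is the number of
tasks created but not yet finished. Sketch, again assuming the metric ids 3
and 4 from jl_tv_getmetric below (live_tasks is an illustrative name):

    live_tasks() =
        ccall(:jl_tv_getmetric, Cint, (Cint,), 3) -   # jl_tv_tasks_p
        ccall(:jl_tv_getmetric, Cint, (Cint,), 4)     # jl_tv_tasks_m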
diff --git a/src/threading.c b/src/threading.c
index 4464406d21a76..b54d6934ba673 100644
--- a/src/threading.c
+++ b/src/threading.c
@@ -291,6 +291,59 @@ JL_DLLEXPORT _Atomic(uint8_t) jl_measure_compile_time_enabled = 0;
 JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time = 0;
 JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time = 0;
 
+JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_waiting_p = 0;
+JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_waiting_m = 0;
+JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_p = 0;
+JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_m = 0;
+JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_p = 0;
+JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_m = 0;
+JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_waiting_p = 0;
+JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_waiting_m = 0;
+
+JL_DLLEXPORT void jl_tv_multiq_p_inc(void)
+{ jl_atomic_fetch_add_relaxed(&jl_tv_multiq_p, 1); }
+
+JL_DLLEXPORT void jl_tv_multiq_m_inc(void)
+{ jl_atomic_fetch_add_relaxed(&jl_tv_multiq_m, 1); }
+
+JL_DLLEXPORT _Atomic(uint64_t) jl_tv_dbg_counter = 0;
+
+JL_DLLEXPORT void jl_tv_tasks_waiting_p_inc(void)
+{
+    jl_atomic_fetch_add_relaxed(&jl_tv_tasks_waiting_p, 1);
+    if (jl_atomic_fetch_add_relaxed(&jl_tv_dbg_counter, 1) % 521 == 257)
+    {
+        JL_TRY {
+            jl_error(""); // get a backtrace
+        }
+        JL_CATCH {
+            jl_printf((JL_STREAM*)STDERR_FILENO, "\n\nSample of task waiting:\n");
+            jlbacktrace(); // written to STDERR_FILENO
+        }
+    }
+}
+
+JL_DLLEXPORT void jl_tv_tasks_waiting_m_inc(void)
+{
+    jl_atomic_fetch_add_relaxed(&jl_tv_tasks_waiting_m, 1);
+}
+
+JL_DLLEXPORT int jl_tv_getmetric(int i)
+{
+    switch(i)
+    {
+        case 1: return jl_atomic_load_relaxed(&jl_tv_threads_waiting_p);
+        case 2: return jl_atomic_load_relaxed(&jl_tv_threads_waiting_m);
+        case 3: return jl_atomic_load_relaxed(&jl_tv_tasks_p);
+        case 4: return jl_atomic_load_relaxed(&jl_tv_tasks_m);
+        case 5: return jl_atomic_load_relaxed(&jl_tv_multiq_p);
+        case 6: return jl_atomic_load_relaxed(&jl_tv_multiq_m);
+        case 7: return jl_atomic_load_relaxed(&jl_tv_tasks_waiting_p);
+        case 8: return jl_atomic_load_relaxed(&jl_tv_tasks_waiting_m);
+        default: return 0;
+    }
+}
+
 // return calling thread's ID
 JL_DLLEXPORT int16_t jl_threadid(void)
 {
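A sketch of a Julia-side consumer for the jl_tv_getmetric switch above, run in
the same session (TV_METRICS is an illustrative name; it mirrors the case
order 1-8, and note that the C function narrows the uint64_t counters to int):

    const TV_METRICS = [
        "threads_waiting_p", "threads_waiting_m",
        "tasks_p", "tasks_m",
        "multiq_p", "multiq_m",
        "tasks_waiting_p", "tasks_waiting_m",
    ]

    for (i, name) in enumerate(TV_METRICS)
        println(rpad(name, 20), ccall(:jl_tv_getmetric, Cint, (Cint,), i))
    end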