From b55964a11d31ec45396d1625a4726fb7839aec23 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Mon, 12 Jan 2026 17:41:37 +0100 Subject: [PATCH 01/11] server : make sure children tasks are scheduled to launch with parent --- tools/server/server-context.cpp | 140 ++++++++++++++++++++++++-------- tools/server/server-task.h | 25 +++++- 2 files changed, 129 insertions(+), 36 deletions(-) diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index af6e0534243..6b9b1eb9a4c 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -84,6 +84,10 @@ struct server_slot { std::unique_ptr task; std::unique_ptr task_prev; // used for debugging + // if set, we only accept this task next (in other words, the task reserves this slot) + // used for scheduling n_children tasks after the parent + int task_id_next = -1; + // used to determine the slot that has been used the longest int64_t t_last_used = -1; @@ -176,6 +180,7 @@ struct server_slot { void reset() { SLT_DBG(*this, "%s", "\n"); + task_id_next = -1; n_prompt_tokens_cache = 0; last_nl_pos = 0; @@ -325,17 +330,6 @@ struct server_slot { return n_draft_max; } - // note: a slot can also be either a parent or a child - // TODO: move to server_task - bool is_parent() const { - return task->n_children > 0; - } - - // TODO: move to server_task - bool is_child() const { - return task->id_parent >= 0; - } - void release() { if (is_processing()) { GGML_ASSERT(task); @@ -348,7 +342,7 @@ struct server_slot { state = SLOT_STATE_IDLE; // do not keep context of the child slots - the parent's context is enough - if (is_child()) { + if (task->is_child()) { clear(false); } @@ -1223,12 +1217,11 @@ struct server_context_impl { slot.task = std::make_unique(std::move(task)); - slot.state = slot.is_child() + slot.state = slot.task->is_child() ? SLOT_STATE_WAIT_OTHER // wait for the parent to process prompt : SLOT_STATE_STARTED; - SLT_INF(slot, "processing task, is_child = %d\n", slot.is_child()); - + SLT_INF(slot, "processing task, is_child = %d\n", slot.task->is_child()); return true; } @@ -1623,9 +1616,7 @@ struct server_context_impl { // tokenize the input if it's set by CLI, return false on error bool tokenize_cli_input(server_task & task) { - if (task.cli_input == nullptr) { - return true; // nothing to do - } + GGML_ASSERT(task.cli_input != nullptr); try { auto & opt = oai_parser_opt; common_chat_templates_inputs inputs; @@ -1659,6 +1650,52 @@ struct server_context_impl { return true; } + bool try_reserve_n_slots(const size_t n_required, const int task_id) { + size_t n_reserved = 0; + for (auto & slot : slots) { + if (n_reserved >= n_required) { + break; + } else if (slot.task_id_next == task_id) { + // already reserved to this task + n_reserved++; + } else if (slot.task_id_next == -1) { + // not reserved by any other tasks + slot.task_id_next = task_id; + n_reserved++; + } + } + return n_reserved >= n_required; + } + + // launch multiple slots for parent + child tasks + bool launch_slots_with_child_tasks(server_slot & parent_slot, server_task && parent_task) { + GGML_ASSERT(parent_task.is_parent()); + + // launch all child tasks first + size_t i_child = 0; + for (auto & slot : slots) { + if (slot.id == parent_slot.id) { + continue; + } + server_task && child_task = std::move(parent_task.child_tasks[i_child]); + if (!launch_slot_with_task(slot, std::move(child_task))) { + SRV_ERR("failed to launch slot with child task, id_task = %d\n", child_task.id); + return false; + } + i_child++; + } + GGML_ASSERT(i_child == parent_task.n_children); + parent_task.child_tasks.clear(); + + // finally, launch the parent task + if (!launch_slot_with_task(parent_slot, std::move(parent_task))) { + SRV_ERR("failed to launch slot with task, id_task = %d\n", parent_task.id); + return false; + } + + return true; + } + void process_single_task(server_task && task) { switch (task.type) { case SERVER_TASK_TYPE_COMPLETION: @@ -1666,14 +1703,22 @@ struct server_context_impl { case SERVER_TASK_TYPE_EMBEDDING: case SERVER_TASK_TYPE_RERANK: { - if (!tokenize_cli_input(task)) { - break; + // special case: if input is provided via CLI, tokenize it first + // otherwise, no need to tokenize as it's already done inside the HTTP thread + if (task.cli_input != nullptr) { + if (!tokenize_cli_input(task)) { + break; + } } const int id_slot = task.id_slot; server_slot * slot = id_slot != -1 ? get_slot_by_id(id_slot) : get_available_slot(task); + // + // slot scheduling logic + // + if (slot == nullptr) { // if no slot is available, we defer this task for processing later SRV_DBG("no slot is available, defer task, id_task = %d\n", task.id); @@ -1681,6 +1726,13 @@ struct server_context_impl { break; } + if (slot->task_id_next != -1 && slot->task_id_next != task.id) { + // if the slot is reserved for another task, we defer this task for processing later + SRV_DBG("requested slot is reserved for another task (task_id_next = %d), defer task, id_task = %d\n", slot->task_id_next, task.id); + queue_tasks.defer(std::move(task)); + break; + } + if (slot->is_processing()) { // if requested slot is unavailable, we defer this task for processing later SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id); @@ -1688,6 +1740,29 @@ struct server_context_impl { break; } + if (task.is_parent()) { + // if this is a parent task, we want to make sure parent + all child tasks can be launched at the same time + // the current slot must be either reserved for this task, or free + GGML_ASSERT(slot->task_id_next == -1 || slot->task_id_next == task.id); + slot->task_id_next = task.id; + + // need to reserve n_children more slots + if (try_reserve_n_slots(task.n_children, task.id)) { + // all required slots have been reserved, safe to proceed + if (!launch_slots_with_child_tasks(*slot, std::move(task))) { + SRV_ERR("failed to launch slots with child tasks, id_task = %d\n", task.id); + } + break; + } else { + // failed to reserve all required slots, we defer this task for processing later + SRV_DBG("failed to reserve %d slots, defer task, id_task = %d\n", task.n_children + 1, task.id); + // clear task_id_next for the current slot + slot->task_id_next = -1; + queue_tasks.defer(std::move(task)); + break; + } + } + if (!launch_slot_with_task(*slot, std::move(task))) { SRV_ERR("failed to launch slot with task, id_task = %d\n", task.id); break; @@ -1695,11 +1770,14 @@ struct server_context_impl { } break; case SERVER_TASK_TYPE_CANCEL: { - // release slot linked with the task id for (auto & slot : slots) { + // release slot linked with the task id if (slot.task && slot.task->id == task.id_target) { slot.release(); - break; + } + // also clear task_id_next if needed + if (slot.task_id_next == task.id_target) { + slot.task_id_next = -1; } } } break; @@ -2968,7 +3046,9 @@ std::unique_ptr server_routes::handle_completions_impl( // Everything else, including multimodal completions. inputs = tokenize_input_prompts(ctx_server.vocab, ctx_server.mctx, prompt, true, true); } - tasks.reserve(inputs.size()); + + // tasks.reserve(inputs.size()); // TODO: this is inaccurate due to child tasks + for (size_t i = 0; i < inputs.size(); i++) { server_task task = server_task(type); @@ -2988,24 +3068,16 @@ std::unique_ptr server_routes::handle_completions_impl( task.params.oaicompat_model = meta->model_name; // prepare child tasks + std::vector child_tasks; if (task.params.n_cmpl > 1) { task.n_children = task.params.n_cmpl - 1; for (int j = 0; j < task.n_children; j++) { - server_task child = task.create_child(task.id, rd.get_new_id()); - - // use different sampling seed for each child - // note: https://github.com/ggml-org/llama.cpp/pull/18700#discussion_r2675115723 - if (child.params.sampling.seed != LLAMA_DEFAULT_SEED) { - child.params.sampling.seed += j + 1; - } - - tasks.push_back(std::move(child)); + task.add_child(task.id, rd.get_new_id()); } } - // note: the parent task always launches first - tasks.insert(tasks.begin(), std::move(task)); + tasks.push_back(std::move(task)); } rd.post_tasks(std::move(tasks)); diff --git a/tools/server/server-task.h b/tools/server/server-task.h index cf08fced631..cdfcf4c8ad1 100644 --- a/tools/server/server-task.h +++ b/tools/server/server-task.h @@ -123,6 +123,9 @@ struct server_task { // used by parallel sampling (multiple completions from same prompt) int n_children = 0; // number of tasks reusing this prompt int id_parent = -1; + // temporary store of child tasks for scheduling + // note: this vector is invalid after the task is moved to server_slot + std::vector child_tasks; // used by SERVER_TASK_TYPE_INFERENCE task_params params; @@ -167,11 +170,14 @@ struct server_task { std::unordered_set ids(tasks.size()); for (size_t i = 0; i < tasks.size(); i++) { ids.insert(tasks[i].id); + for (auto & child : tasks[i].child_tasks) { + ids.insert(child.id); + } } return ids; } - server_task create_child(int id_parent, int id_child) const { + void add_child(int id_parent, int id_child) { server_task copy; copy.id = id_child; @@ -179,8 +185,15 @@ struct server_task { copy.params = params; copy.type = type; copy.tokens = tokens.clone(); + copy.id_slot = -1; // child tasks cannot specify slot + + // use different sampling seed for each child + // note: https://github.com/ggml-org/llama.cpp/pull/18700#discussion_r2675115723 + if (copy.params.sampling.seed != LLAMA_DEFAULT_SEED) { + copy.params.sampling.seed += (uint32_t)child_tasks.size() + 1; + } - return copy; + child_tasks.push_back(std::move(copy)); } // the task will be moved into queue, then onto slots @@ -188,6 +201,14 @@ struct server_task { task_result_state create_state() const { return task_result_state(params.oaicompat_chat_syntax); } + + bool is_parent() const { + return n_children > 0; + } + + bool is_child() const { + return id_parent != -1; + } }; struct result_timings { From e32545f1fb82cf35883b7546df96a4c4d4e0190d Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Mon, 12 Jan 2026 18:13:16 +0100 Subject: [PATCH 02/11] fix --- tools/server/server-context.cpp | 76 ++++++++++--------- tools/server/server-queue.cpp | 24 ++++-- tools/server/server-queue.h | 2 +- .../server/tests/unit/test_chat_completion.py | 32 ++++---- 4 files changed, 76 insertions(+), 58 deletions(-) diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index 6b9b1eb9a4c..a9f6424970a 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -1650,49 +1650,68 @@ struct server_context_impl { return true; } + // if N slots are reserved AND they are all available, return true + // if not, leave them in the reserved state and return false bool try_reserve_n_slots(const size_t n_required, const int task_id) { size_t n_reserved = 0; + size_t n_available = 0; for (auto & slot : slots) { if (n_reserved >= n_required) { break; - } else if (slot.task_id_next == task_id) { - // already reserved to this task - n_reserved++; - } else if (slot.task_id_next == -1) { - // not reserved by any other tasks + } else if (slot.task_id_next == task_id || slot.task_id_next == -1) { + // already reserved to this task OR not reserved by any other tasks slot.task_id_next = task_id; n_reserved++; + if (!slot.is_processing()) { + n_available++; + } + } + } + return n_available >= n_required; + } + + void unreserve_slots(const int task_id) { + for (auto & slot : slots) { + if (slot.task_id_next == task_id) { + slot.task_id_next = -1; } } - return n_reserved >= n_required; } // launch multiple slots for parent + child tasks bool launch_slots_with_child_tasks(server_slot & parent_slot, server_task && parent_task) { GGML_ASSERT(parent_task.is_parent()); + SRV_INF("launching slots for parent task id_task = %d with %d child tasks\n", + parent_task.id, parent_task.n_children); // launch all child tasks first - size_t i_child = 0; + int i_child = 0; for (auto & slot : slots) { if (slot.id == parent_slot.id) { continue; } - server_task && child_task = std::move(parent_task.child_tasks[i_child]); - if (!launch_slot_with_task(slot, std::move(child_task))) { - SRV_ERR("failed to launch slot with child task, id_task = %d\n", child_task.id); + if (i_child >= parent_task.n_children) { + break; + } + int id_child = parent_task.child_tasks[i_child].id; + if (!launch_slot_with_task(slot, std::move(parent_task.child_tasks[i_child]))) { + SRV_ERR("failed to launch slot with child task, id_task = %d\n", id_child); return false; } i_child++; } - GGML_ASSERT(i_child == parent_task.n_children); parent_task.child_tasks.clear(); // finally, launch the parent task + int id_parent = parent_task.id; if (!launch_slot_with_task(parent_slot, std::move(parent_task))) { - SRV_ERR("failed to launch slot with task, id_task = %d\n", parent_task.id); + SRV_ERR("failed to launch slot with task, id_task = %d\n", id_parent); return false; } + // all slots have been successfully launched, unreserve them + unreserve_slots(id_parent); + return true; } @@ -1749,14 +1768,16 @@ struct server_context_impl { // need to reserve n_children more slots if (try_reserve_n_slots(task.n_children, task.id)) { // all required slots have been reserved, safe to proceed + int task_id = task.id; if (!launch_slots_with_child_tasks(*slot, std::move(task))) { - SRV_ERR("failed to launch slots with child tasks, id_task = %d\n", task.id); + SRV_ERR("failed to launch slots with child tasks, id_task = %d\n", task_id); + unreserve_slots(task_id); // task is cancelled, unreserve all slots } break; } else { // failed to reserve all required slots, we defer this task for processing later SRV_DBG("failed to reserve %d slots, defer task, id_task = %d\n", task.n_children + 1, task.id); - // clear task_id_next for the current slot + // clear task_id_next for the current slot (while keeping other slots reserved) slot->task_id_next = -1; queue_tasks.defer(std::move(task)); break; @@ -1770,16 +1791,14 @@ struct server_context_impl { } break; case SERVER_TASK_TYPE_CANCEL: { + // release slot linked with the task id for (auto & slot : slots) { - // release slot linked with the task id if (slot.task && slot.task->id == task.id_target) { slot.release(); - } - // also clear task_id_next if needed - if (slot.task_id_next == task.id_target) { - slot.task_id_next = -1; + break; } } + unreserve_slots(task.id_target); } break; case SERVER_TASK_TYPE_NEXT_RESPONSE: { @@ -2037,7 +2056,7 @@ struct server_context_impl { GGML_ABORT("not supported by multimodal"); } - if (slot.is_parent() || slot.is_child()) { + if (slot.task->is_parent() || slot.task->is_child()) { send_error(slot, "context shift cannot be used for shared prompt", ERROR_TYPE_SERVER); slot.release(); continue; @@ -2184,21 +2203,6 @@ struct server_context_impl { // this slot still has a prompt to be processed if (slot.state == SLOT_STATE_PROCESSING_PROMPT || slot.state == SLOT_STATE_STARTED) { - // wait for all children to be launched - if (slot.is_parent()) { - int n_launched = 0; - for (auto & other : slots) { - if (other.is_processing() && other.is_child() && other.task->id_parent == slot.task->id) { - ++n_launched; - } - } - - if (n_launched < slot.task->n_children) { - SLT_DBG(slot, "waiting for children to be launched, n_children = %d, n_launched = %d\n", slot.task->n_children, n_launched); - continue; - } - } - const auto & input_tokens = slot.task->tokens; // TODO: maybe move branch to outside of this loop in the future @@ -2752,7 +2756,7 @@ struct server_context_impl { // handle `n_cmpl > 1` tasks - when the main prompt is processed, activate all child tasks too for (auto & slot : slots) { - if (slot.state == SLOT_STATE_DONE_PROMPT && slot.is_parent()) { + if (slot.state == SLOT_STATE_DONE_PROMPT && slot.task->is_parent()) { SLT_INF(slot, "parent task prompt done, n_children = %d\n", slot.task->n_children); std::vector children; diff --git a/tools/server/server-queue.cpp b/tools/server/server-queue.cpp index 9a6ba560a36..8ec6d042b6f 100644 --- a/tools/server/server-queue.cpp +++ b/tools/server/server-queue.cpp @@ -217,12 +217,12 @@ void server_response::add_waiting_task_id(int id_task) { waiting_task_ids.insert(id_task); } -void server_response::add_waiting_tasks(const std::vector & tasks) { +void server_response::add_waiting_task_ids(const std::unordered_set & id_tasks) { std::unique_lock lock(mutex_results); - for (const auto & task : tasks) { - RES_DBG("add task %d to waiting list. current waiting = %d (before add)\n", task.id, (int) waiting_task_ids.size()); - waiting_task_ids.insert(task.id); + for (const auto & id_task : id_tasks) { + RES_DBG("add task %d to waiting list. current waiting = %d (before add)\n", id_task, (int) waiting_task_ids.size()); + waiting_task_ids.insert(id_task); } } @@ -327,6 +327,7 @@ void server_response::terminate() { void server_response_reader::post_task(server_task && task, bool front) { GGML_ASSERT(id_tasks.empty() && "post_task() can only be called once per reader"); + GGML_ASSERT(!task.is_parent() && "not supported, use post_tasks() instead"); task.index = 0; id_tasks.insert(task.id); states.push_back(task.create_state()); @@ -338,11 +339,18 @@ void server_response_reader::post_tasks(std::vector && tasks, bool GGML_ASSERT(id_tasks.empty() && "post_tasks() can only be called once per reader"); id_tasks = server_task::get_list_id(tasks); states.reserve(tasks.size()); - for (size_t i = 0; i < tasks.size(); i++) { - tasks[i].index = i; - states.push_back(tasks[i].create_state()); + size_t index = 0; + for (auto & task : tasks) { + task.index = index++; + states.push_back(task.create_state()); + // for child tasks + for (auto & child_task : task.child_tasks) { + child_task.index = index++; + states.push_back(child_task.create_state()); + } } - queue_results.add_waiting_tasks(tasks); + GGML_ASSERT(states.size() == id_tasks.size()); + queue_results.add_waiting_task_ids(id_tasks); queue_tasks.post(std::move(tasks), front); } diff --git a/tools/server/server-queue.h b/tools/server/server-queue.h index 3798aa299ef..6a2960aba22 100644 --- a/tools/server/server-queue.h +++ b/tools/server/server-queue.h @@ -124,7 +124,7 @@ struct server_response { // add the id_task to the list of tasks waiting for response void add_waiting_task_id(int id_task); - void add_waiting_tasks(const std::vector & tasks); + void add_waiting_task_ids(const std::unordered_set & id_tasks); // when the request is finished, we can remove task associated with it void remove_waiting_task_id(int id_task); diff --git a/tools/server/tests/unit/test_chat_completion.py b/tools/server/tests/unit/test_chat_completion.py index d0ce01bc6ec..d56a930f7c1 100644 --- a/tools/server/tests/unit/test_chat_completion.py +++ b/tools/server/tests/unit/test_chat_completion.py @@ -491,16 +491,22 @@ def make_cmpl_request(): def test_chat_completions_multiple_choices(): global server server.start() - res = server.make_request("POST", "/chat/completions", data={ - "max_tokens": 8, - "n": 2, - "messages": [ - {"role": "system", "content": "Book"}, - {"role": "user", "content": "What is the best book"}, - ], - }) - assert res.status_code == 200 - assert len(res.body["choices"]) == 2 - for choice in res.body["choices"]: - assert "assistant" == choice["message"]["role"] - assert choice["finish_reason"] == "length" + # make sure cache can be reused across multiple choices and multiple requests + # ref: https://github.com/ggml-org/llama.cpp/pull/18663 + for _ in range(2): + res = server.make_request("POST", "/chat/completions", data={ + "max_tokens": 8, + "n": 2, + "messages": [ + {"role": "system", "content": "Book"}, + {"role": "user", "content": "What is the best book"}, + ], + # test forcing the same slot to be used + # the scheduler should not be locked up in this case + "id_slot": 0, + }) + assert res.status_code == 200 + assert len(res.body["choices"]) == 2 + for choice in res.body["choices"]: + assert "assistant" == choice["message"]["role"] + assert choice["finish_reason"] == "length" From 821e329e8ee236bb0fccb98f92cc2d5e28a07320 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Mon, 12 Jan 2026 18:21:24 +0100 Subject: [PATCH 03/11] add comment pointing to this PR --- tools/server/server-context.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index a9f6424970a..77a5506a70a 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -1652,6 +1652,7 @@ struct server_context_impl { // if N slots are reserved AND they are all available, return true // if not, leave them in the reserved state and return false + // ref: https://github.com/ggml-org/llama.cpp/pull/18789 bool try_reserve_n_slots(const size_t n_required, const int task_id) { size_t n_reserved = 0; size_t n_available = 0; From 25702ba89e58e47298524eb1c4d96794f076a0e2 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Mon, 12 Jan 2026 18:37:54 +0100 Subject: [PATCH 04/11] fix --- tools/server/server-context.cpp | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index 77a5506a70a..48c9f8163db 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -1653,11 +1653,13 @@ struct server_context_impl { // if N slots are reserved AND they are all available, return true // if not, leave them in the reserved state and return false // ref: https://github.com/ggml-org/llama.cpp/pull/18789 - bool try_reserve_n_slots(const size_t n_required, const int task_id) { + bool try_reserve_child_slots(server_slot & cur_slot, const size_t n_required, const int task_id) { size_t n_reserved = 0; size_t n_available = 0; for (auto & slot : slots) { - if (n_reserved >= n_required) { + if (slot.id == cur_slot.id) { + continue; // skip the current slot + } else if (n_reserved >= n_required) { break; } else if (slot.task_id_next == task_id || slot.task_id_next == -1) { // already reserved to this task OR not reserved by any other tasks @@ -1689,11 +1691,12 @@ struct server_context_impl { int i_child = 0; for (auto & slot : slots) { if (slot.id == parent_slot.id) { - continue; + continue; // skip the parent slot } if (i_child >= parent_task.n_children) { break; } + GGML_ASSERT(!slot.is_processing()); int id_child = parent_task.child_tasks[i_child].id; if (!launch_slot_with_task(slot, std::move(parent_task.child_tasks[i_child]))) { SRV_ERR("failed to launch slot with child task, id_task = %d\n", id_child); @@ -1704,6 +1707,7 @@ struct server_context_impl { parent_task.child_tasks.clear(); // finally, launch the parent task + GGML_ASSERT(!parent_slot.is_processing()); int id_parent = parent_task.id; if (!launch_slot_with_task(parent_slot, std::move(parent_task))) { SRV_ERR("failed to launch slot with task, id_task = %d\n", id_parent); @@ -1767,7 +1771,7 @@ struct server_context_impl { slot->task_id_next = task.id; // need to reserve n_children more slots - if (try_reserve_n_slots(task.n_children, task.id)) { + if (try_reserve_child_slots(*slot, task.n_children, task.id)) { // all required slots have been reserved, safe to proceed int task_id = task.id; if (!launch_slots_with_child_tasks(*slot, std::move(task))) { @@ -3073,7 +3077,6 @@ std::unique_ptr server_routes::handle_completions_impl( task.params.oaicompat_model = meta->model_name; // prepare child tasks - std::vector child_tasks; if (task.params.n_cmpl > 1) { task.n_children = task.params.n_cmpl - 1; From 9481b9d20cd3f02c89dff00300138ab0fb2bf70f Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Mon, 12 Jan 2026 22:18:25 +0100 Subject: [PATCH 05/11] clean up --- tools/server/server-context.cpp | 36 ++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index 48c9f8163db..314160074b5 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -1673,9 +1673,10 @@ struct server_context_impl { return n_available >= n_required; } - void unreserve_slots(const int task_id) { + // unreserve all slots reserved for the given task id (must be parent id) + void unreserve_slots(const int id_parent) { for (auto & slot : slots) { - if (slot.task_id_next == task_id) { + if (slot.task_id_next == id_parent) { slot.task_id_next = -1; } } @@ -1684,8 +1685,21 @@ struct server_context_impl { // launch multiple slots for parent + child tasks bool launch_slots_with_child_tasks(server_slot & parent_slot, server_task && parent_task) { GGML_ASSERT(parent_task.is_parent()); - SRV_INF("launching slots for parent task id_task = %d with %d child tasks\n", - parent_task.id, parent_task.n_children); + int id_parent = parent_task.id; + + SRV_INF("launching slots for parent task id_task = %d with %d child tasks\n", id_parent, parent_task.n_children); + + // to be called in case of failure to release all launched slots + auto release_slots = [this, id_parent]() { + for (auto & slot : slots) { + if (slot.is_processing() && ( + slot.task->id == id_parent || + slot.task->id_parent == id_parent + )) { + slot.release(); + } + } + }; // launch all child tasks first int i_child = 0; @@ -1700,6 +1714,8 @@ struct server_context_impl { int id_child = parent_task.child_tasks[i_child].id; if (!launch_slot_with_task(slot, std::move(parent_task.child_tasks[i_child]))) { SRV_ERR("failed to launch slot with child task, id_task = %d\n", id_child); + release_slots(); + unreserve_slots(id_parent); return false; } i_child++; @@ -1708,15 +1724,15 @@ struct server_context_impl { // finally, launch the parent task GGML_ASSERT(!parent_slot.is_processing()); - int id_parent = parent_task.id; if (!launch_slot_with_task(parent_slot, std::move(parent_task))) { SRV_ERR("failed to launch slot with task, id_task = %d\n", id_parent); + release_slots(); + unreserve_slots(id_parent); return false; } // all slots have been successfully launched, unreserve them unreserve_slots(id_parent); - return true; } @@ -1766,7 +1782,8 @@ struct server_context_impl { if (task.is_parent()) { // if this is a parent task, we want to make sure parent + all child tasks can be launched at the same time - // the current slot must be either reserved for this task, or free + + // the current slot must be either reserved for this task, or free (checked above) GGML_ASSERT(slot->task_id_next == -1 || slot->task_id_next == task.id); slot->task_id_next = task.id; @@ -1776,15 +1793,14 @@ struct server_context_impl { int task_id = task.id; if (!launch_slots_with_child_tasks(*slot, std::move(task))) { SRV_ERR("failed to launch slots with child tasks, id_task = %d\n", task_id); - unreserve_slots(task_id); // task is cancelled, unreserve all slots + break; } break; } else { // failed to reserve all required slots, we defer this task for processing later SRV_DBG("failed to reserve %d slots, defer task, id_task = %d\n", task.n_children + 1, task.id); - // clear task_id_next for the current slot (while keeping other slots reserved) - slot->task_id_next = -1; queue_tasks.defer(std::move(task)); + // note: the current slot + child slots are already reserved for this task break; } } From f0349e404f64198017bb6a57390ca83e12abf0da Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 13 Jan 2026 14:22:30 +0100 Subject: [PATCH 06/11] more debug messages --- tools/server/server-context.cpp | 36 ++++++++++++++++++++------------- tools/server/server-queue.cpp | 1 + 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index 314160074b5..270b2954e8e 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -941,10 +941,16 @@ struct server_context_impl { return true; } - server_slot * get_slot_by_id(int id) { + server_slot * get_slot_by_id(int id_slot, int id_task) { for (server_slot & slot : slots) { - if (slot.id == id) { - return &slot; + if (slot.id == id_slot) { + if (slot.task_id_next == -1 || slot.task_id_next == id_task) { + return &slot; + } else { + SRV_DBG("slot %d is reserved for task %d, cannot assign task %d\n", + id_slot, slot.task_id_next, id_task); + return nullptr; + } } } @@ -966,6 +972,11 @@ struct server_context_impl { continue; } + // skip slot reserved for other tasks + if (slot.task_id_next != -1 && slot.task_id_next != task.id) { + continue; + } + const auto & tokens = slot.prompt.tokens; // skip the slot if it does not contains cached tokens @@ -1663,6 +1674,7 @@ struct server_context_impl { break; } else if (slot.task_id_next == task_id || slot.task_id_next == -1) { // already reserved to this task OR not reserved by any other tasks + SRV_DBG("reserving slot %d for task %d\n", slot.id, task_id); slot.task_id_next = task_id; n_reserved++; if (!slot.is_processing()) { @@ -1753,7 +1765,9 @@ struct server_context_impl { const int id_slot = task.id_slot; - server_slot * slot = id_slot != -1 ? get_slot_by_id(id_slot) : get_available_slot(task); + server_slot * slot = id_slot != -1 + ? get_slot_by_id(id_slot, task.id) + : get_available_slot(task); // // slot scheduling logic @@ -1766,13 +1780,6 @@ struct server_context_impl { break; } - if (slot->task_id_next != -1 && slot->task_id_next != task.id) { - // if the slot is reserved for another task, we defer this task for processing later - SRV_DBG("requested slot is reserved for another task (task_id_next = %d), defer task, id_task = %d\n", slot->task_id_next, task.id); - queue_tasks.defer(std::move(task)); - break; - } - if (slot->is_processing()) { // if requested slot is unavailable, we defer this task for processing later SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id); @@ -1793,6 +1800,7 @@ struct server_context_impl { int task_id = task.id; if (!launch_slots_with_child_tasks(*slot, std::move(task))) { SRV_ERR("failed to launch slots with child tasks, id_task = %d\n", task_id); + // task must be dropped on error break; } break; @@ -1880,7 +1888,7 @@ struct server_context_impl { } int id_slot = task.slot_action.slot_id; - server_slot * slot = get_slot_by_id(id_slot); + server_slot * slot = get_slot_by_id(id_slot, task.id); if (slot == nullptr) { send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST); break; @@ -1918,7 +1926,7 @@ struct server_context_impl { { if (!check_no_mtmd(task.id)) break; int id_slot = task.slot_action.slot_id; - server_slot * slot = get_slot_by_id(id_slot); + server_slot * slot = get_slot_by_id(id_slot, task.id); if (slot == nullptr) { send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST); break; @@ -1967,7 +1975,7 @@ struct server_context_impl { break; } int id_slot = task.slot_action.slot_id; - server_slot * slot = get_slot_by_id(id_slot); + server_slot * slot = get_slot_by_id(id_slot, task.id); if (slot == nullptr) { send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST); break; diff --git a/tools/server/server-queue.cpp b/tools/server/server-queue.cpp index 8ec6d042b6f..9367aab9ad3 100644 --- a/tools/server/server-queue.cpp +++ b/tools/server/server-queue.cpp @@ -77,6 +77,7 @@ int server_queue::get_new_id() { void server_queue::pop_deferred_task() { std::unique_lock lock(mutex_tasks); if (!queue_tasks_deferred.empty()) { + QUE_DBG("pop deferred task, id = %d\n", queue_tasks_deferred.front().id); queue_tasks.emplace_front(std::move(queue_tasks_deferred.front())); queue_tasks_deferred.pop_front(); } From da6e2bac70e70deaaaebd42bc25c13d66923defb Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 13 Jan 2026 14:28:11 +0100 Subject: [PATCH 07/11] add pop_deferred_task with specific ID version --- tools/server/server-context.cpp | 9 +++++---- tools/server/server-queue.cpp | 19 +++++++++++++++---- tools/server/server-queue.h | 4 +++- 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index 270b2954e8e..6ad8371d52a 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -171,7 +171,7 @@ struct server_slot { double t_prompt_processing; // ms double t_token_generation; // ms - std::function callback_on_release; + std::function callback_on_release; // Speculative decoding stats int32_t n_draft_total = 0; // Total draft tokens generated @@ -349,7 +349,7 @@ struct server_slot { task_prev = std::move(task); task.reset(); - callback_on_release(id); + callback_on_release(id, task_id_next); } } @@ -826,8 +826,9 @@ struct server_context_impl { SLT_INF(slot, "new slot, n_ctx = %d\n", slot.n_ctx); - slot.callback_on_release = [this](int) { - queue_tasks.pop_deferred_task(); + slot.callback_on_release = [this](int slot_id, int task_id_next) { + GGML_UNUSED(slot_id); + queue_tasks.pop_deferred_task(task_id_next); }; slot.reset(); diff --git a/tools/server/server-queue.cpp b/tools/server/server-queue.cpp index 9367aab9ad3..d161077c739 100644 --- a/tools/server/server-queue.cpp +++ b/tools/server/server-queue.cpp @@ -74,12 +74,23 @@ int server_queue::get_new_id() { return new_id; } -void server_queue::pop_deferred_task() { +void server_queue::pop_deferred_task(int id_task) { std::unique_lock lock(mutex_tasks); if (!queue_tasks_deferred.empty()) { - QUE_DBG("pop deferred task, id = %d\n", queue_tasks_deferred.front().id); - queue_tasks.emplace_front(std::move(queue_tasks_deferred.front())); - queue_tasks_deferred.pop_front(); + if (id_task == -1) { + QUE_DBG("pop deferred task, id = %d\n", queue_tasks_deferred.front().id); + queue_tasks.emplace_front(std::move(queue_tasks_deferred.front())); + queue_tasks_deferred.pop_front(); + } else { + for (auto it = queue_tasks_deferred.begin(); it != queue_tasks_deferred.end(); ++it) { + if (it->id == id_task) { + QUE_DBG("pop deferred task (specific id), id = %d\n", it->id); + queue_tasks.emplace_front(std::move(*it)); + queue_tasks_deferred.erase(it); + break; + } + } + } } time_last_task = ggml_time_ms(); condition_tasks.notify_one(); diff --git a/tools/server/server-queue.h b/tools/server/server-queue.h index 6a2960aba22..34607fde3a6 100644 --- a/tools/server/server-queue.h +++ b/tools/server/server-queue.h @@ -44,7 +44,9 @@ struct server_queue { int get_new_id(); // Call when the state of one slot is changed, it will move one task from deferred to main queue - void pop_deferred_task(); + // if id_task != -1, only pop the deferred task with matching id + // pop nothing if no matching id is found + void pop_deferred_task(int id_task); // if sleeping, request exiting sleep state and wait until it is done // returns immediately if not sleeping From d6b0d2396037ebdf163bc3272c00870152836186 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Tue, 13 Jan 2026 14:56:18 +0100 Subject: [PATCH 08/11] improve the logic --- tools/server/server-context.cpp | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index 6ad8371d52a..7cf00836186 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -942,16 +942,10 @@ struct server_context_impl { return true; } - server_slot * get_slot_by_id(int id_slot, int id_task) { + server_slot * get_slot_by_id(int id_slot) { for (server_slot & slot : slots) { if (slot.id == id_slot) { - if (slot.task_id_next == -1 || slot.task_id_next == id_task) { - return &slot; - } else { - SRV_DBG("slot %d is reserved for task %d, cannot assign task %d\n", - id_slot, slot.task_id_next, id_task); - return nullptr; - } + return &slot; } } @@ -1703,6 +1697,7 @@ struct server_context_impl { SRV_INF("launching slots for parent task id_task = %d with %d child tasks\n", id_parent, parent_task.n_children); // to be called in case of failure to release all launched slots + // this unreserves any remaning reserved slots for this parent task (prevent dangling reservations) auto release_slots = [this, id_parent]() { for (auto & slot : slots) { if (slot.is_processing() && ( @@ -1715,6 +1710,7 @@ struct server_context_impl { }; // launch all child tasks first + // note: launch_slot_with_task will unreserve the task int i_child = 0; for (auto & slot : slots) { if (slot.id == parent_slot.id) { @@ -1744,8 +1740,6 @@ struct server_context_impl { return false; } - // all slots have been successfully launched, unreserve them - unreserve_slots(id_parent); return true; } @@ -1767,7 +1761,7 @@ struct server_context_impl { const int id_slot = task.id_slot; server_slot * slot = id_slot != -1 - ? get_slot_by_id(id_slot, task.id) + ? get_slot_by_id(id_slot) : get_available_slot(task); // @@ -1781,6 +1775,13 @@ struct server_context_impl { break; } + if (slot->task_id_next != -1 && slot->task_id_next != task.id) { + // slot is reserved for another task, we defer this task for processing later + SRV_DBG("slot %d is reserved for task %d, defer task, id_task = %d\n", slot->id, slot->task_id_next, task.id); + queue_tasks.defer(std::move(task)); + break; + } + if (slot->is_processing()) { // if requested slot is unavailable, we defer this task for processing later SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id); @@ -1814,6 +1815,7 @@ struct server_context_impl { } } + // note: launch_slot_with_task will unreserve the task if (!launch_slot_with_task(*slot, std::move(task))) { SRV_ERR("failed to launch slot with task, id_task = %d\n", task.id); break; @@ -1889,7 +1891,7 @@ struct server_context_impl { } int id_slot = task.slot_action.slot_id; - server_slot * slot = get_slot_by_id(id_slot, task.id); + server_slot * slot = get_slot_by_id(id_slot); if (slot == nullptr) { send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST); break; @@ -1927,7 +1929,7 @@ struct server_context_impl { { if (!check_no_mtmd(task.id)) break; int id_slot = task.slot_action.slot_id; - server_slot * slot = get_slot_by_id(id_slot, task.id); + server_slot * slot = get_slot_by_id(id_slot); if (slot == nullptr) { send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST); break; @@ -1976,7 +1978,7 @@ struct server_context_impl { break; } int id_slot = task.slot_action.slot_id; - server_slot * slot = get_slot_by_id(id_slot, task.id); + server_slot * slot = get_slot_by_id(id_slot); if (slot == nullptr) { send_error(task, "Invalid slot ID", ERROR_TYPE_INVALID_REQUEST); break; From ba86ad966f62e9d14d102becc331442d85288fb6 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Wed, 14 Jan 2026 19:01:29 +0100 Subject: [PATCH 09/11] simple approach --- tools/server/server-context.cpp | 174 ++++++++++---------------------- tools/server/server-queue.cpp | 27 ++--- tools/server/server-queue.h | 5 +- tools/server/server-task.h | 5 +- 4 files changed, 74 insertions(+), 137 deletions(-) diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index 7cf00836186..2f9eee39d17 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -84,10 +84,6 @@ struct server_slot { std::unique_ptr task; std::unique_ptr task_prev; // used for debugging - // if set, we only accept this task next (in other words, the task reserves this slot) - // used for scheduling n_children tasks after the parent - int task_id_next = -1; - // used to determine the slot that has been used the longest int64_t t_last_used = -1; @@ -171,7 +167,7 @@ struct server_slot { double t_prompt_processing; // ms double t_token_generation; // ms - std::function callback_on_release; + std::function callback_on_release; // Speculative decoding stats int32_t n_draft_total = 0; // Total draft tokens generated @@ -180,7 +176,6 @@ struct server_slot { void reset() { SLT_DBG(*this, "%s", "\n"); - task_id_next = -1; n_prompt_tokens_cache = 0; last_nl_pos = 0; @@ -349,7 +344,7 @@ struct server_slot { task_prev = std::move(task); task.reset(); - callback_on_release(id, task_id_next); + callback_on_release(id); } } @@ -826,9 +821,8 @@ struct server_context_impl { SLT_INF(slot, "new slot, n_ctx = %d\n", slot.n_ctx); - slot.callback_on_release = [this](int slot_id, int task_id_next) { - GGML_UNUSED(slot_id); - queue_tasks.pop_deferred_task(task_id_next); + slot.callback_on_release = [this](int slot_id) { + queue_tasks.pop_deferred_task(slot_id); }; slot.reset(); @@ -967,11 +961,6 @@ struct server_context_impl { continue; } - // skip slot reserved for other tasks - if (slot.task_id_next != -1 && slot.task_id_next != task.id) { - continue; - } - const auto & tokens = slot.prompt.tokens; // skip the slot if it does not contains cached tokens @@ -1656,48 +1645,35 @@ struct server_context_impl { return true; } - // if N slots are reserved AND they are all available, return true - // if not, leave them in the reserved state and return false - // ref: https://github.com/ggml-org/llama.cpp/pull/18789 - bool try_reserve_child_slots(server_slot & cur_slot, const size_t n_required, const int task_id) { - size_t n_reserved = 0; - size_t n_available = 0; + // launch multiple slots for parent + child tasks + // return: + // - 0: success + // - 1: not enough slots can be allocated for all tasks + // - 2: launching any of the slots fails + int launch_slots_with_parent_task(server_slot & parent_slot, server_task && parent_task) { + GGML_ASSERT(!parent_slot.is_processing()); + GGML_ASSERT(parent_task.is_parent()); + + int id_parent = parent_task.id; + + std::vector child_slots; for (auto & slot : slots) { - if (slot.id == cur_slot.id) { - continue; // skip the current slot - } else if (n_reserved >= n_required) { + if (!slot.is_processing() && slot.id != parent_slot.id) { + child_slots.push_back(&slot); + } + if (child_slots.size() >= parent_task.child_tasks.size()) { break; - } else if (slot.task_id_next == task_id || slot.task_id_next == -1) { - // already reserved to this task OR not reserved by any other tasks - SRV_DBG("reserving slot %d for task %d\n", slot.id, task_id); - slot.task_id_next = task_id; - n_reserved++; - if (!slot.is_processing()) { - n_available++; - } } } - return n_available >= n_required; - } - - // unreserve all slots reserved for the given task id (must be parent id) - void unreserve_slots(const int id_parent) { - for (auto & slot : slots) { - if (slot.task_id_next == id_parent) { - slot.task_id_next = -1; - } + if (child_slots.size() < parent_task.child_tasks.size()) { + SRV_DBG("not enough free slots to launch child tasks, n_free_slots = %zu, n_children = %zu\n", + child_slots.size(), parent_task.child_tasks.size()); + return 1; } - } - - // launch multiple slots for parent + child tasks - bool launch_slots_with_child_tasks(server_slot & parent_slot, server_task && parent_task) { - GGML_ASSERT(parent_task.is_parent()); - int id_parent = parent_task.id; - SRV_INF("launching slots for parent task id_task = %d with %d child tasks\n", id_parent, parent_task.n_children); + SRV_INF("launching slots for parent task id_task = %d with %zu child tasks\n", id_parent, parent_task.child_tasks.size()); // to be called in case of failure to release all launched slots - // this unreserves any remaning reserved slots for this parent task (prevent dangling reservations) auto release_slots = [this, id_parent]() { for (auto & slot : slots) { if (slot.is_processing() && ( @@ -1710,37 +1686,26 @@ struct server_context_impl { }; // launch all child tasks first - // note: launch_slot_with_task will unreserve the task - int i_child = 0; - for (auto & slot : slots) { - if (slot.id == parent_slot.id) { - continue; // skip the parent slot - } - if (i_child >= parent_task.n_children) { - break; - } - GGML_ASSERT(!slot.is_processing()); - int id_child = parent_task.child_tasks[i_child].id; - if (!launch_slot_with_task(slot, std::move(parent_task.child_tasks[i_child]))) { + size_t idx = 0; + GGML_ASSERT(child_slots.size() == parent_task.child_tasks.size()); + for (auto * slot : child_slots) { + int id_child = parent_task.child_tasks[idx].id; + if (!launch_slot_with_task(*slot, std::move(parent_task.child_tasks[idx]))) { SRV_ERR("failed to launch slot with child task, id_task = %d\n", id_child); release_slots(); - unreserve_slots(id_parent); - return false; + return 2; } - i_child++; + idx++; } - parent_task.child_tasks.clear(); // finally, launch the parent task - GGML_ASSERT(!parent_slot.is_processing()); if (!launch_slot_with_task(parent_slot, std::move(parent_task))) { SRV_ERR("failed to launch slot with task, id_task = %d\n", id_parent); release_slots(); - unreserve_slots(id_parent); - return false; + return 2; } - return true; + return 0; } void process_single_task(server_task && task) { @@ -1759,6 +1724,7 @@ struct server_context_impl { } const int id_slot = task.id_slot; + const int id_task = task.id; server_slot * slot = id_slot != -1 ? get_slot_by_id(id_slot) @@ -1770,55 +1736,31 @@ struct server_context_impl { if (slot == nullptr) { // if no slot is available, we defer this task for processing later - SRV_DBG("no slot is available, defer task, id_task = %d\n", task.id); - queue_tasks.defer(std::move(task)); - break; - } - - if (slot->task_id_next != -1 && slot->task_id_next != task.id) { - // slot is reserved for another task, we defer this task for processing later - SRV_DBG("slot %d is reserved for task %d, defer task, id_task = %d\n", slot->id, slot->task_id_next, task.id); + SRV_DBG("no slot is available, defer task, id_task = %d\n", id_task); queue_tasks.defer(std::move(task)); break; } if (slot->is_processing()) { // if requested slot is unavailable, we defer this task for processing later - SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id); + SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", id_task); queue_tasks.defer(std::move(task)); break; } if (task.is_parent()) { - // if this is a parent task, we want to make sure parent + all child tasks can be launched at the same time - - // the current slot must be either reserved for this task, or free (checked above) - GGML_ASSERT(slot->task_id_next == -1 || slot->task_id_next == task.id); - slot->task_id_next = task.id; - - // need to reserve n_children more slots - if (try_reserve_child_slots(*slot, task.n_children, task.id)) { - // all required slots have been reserved, safe to proceed - int task_id = task.id; - if (!launch_slots_with_child_tasks(*slot, std::move(task))) { - SRV_ERR("failed to launch slots with child tasks, id_task = %d\n", task_id); - // task must be dropped on error - break; - } - break; - } else { - // failed to reserve all required slots, we defer this task for processing later - SRV_DBG("failed to reserve %d slots, defer task, id_task = %d\n", task.n_children + 1, task.id); + int res = launch_slots_with_parent_task(*slot, std::move(task)); + if (res == 2) { + SRV_ERR("failed to launch slots with parent task, id_task = %d\n", id_task); + break; // drop the task + } else if (res == 1) { + SRV_DBG("not enough slots, defer task, id_task = %d\n", id_task); queue_tasks.defer(std::move(task)); - // note: the current slot + child slots are already reserved for this task break; } - } - - // note: launch_slot_with_task will unreserve the task - if (!launch_slot_with_task(*slot, std::move(task))) { - SRV_ERR("failed to launch slot with task, id_task = %d\n", task.id); - break; + } else if (!launch_slot_with_task(*slot, std::move(task))) { + SRV_ERR("failed to launch slot with task, id_task = %d\n", id_task); + break; // drop the task } } break; case SERVER_TASK_TYPE_CANCEL: @@ -1830,7 +1772,6 @@ struct server_context_impl { break; } } - unreserve_slots(task.id_target); } break; case SERVER_TASK_TYPE_NEXT_RESPONSE: { @@ -2789,8 +2730,6 @@ struct server_context_impl { // handle `n_cmpl > 1` tasks - when the main prompt is processed, activate all child tasks too for (auto & slot : slots) { if (slot.state == SLOT_STATE_DONE_PROMPT && slot.task->is_parent()) { - SLT_INF(slot, "parent task prompt done, n_children = %d\n", slot.task->n_children); - std::vector children; for (auto & other : slots) { if (other.state == SLOT_STATE_WAIT_OTHER && slot.task->id == other.task->id_parent) { @@ -2798,17 +2737,15 @@ struct server_context_impl { } } - // we can only proceed if all child slots are having the correct tasks - if (slot.task->n_children == (int) children.size()) { - // copy state to the child slots - for (auto & child : children) { - SLT_INF(slot, " - copying state to child %d\n", child->id); + // all children slots should already launched by launch_slots_with_parent_task() + // copy state to the child slots + for (auto & child : children) { + SLT_INF(slot, " - copying state to child %d\n", child->id); - GGML_ASSERT(child->state == SLOT_STATE_WAIT_OTHER); + GGML_ASSERT(child->state == SLOT_STATE_WAIT_OTHER); - slot.copy_state_to(*child); - child->state = SLOT_STATE_DONE_PROMPT; - } + slot.copy_state_to(*child); + child->state = SLOT_STATE_DONE_PROMPT; } } } @@ -3105,9 +3042,8 @@ std::unique_ptr server_routes::handle_completions_impl( // prepare child tasks if (task.params.n_cmpl > 1) { - task.n_children = task.params.n_cmpl - 1; - - for (int j = 0; j < task.n_children; j++) { + int n_children = task.params.n_cmpl - 1; + for (int j = 0; j < n_children; j++) { task.add_child(task.id, rd.get_new_id()); } } diff --git a/tools/server/server-queue.cpp b/tools/server/server-queue.cpp index d161077c739..a2a026a12ce 100644 --- a/tools/server/server-queue.cpp +++ b/tools/server/server-queue.cpp @@ -74,22 +74,25 @@ int server_queue::get_new_id() { return new_id; } -void server_queue::pop_deferred_task(int id_task) { +void server_queue::pop_deferred_task(int id_slot) { std::unique_lock lock(mutex_tasks); if (!queue_tasks_deferred.empty()) { - if (id_task == -1) { - QUE_DBG("pop deferred task, id = %d\n", queue_tasks_deferred.front().id); + // try to find a task that uses the specified slot + bool found = false; + for (auto it = queue_tasks_deferred.begin(); it != queue_tasks_deferred.end(); ++it) { + if (it->id_slot == id_slot) { + QUE_DBG("pop deferred task (use slot %d), id_task = %d\n", id_slot, it->id); + queue_tasks.emplace_front(std::move(*it)); + queue_tasks_deferred.erase(it); + found = true; + break; + } + } + // if not tasks found using the slot, just pop the first deferred task (default behavior) + if (!found) { + QUE_DBG("pop deferred task, id_task = %d\n", queue_tasks_deferred.front().id); queue_tasks.emplace_front(std::move(queue_tasks_deferred.front())); queue_tasks_deferred.pop_front(); - } else { - for (auto it = queue_tasks_deferred.begin(); it != queue_tasks_deferred.end(); ++it) { - if (it->id == id_task) { - QUE_DBG("pop deferred task (specific id), id = %d\n", it->id); - queue_tasks.emplace_front(std::move(*it)); - queue_tasks_deferred.erase(it); - break; - } - } } } time_last_task = ggml_time_ms(); diff --git a/tools/server/server-queue.h b/tools/server/server-queue.h index 34607fde3a6..164f09b195c 100644 --- a/tools/server/server-queue.h +++ b/tools/server/server-queue.h @@ -44,9 +44,8 @@ struct server_queue { int get_new_id(); // Call when the state of one slot is changed, it will move one task from deferred to main queue - // if id_task != -1, only pop the deferred task with matching id - // pop nothing if no matching id is found - void pop_deferred_task(int id_task); + // prioritize tasks that use the specified slot (otherwise, pop the first deferred task) + void pop_deferred_task(int id_slot); // if sleeping, request exiting sleep state and wait until it is done // returns immediately if not sleeping diff --git a/tools/server/server-task.h b/tools/server/server-task.h index cdfcf4c8ad1..1774cd54a99 100644 --- a/tools/server/server-task.h +++ b/tools/server/server-task.h @@ -121,10 +121,9 @@ struct server_task { int id_slot = -1; // used by parallel sampling (multiple completions from same prompt) - int n_children = 0; // number of tasks reusing this prompt int id_parent = -1; // temporary store of child tasks for scheduling - // note: this vector is invalid after the task is moved to server_slot + // note: accessing to elements is invalid after the task is moved to server_slot std::vector child_tasks; // used by SERVER_TASK_TYPE_INFERENCE @@ -203,7 +202,7 @@ struct server_task { } bool is_parent() const { - return n_children > 0; + return child_tasks.size() > 0; } bool is_child() const { From 79c1967e45880b89a56caee6112da495692d87ee Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Wed, 14 Jan 2026 23:07:03 +0100 Subject: [PATCH 10/11] no double move --- tools/server/server-context.cpp | 50 ++++++++++++++++----------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index 2f9eee39d17..1a6e481b860 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -1645,31 +1645,26 @@ struct server_context_impl { return true; } - // launch multiple slots for parent + child tasks - // return: - // - 0: success - // - 1: not enough slots can be allocated for all tasks - // - 2: launching any of the slots fails - int launch_slots_with_parent_task(server_slot & parent_slot, server_task && parent_task) { - GGML_ASSERT(!parent_slot.is_processing()); - GGML_ASSERT(parent_task.is_parent()); - - int id_parent = parent_task.id; - - std::vector child_slots; + std::vector get_free_slots(size_t n_slots_needed, int exclude_id_slot) { + std::vector free_slots; for (auto & slot : slots) { - if (!slot.is_processing() && slot.id != parent_slot.id) { - child_slots.push_back(&slot); + if (!slot.is_processing() && slot.id != exclude_id_slot) { + free_slots.push_back(&slot); } - if (child_slots.size() >= parent_task.child_tasks.size()) { + if (free_slots.size() >= n_slots_needed) { break; } } - if (child_slots.size() < parent_task.child_tasks.size()) { - SRV_DBG("not enough free slots to launch child tasks, n_free_slots = %zu, n_children = %zu\n", - child_slots.size(), parent_task.child_tasks.size()); - return 1; - } + return free_slots; + } + + // launch multiple slots for parent + child tasks + bool launch_slots_with_parent_task(server_slot & parent_slot, std::vector & child_slots, server_task && parent_task) { + GGML_ASSERT(!parent_slot.is_processing()); + GGML_ASSERT(parent_task.is_parent()); + GGML_ASSERT(child_slots.size() == parent_task.child_tasks.size()); + + int id_parent = parent_task.id; SRV_INF("launching slots for parent task id_task = %d with %zu child tasks\n", id_parent, parent_task.child_tasks.size()); @@ -1749,15 +1744,18 @@ struct server_context_impl { } if (task.is_parent()) { - int res = launch_slots_with_parent_task(*slot, std::move(task)); - if (res == 2) { - SRV_ERR("failed to launch slots with parent task, id_task = %d\n", id_task); - break; // drop the task - } else if (res == 1) { - SRV_DBG("not enough slots, defer task, id_task = %d\n", id_task); + // try getting free slots for all child tasks + size_t n_child_tasks = task.child_tasks.size(); + std::vector child_slots = get_free_slots(n_child_tasks, slot->id); + if (child_slots.size() < n_child_tasks) { + SRV_DBG("not enough free slots for child tasks, n_free = %zu, n_children = %zu, defer task, id_task = %d\n", child_slots.size(), n_child_tasks, id_task); queue_tasks.defer(std::move(task)); break; } + if (!launch_slots_with_parent_task(*slot, child_slots, std::move(task))) { + SRV_ERR("failed to launch slot with parent task, id_task = %d\n", id_task); + break; // drop the task + } } else if (!launch_slot_with_task(*slot, std::move(task))) { SRV_ERR("failed to launch slot with task, id_task = %d\n", id_task); break; // drop the task From 8b5474a02df1fc789939c43610338ce7f6560dd7 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Thu, 15 Jan 2026 11:59:04 +0100 Subject: [PATCH 11/11] correct return type of launch_slots_with_parent_task --- tools/server/server-context.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/server/server-context.cpp b/tools/server/server-context.cpp index 1a6e481b860..f255327ba29 100644 --- a/tools/server/server-context.cpp +++ b/tools/server/server-context.cpp @@ -1688,7 +1688,7 @@ struct server_context_impl { if (!launch_slot_with_task(*slot, std::move(parent_task.child_tasks[idx]))) { SRV_ERR("failed to launch slot with child task, id_task = %d\n", id_child); release_slots(); - return 2; + return false; } idx++; } @@ -1697,10 +1697,10 @@ struct server_context_impl { if (!launch_slot_with_task(parent_slot, std::move(parent_task))) { SRV_ERR("failed to launch slot with task, id_task = %d\n", id_parent); release_slots(); - return 2; + return false; } - return 0; + return true; } void process_single_task(server_task && task) {