Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 19 additions & 40 deletions tools/server/server-models.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <chrono>
#include <queue>
#include <filesystem>
#include <cstring>

#ifdef _WIN32
#include <winsock2.h>
Expand All @@ -33,7 +34,8 @@
#include <limits.h>
#endif

#define CMD_EXIT "exit"
#define CMD_ROUTER_TO_CHILD_EXIT "cmd_router_to_child:exit"
#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready"

// address for child process, this is needed because router may run on 0.0.0.0
// ref: https://github.com/ggml-org/llama.cpp/issues/17862
Expand Down Expand Up @@ -534,6 +536,8 @@ void server_models::load(const std::string & name) {
std::vector<char *> argv = to_char_ptr_array(child_args);
std::vector<char *> envp = to_char_ptr_array(child_env);

// TODO @ngxson : maybe separate stdout and stderr in the future
// so that we can use stdout for commands and stderr for logging
int options = subprocess_option_no_window | subprocess_option_combined_stdout_stderr;
int result = subprocess_create_ex(argv.data(), options, envp.data(), inst.subproc.get());
if (result != 0) {
Expand All @@ -547,11 +551,17 @@ void server_models::load(const std::string & name) {
// captured variables are guaranteed to be destroyed only after the thread is joined
inst.th = std::thread([this, name, child_proc = inst.subproc, port = inst.meta.port]() {
// read stdout/stderr and forward to main server log
bool state_received = false; // true if child state received
FILE * p_stdout_stderr = subprocess_stdout(child_proc.get());
if (p_stdout_stderr) {
char buffer[4096];
while (fgets(buffer, sizeof(buffer), p_stdout_stderr) != nullptr) {
LOG("[%5d] %s", port, buffer);
if (!state_received && std::strstr(buffer, CMD_CHILD_TO_ROUTER_READY) != nullptr) {
// child process is ready
this->update_status(name, SERVER_MODEL_STATUS_LOADED);
state_received = true;
}
}
} else {
SRV_ERR("failed to get stdout/stderr of child process for name=%s\n", name.c_str());
Expand Down Expand Up @@ -595,7 +605,7 @@ static void interrupt_subprocess(FILE * stdin_file) {
// because subprocess.h does not provide a way to send SIGINT,
// we will send a command to the child process to exit gracefully
if (stdin_file) {
fprintf(stdin_file, "%s\n", CMD_EXIT);
fprintf(stdin_file, "%s\n", CMD_ROUTER_TO_CHILD_EXIT);
fflush(stdin_file);
}
}
Expand Down Expand Up @@ -707,32 +717,13 @@ server_http_res_ptr server_models::proxy_request(const server_http_req & req, co
return proxy;
}

std::thread server_models::setup_child_server(const common_params & base_params, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler) {
std::thread server_models::setup_child_server(const std::function<void(int)> & shutdown_handler) {
// send a notification to the router server that a model instance is ready
// TODO @ngxson : use HTTP client from libcommon
httplib::Client cli(base_params.hostname, router_port);
cli.set_connection_timeout(0, 200000); // 200 milliseconds

httplib::Request req;
req.method = "POST";
req.path = "/models/status";
req.set_header("Content-Type", "application/json");
if (!base_params.api_keys.empty()) {
req.set_header("Authorization", "Bearer " + base_params.api_keys[0]);
}

json body;
body["model"] = name;
body["value"] = server_model_status_to_string(SERVER_MODEL_STATUS_LOADED);
req.body = body.dump();

SRV_INF("notifying router server (port=%d) that model %s is ready\n", router_port, name.c_str());
auto result = cli.send(std::move(req));
if (result.error() != httplib::Error::Success) {
auto err_str = httplib::to_string(result.error());
SRV_ERR("failed to notify router server: %s\n", err_str.c_str());
exit(1); // force exit
}
common_log_pause(common_log_main());
fflush(stdout);
fprintf(stdout, "%s\n", CMD_CHILD_TO_ROUTER_READY);
fflush(stdout);
common_log_resume(common_log_main());

// setup thread for monitoring stdin
return std::thread([shutdown_handler]() {
Expand All @@ -746,7 +737,7 @@ std::thread server_models::setup_child_server(const common_params & base_params,
eof = true;
break;
}
if (line.find(CMD_EXIT) != std::string::npos) {
if (line.find(CMD_ROUTER_TO_CHILD_EXIT) != std::string::npos) {
SRV_INF("%s", "exit command received, exiting...\n");
shutdown_handler(0);
break;
Expand Down Expand Up @@ -869,18 +860,6 @@ void server_models_routes::init_routes() {
return res;
};

// used by child process to notify the router about status change
// TODO @ngxson : maybe implement authentication for this endpoint in the future
this->post_router_models_status = [this](const server_http_req & req) {
auto res = std::make_unique<server_http_res>();
json body = json::parse(req.body);
std::string model = json_value(body, "model", std::string());
std::string value = json_value(body, "value", std::string());
models.update_status(model, server_model_status_from_string(value));
res_ok(res, {{"success", true}});
return res;
};

this->get_router_models = [this](const server_http_req &) {
auto res = std::make_unique<server_http_res>();
json models_json = json::array();
Expand Down
3 changes: 1 addition & 2 deletions tools/server/server-models.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ struct server_models {

// notify the router server that a model instance is ready
// return the monitoring thread (to be joined by the caller)
static std::thread setup_child_server(const common_params & base_params, int router_port, const std::string & name, std::function<void(int)> & shutdown_handler);
static std::thread setup_child_server(const std::function<void(int)> & shutdown_handler);
};

struct server_models_routes {
Expand All @@ -162,7 +162,6 @@ struct server_models_routes {
server_http_context::handler_t proxy_post;
server_http_context::handler_t get_router_models;
server_http_context::handler_t post_router_models_load;
server_http_context::handler_t post_router_models_status;
server_http_context::handler_t post_router_models_unload;
};

Expand Down
3 changes: 1 addition & 2 deletions tools/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ int main(int argc, char ** argv, char ** envp) {
routes.get_models = models_routes->get_router_models;
ctx_http.post("/models/load", ex_wrapper(models_routes->post_router_models_load));
ctx_http.post("/models/unload", ex_wrapper(models_routes->post_router_models_unload));
ctx_http.post("/models/status", ex_wrapper(models_routes->post_router_models_status));
}

ctx_http.get ("/health", ex_wrapper(routes.get_health)); // public endpoint (no API key check)
Expand Down Expand Up @@ -291,7 +290,7 @@ int main(int argc, char ** argv, char ** envp) {
const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
std::thread monitor_thread;
if (router_port != nullptr) {
monitor_thread = server_models::setup_child_server(params, std::atoi(router_port), params.model_alias, shutdown_handler);
monitor_thread = server_models::setup_child_server(shutdown_handler);
}

// this call blocks the main thread until queue_tasks.terminate() is called
Expand Down
Loading