diff --git a/include/ylt/coro_rpc/impl/coro_connection.hpp b/include/ylt/coro_rpc/impl/coro_connection.hpp index c98e423e1..f095c1cad 100644 --- a/include/ylt/coro_rpc/impl/coro_connection.hpp +++ b/include/ylt/coro_rpc/impl/coro_connection.hpp @@ -514,8 +514,8 @@ class coro_connection : public std::enable_shared_from_this { } if (ret.first) AS_UNLIKELY { - ELOG_ERROR << ret.first.message() << ", " - << "async_write error"; + ELOG_INFO << ret.first.message() << ", " + << "async_write error"; close(); co_return; } diff --git a/include/ylt/standalone/cinatra/coro_http_connection.hpp b/include/ylt/standalone/cinatra/coro_http_connection.hpp index 35e0feb56..ef42022b7 100644 --- a/include/ylt/standalone/cinatra/coro_http_connection.hpp +++ b/include/ylt/standalone/cinatra/coro_http_connection.hpp @@ -433,7 +433,7 @@ class coro_http_connection } if (ec) { - CINATRA_LOG_ERROR << "async_write error: " << ec.message(); + CINATRA_LOG_INFO << "async_write error: " << ec.message(); close(); co_return false; } @@ -779,6 +779,8 @@ class coro_http_connection auto &tcp_socket() { return *socket_wrapper_.socket(); } + uint64_t conn_id() const { return conn_id_; } + void set_quit_callback(std::function callback, uint64_t conn_id) { quit_cb_ = std::move(callback); diff --git a/include/ylt/standalone/cinatra/picohttpparser.h b/include/ylt/standalone/cinatra/picohttpparser.h index 479a8c231..18b2dbb9f 100644 --- a/include/ylt/standalone/cinatra/picohttpparser.h +++ b/include/ylt/standalone/cinatra/picohttpparser.h @@ -854,6 +854,12 @@ static const char *parse_headers(const char *buf, const char *buf_end, // has_close has_close = true; } + else if (ch == 'k' || ch == 'K') { + std::string_view val_str{value, value_len}; + if (val_str.find("pgrade") != std::string_view::npos) { + has_upgrade = true; + } + } } } headers[*num_headers] = {std::string_view{name, name_len}, diff --git a/src/coro_http/examples/chat_room.cpp b/src/coro_http/examples/chat_room.cpp index afda21235..11dcaf3a6 100644 --- a/src/coro_http/examples/chat_room.cpp +++ b/src/coro_http/examples/chat_room.cpp @@ -37,17 +37,29 @@ struct message_t { }; YLT_REFL(message_t, type, content); -async_simple::coro::Lazy broadcast(auto &conn_map, - std::string &resp_str) { - for (auto &[conn_ptr, user_name] : conn_map) { - auto conn = (coro_http_connection *)conn_ptr; +struct conn_holder { + std::weak_ptr conn; + std::string user_name; +}; + +async_simple::coro::Lazy> broadcast( + auto &conn_map, std::string &resp_str) { + std::vector dead_conn_ids; + for (auto &[id, holder] : conn_map) { + auto conn = holder.conn.lock(); + if (conn == nullptr) { + dead_conn_ids.push_back(id); + continue; + } auto ec = co_await conn->write_websocket(resp_str); if (ec) { std::cout << ec.message() << "\n"; continue; } } + resp_str.clear(); + co_return dead_conn_ids; } int main() { @@ -55,21 +67,21 @@ int main() { coro_http::coro_http_server server(1, 9001); server.set_static_res_dir("", ""); std::mutex mtx; - std::unordered_map conn_map; + std::unordered_map conn_map; server.set_http_handler( "/", [&](coro_http_request &req, coro_http_response &resp) -> async_simple::coro::Lazy { websocket_result result{}; - std::unordered_map map; + std::unordered_map map; std::string resp_str; while (true) { result = co_await req.get_conn()->read_websocket(); if (result.ec) { { std::scoped_lock lock(mtx); - conn_map.erase((intptr_t)req.get_conn()); + conn_map.erase(req.get_conn()->conn_id()); } break; } @@ -87,7 +99,7 @@ int main() { std::string from; { std::scoped_lock lock(mtx); - from = conn_map.at((intptr_t)req.get_conn()); + from = conn_map.at(req.get_conn()->conn_id()).user_name; map = conn_map; } @@ -101,11 +113,13 @@ int main() { std::scoped_lock lock(mtx); if (msg.type == "login") { user_name = std::string{msg.content}; - conn_map.emplace((intptr_t)req.get_conn(), user_name); + conn_map.emplace( + req.get_conn()->conn_id(), + conn_holder{req.get_conn()->shared_from_this(), user_name}); } else { - user_name = conn_map.at((intptr_t)req.get_conn()); - conn_map.erase((intptr_t)req.get_conn()); + user_name = conn_map.at(req.get_conn()->conn_id()).user_name; + conn_map.erase(req.get_conn()->conn_id()); } map = conn_map; } @@ -114,7 +128,7 @@ int main() { std::vector user_list; std::transform(map.begin(), map.end(), std::back_inserter(user_list), [](auto &kv) { - return kv.second; + return kv.second.user_name; }); logout_info_t info{msg.type, user_name, std::move(user_list)}; struct_json::to_json(info, resp_str); @@ -123,7 +137,15 @@ int main() { std::cout << result.data << "\n"; - co_await broadcast(map, resp_str); + auto ids = co_await broadcast(map, resp_str); + if (!ids.empty()) { + // clean dead connection + std::scoped_lock lock(mtx); + for (auto id : ids) { + conn_map.erase(id); + } + } + if (msg.type == "logout") { break; } diff --git a/src/coro_http/examples/client.html b/src/coro_http/examples/client.html index d0736ca77..8d2b32ef9 100644 --- a/src/coro_http/examples/client.html +++ b/src/coro_http/examples/client.html @@ -3,6 +3,7 @@ +