Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/ylt/coro_rpc/impl/coro_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,8 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
}
if (ret.first)
AS_UNLIKELY {
ELOG_ERROR << ret.first.message() << ", "
<< "async_write error";
ELOG_INFO << ret.first.message() << ", "
<< "async_write error";
close();
co_return;
}
Expand Down
4 changes: 3 additions & 1 deletion include/ylt/standalone/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ class coro_http_connection
}

if (ec) {
CINATRA_LOG_ERROR << "async_write error: " << ec.message();
CINATRA_LOG_INFO << "async_write error: " << ec.message();
close();
co_return false;
}
Expand Down Expand Up @@ -779,6 +779,8 @@ class coro_http_connection

auto &tcp_socket() { return *socket_wrapper_.socket(); }

uint64_t conn_id() const { return conn_id_; }

void set_quit_callback(std::function<void(const uint64_t &conn_id)> callback,
uint64_t conn_id) {
quit_cb_ = std::move(callback);
Expand Down
6 changes: 6 additions & 0 deletions include/ylt/standalone/cinatra/picohttpparser.h
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,12 @@ static const char *parse_headers(const char *buf, const char *buf_end,
// has_close
has_close = true;
}
else if (ch == 'k' || ch == 'K') {
std::string_view val_str{value, value_len};
if (val_str.find("pgrade") != std::string_view::npos) {
has_upgrade = true;
}
}
}
}
headers[*num_headers] = {std::string_view{name, name_len},
Expand Down
48 changes: 35 additions & 13 deletions src/coro_http/examples/chat_room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,39 +37,51 @@ struct message_t {
};
YLT_REFL(message_t, type, content);

async_simple::coro::Lazy<void> broadcast(auto &conn_map,
std::string &resp_str) {
for (auto &[conn_ptr, user_name] : conn_map) {
auto conn = (coro_http_connection *)conn_ptr;
struct conn_holder {
std::weak_ptr<coro_http_connection> conn;
std::string user_name;
};

async_simple::coro::Lazy<std::vector<uint64_t>> broadcast(
auto &conn_map, std::string &resp_str) {
std::vector<uint64_t> dead_conn_ids;
for (auto &[id, holder] : conn_map) {
auto conn = holder.conn.lock();
if (conn == nullptr) {
dead_conn_ids.push_back(id);
continue;
}
auto ec = co_await conn->write_websocket(resp_str);
if (ec) {
std::cout << ec.message() << "\n";
continue;
}
}

resp_str.clear();
co_return dead_conn_ids;
}

int main() {
std::cout << fs::current_path() << "\n";
coro_http::coro_http_server server(1, 9001);
server.set_static_res_dir("", "");
std::mutex mtx;
std::unordered_map<intptr_t, std::string> conn_map;
std::unordered_map<uint64_t, conn_holder> conn_map;
server.set_http_handler<cinatra::GET>(
"/",
[&](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
websocket_result result{};

std::unordered_map<intptr_t, std::string> map;
std::unordered_map<uint64_t, conn_holder> map;
std::string resp_str;
while (true) {
result = co_await req.get_conn()->read_websocket();
if (result.ec) {
{
std::scoped_lock lock(mtx);
conn_map.erase((intptr_t)req.get_conn());
conn_map.erase(req.get_conn()->conn_id());
}
break;
}
Expand All @@ -87,7 +99,7 @@ int main() {
std::string from;
{
std::scoped_lock lock(mtx);
from = conn_map.at((intptr_t)req.get_conn());
from = conn_map.at(req.get_conn()->conn_id()).user_name;
map = conn_map;
}

Expand All @@ -101,11 +113,13 @@ int main() {
std::scoped_lock lock(mtx);
if (msg.type == "login") {
user_name = std::string{msg.content};
conn_map.emplace((intptr_t)req.get_conn(), user_name);
conn_map.emplace(
req.get_conn()->conn_id(),
conn_holder{req.get_conn()->shared_from_this(), user_name});
}
else {
user_name = conn_map.at((intptr_t)req.get_conn());
conn_map.erase((intptr_t)req.get_conn());
user_name = conn_map.at(req.get_conn()->conn_id()).user_name;
conn_map.erase(req.get_conn()->conn_id());
}
map = conn_map;
}
Expand All @@ -114,7 +128,7 @@ int main() {
std::vector<std::string> user_list;
std::transform(map.begin(), map.end(),
std::back_inserter(user_list), [](auto &kv) {
return kv.second;
return kv.second.user_name;
});
logout_info_t info{msg.type, user_name, std::move(user_list)};
struct_json::to_json(info, resp_str);
Expand All @@ -123,7 +137,15 @@ int main() {

std::cout << result.data << "\n";

co_await broadcast(map, resp_str);
auto ids = co_await broadcast(map, resp_str);
if (!ids.empty()) {
// clean dead connection
std::scoped_lock lock(mtx);
for (auto id : ids) {
conn_map.erase(id);
}
}

if (msg.type == "logout") {
break;
}
Expand Down
1 change: 1 addition & 0 deletions src/coro_http/examples/client.html
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

<head>
<title></title>
<link rel="icon" href="data:,">
<meta http-equiv="content-type" content="text/html;charset=utf-8">
<style>
p {
Expand Down
Loading