diff --git a/cfgmgr/teammgr.cpp b/cfgmgr/teammgr.cpp index f6c6394cdb2..504413d9692 100644 --- a/cfgmgr/teammgr.cpp +++ b/cfgmgr/teammgr.cpp @@ -20,6 +20,10 @@ #include #include +#include +#include +#include +#include using namespace std; using namespace swss; @@ -32,6 +36,7 @@ TeamMgr::TeamMgr(DBConnector *confDb, DBConnector *applDb, DBConnector *statDb, m_cfgPortTable(confDb, CFG_PORT_TABLE_NAME), m_cfgLagTable(confDb, CFG_LAG_TABLE_NAME), m_cfgLagMemberTable(confDb, CFG_LAG_MEMBER_TABLE_NAME), + m_cfgModeTable(confDb, CFG_TEAMD_MODE_TABLE_NAME), m_appPortTable(applDb, APP_PORT_TABLE_NAME), m_appLagTable(applDb, APP_LAG_TABLE_NAME), m_statePortTable(statDb, STATE_PORT_TABLE_NAME), @@ -62,6 +67,45 @@ TeamMgr::TeamMgr(DBConnector *confDb, DBConnector *applDb, DBConnector *statDb, } m_mac = MacAddress(it->second); + + vector modeFvs; + std::string m_teamdMultiProcMode; + m_cfgModeTable.get("GLOBAL", modeFvs); + auto modeIt = find_if(modeFvs.begin(), modeFvs.end(), [](const FieldValueTuple &fv) { + return fv.first == "mode"; + }); + + if (modeIt != modeFvs.end()) + { + m_teamdMultiProcMode = modeIt->second; + } + + if (m_teamdMultiProcMode == "multi-process") + { + m_teamdUnifiedProcMode = false; + SWSS_LOG_INFO("start multi process with teamd..."); + } + else + { + m_teamdUnifiedProcMode = true; + const string dump_path = "/var/warmboot/teamd/"; + string res; + stringstream cmd; + cmd << TEAMD_CMD + << " -t " << "teamd-unified" + << " -L " << dump_path + << " -g -d"; + + if (exec(cmd.str(), res) != 0) + { + SWSS_LOG_INFO("Failed to start single process with teamd..."); + return; + } + ipcInitTeamd(); + + SWSS_LOG_INFO("start single process with teamd..."); + } + } bool TeamMgr::isPortStateOk(const string &alias) @@ -175,8 +219,51 @@ void TeamMgr::cleanTeamProcesses() std::unordered_map aliasPidMap; - for (const auto& alias: m_lagList) + if (m_teamdUnifiedProcMode == false) + { + for (const auto& alias: m_lagList) + { + pid_t pid; + // Sleep for 10 milliseconds so as to not overwhelm the netlink + // socket buffers with events about interfaces going down + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + try + { + ifstream pidFile("/var/run/teamd/" + alias + ".pid"); + if (pidFile.is_open()) + { + pidFile >> pid; + aliasPidMap[alias] = pid; + SWSS_LOG_INFO("Read port channel %s pid %d", alias.c_str(), pid); + } + else + { + SWSS_LOG_NOTICE("Unable to read pid file for %s, skipping...", alias.c_str()); + continue; + } + } + catch (const std::exception &e) + { + // Handle Warm/Fast reboot scenario + SWSS_LOG_NOTICE("Skipping non-existent port channel %s pid...", alias.c_str()); + continue; + } + + if (kill(pid, SIGTERM)) + { + SWSS_LOG_ERROR("Failed to send SIGTERM to port channel %s pid %d: %s", alias.c_str(), pid, strerror(errno)); + aliasPidMap.erase(alias); + } + else + { + SWSS_LOG_NOTICE("Sent SIGTERM to port channel %s pid %d", alias.c_str(), pid); + } + } + } + else { + std::string alias = "teamd-unified"; pid_t pid; // Sleep for 10 milliseconds so as to not overwhelm the netlink // socket buffers with events about interfaces going down @@ -194,14 +281,14 @@ void TeamMgr::cleanTeamProcesses() else { SWSS_LOG_NOTICE("Unable to read pid file for %s, skipping...", alias.c_str()); - continue; + //continue; } } catch (const std::exception &e) { // Handle Warm/Fast reboot scenario SWSS_LOG_NOTICE("Skipping non-existent port channel %s pid...", alias.c_str()); - continue; + //continue; } if (kill(pid, SIGTERM)) @@ -214,7 +301,6 @@ void TeamMgr::cleanTeamProcesses() SWSS_LOG_NOTICE("Sent SIGTERM to port channel %s pid %d", alias.c_str(), pid); } } - for (const auto& cit: aliasPidMap) { const auto &alias = cit.first; @@ -561,13 +647,102 @@ bool TeamMgr::setLagLearnMode(const string &alias, const string &learn_mode) return true; } +void TeamMgr::ipcInitTeamd() +{ + struct sockaddr_un addr; + + // create socket + sockfd = socket(AF_UNIX, SOCK_SEQPACKET, 0); + if (sockfd < 0) { + SWSS_LOG_ERROR("socket error: %s", strerror(errno)); + return; + } + + // setup socket address structure + bzero(&addr, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, TEAMD_MULTI_SOCK_PATH, sizeof(addr.sun_path) - 1); + + // connect to the server socket created by teamd + if (connect(sockfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_un)) < 0) { + SWSS_LOG_ERROR("ipc connect error: %s", strerror(errno)); + close(sockfd); + sockfd = -1; + return; + } +} + +void TeamMgr::ipcCleanup() +{ + if (sockfd >= 0) + { + close(sockfd); + sockfd = -1; + SWSS_LOG_NOTICE("Closed IPC socket to teamd"); + } +} + +int TeamMgr::sendIpcToTeamd(const std::string& command, const std::vector& args) +{ + if (sockfd < 0) { + SWSS_LOG_ERROR("IPC socket is not initialized"); + return -1; + } + + // Prepare the message to send + std::ostringstream message; + message << TEAMD_IPC_REQ << "\n" << command << "\n"; + + for (size_t i = 0; i < args.size(); ++i) + { + std::string processed_arg = args[i]; + + message << processed_arg << "\n"; + } + + std::string final_msg = message.str(); + + ssize_t sent = send(sockfd, final_msg.c_str(), final_msg.length(), 0); + if (sent < 0) + { + SWSS_LOG_ERROR("Failed to send message to teamd: %s", strerror(errno)); + return -1; + } + + SWSS_LOG_DEBUG("Sent %s command to teamd (bytes sent: %zd) :\n%s", command.c_str(), sent , final_msg.c_str()); + + // Optional: read response from teamd + char buffer[512]; + ssize_t received = recv(sockfd, buffer, sizeof(buffer) - 1, 0); + if (received > 0) + { + buffer[received] = '\0'; + if (strncmp(buffer, "REPLY_ERROR", strlen("REPLY_ERROR")) == 0) + { + SWSS_LOG_WARN("teamd replied with REPLY_ERROR for %s", command.c_str()); + return task_need_retry; + } + + SWSS_LOG_DEBUG("Response from teamd to teammgrd: %s", buffer); + return task_success; + + } + else if (received < 0) + { + SWSS_LOG_WARN("No response from teamd or recv failed: %s", strerror(errno)); + return task_need_retry; + } + + SWSS_LOG_INFO("Response from teamd to teammgrd: %s", buffer); + + return task_success; +} + task_process_status TeamMgr::addLag(const string &alias, int min_links, bool fallback, bool fast_rate) { SWSS_LOG_ENTER(); - stringstream cmd; string res; - stringstream conf; const string dump_path = "/var/warmboot/teamd/"; @@ -601,7 +776,6 @@ task_process_status TeamMgr::addLag(const string &alias, int min_links, bool fal break; } } - conf << "'{\"device\":\"" << alias << "\"," << "\"hwaddr\":\"" << mac_boot.to_string() << "\"," << "\"runner\":{" @@ -630,18 +804,32 @@ task_process_status TeamMgr::addLag(const string &alias, int min_links, bool fal string warmstart_flag = WarmStart::isWarmStart() ? " -w -o" : " -r"; - cmd << TEAMD_CMD - << warmstart_flag - << " -t " << alias - << " -c " << conf.str() - << " -L " << dump_path - << " -g -d"; + if (m_teamdUnifiedProcMode) + { + std::string jsonConf = conf.str(); - if (exec(cmd.str(), res) != 0) + // Remove surrounding single quotes if present + if (!jsonConf.empty() && jsonConf.front() == '\'' && jsonConf.back() == '\'') + { + jsonConf = jsonConf.substr(1, jsonConf.size() - 2); + } + sendIpcToTeamd("PortChannelAdd", {alias, jsonConf}); + } + else { - SWSS_LOG_INFO("Failed to start port channel %s with teamd, retry...", - alias.c_str()); - return task_need_retry; + cmd << TEAMD_CMD + << warmstart_flag + << " -t " << alias + << " -c " << conf.str() + << " -L " << dump_path + << " -g -d"; + + if (exec(cmd.str(), res) != 0) + { + SWSS_LOG_INFO("Failed to start port channel %s with teamd, retry...", + alias.c_str()); + return task_need_retry; + } } SWSS_LOG_NOTICE("Start port channel %s with teamd", alias.c_str()); @@ -653,30 +841,37 @@ bool TeamMgr::removeLag(const string &alias) { SWSS_LOG_ENTER(); - pid_t pid; - + if (m_teamdUnifiedProcMode) { - ifstream pidfile("/var/run/teamd/" + alias + ".pid"); - if (pidfile.is_open()) + sendIpcToTeamd("PortChannelRemove", { alias }); + } + else + { + pid_t pid; + { - pidfile >> pid; - SWSS_LOG_INFO("Read port channel %s pid %d", alias.c_str(), pid); + ifstream pidfile("/var/run/teamd/" + alias + ".pid"); + if (pidfile.is_open()) + { + pidfile >> pid; + SWSS_LOG_INFO("Read port channel %s pid %d", alias.c_str(), pid); + } + else + { + SWSS_LOG_NOTICE("Failed to remove non-existent port channel %s pid...", alias.c_str()); + return false; + } } - else + + if (kill(pid, SIGTERM)) { - SWSS_LOG_NOTICE("Failed to remove non-existent port channel %s pid...", alias.c_str()); + SWSS_LOG_ERROR("Failed to send SIGTERM to port channel %s pid %d: %s", alias.c_str(), pid, strerror(errno)); return false; } - } - if (kill(pid, SIGTERM)) - { - SWSS_LOG_ERROR("Failed to send SIGTERM to port channel %s pid %d: %s", alias.c_str(), pid, strerror(errno)); - return false; + SWSS_LOG_NOTICE("Stop port channel %s", alias.c_str()); } - SWSS_LOG_NOTICE("Stop port channel %s", alias.c_str()); - return true; } @@ -755,36 +950,72 @@ task_process_status TeamMgr::addLagMember(const string &lag, const string &membe cmd.str(""); cmd.clear(); - // Set admin down LAG member (required by teamd) and enslave it - // ip link set dev down; - // teamdctl port config update { "lacp_key": , "link_watch": { "name": "ethtool" } }; - // teamdctl port add ; - cmd << IP_CMD << " link set dev " << shellquote(member) << " down; "; - cmd << TEAMDCTL_CMD << " " << shellquote(lag) << " port config update " << shellquote(member) - << " '{\"lacp_key\":" - << keyId - << ",\"link_watch\": {\"name\": \"ethtool\"} }'; "; - cmd << TEAMDCTL_CMD << " " << shellquote(lag) << " port add " << shellquote(member); - - if (exec(cmd.str(), res) != 0) + if (m_teamdUnifiedProcMode) { - // teamdctl port add command will fail when the member port is not - // set to admin status down; it is possible that some other processes - // or users (e.g. portmgrd) are executing the command to bring up the - // member port while adding this port into the port channel. This piece - // of code will check if the port is set to admin status up. If yes, - // it will retry to add the port into the port channel. - if (checkPortIffUp(member)) + cmd << IP_CMD << " link set dev " << shellquote(member) << " down"; + if (exec(cmd.str(), res) != 0) { - SWSS_LOG_INFO("Failed to add %s to port channel %s, retry...", - member.c_str(), lag.c_str()); + SWSS_LOG_WARN("Failed to bring down port %s before IPC LAG add", member.c_str()); + return task_failed; + } + std::string portConfig = "{\"lacp_key\":" + std::to_string(keyId) + + ",\"link_watch\": {\"name\": \"ethtool\"} }"; + + if (sendIpcToTeamd("PortConfigUpdate", { lag, member, portConfig }) != 0) + { + SWSS_LOG_ERROR("IPC: Failed to send PortConfigUpdate for %s in %s", member.c_str(), lag.c_str()); return task_need_retry; } - else + + SWSS_LOG_INFO("IPC: Sent PortConfigUpdate for %s to port channel %s", member.c_str(), lag.c_str()); + + // Step 2: Send PortAdd via IPC. + if (sendIpcToTeamd("PortAdd", { lag, member }) != 0) { - SWSS_LOG_ERROR("Failed to add %s to port channel %s", - member.c_str(), lag.c_str()); - return task_failed; + if (checkPortIffUp(member)) + { + SWSS_LOG_INFO("IPC: Failed to add %s to port channel %s, %s .", member.c_str(), lag.c_str(), checkPortIffUp(member)?"retry..":""); + return task_need_retry; + } + else + { + SWSS_LOG_ERROR("IPC: Failed to add %s to port channel %s", member.c_str(), lag.c_str()); + return task_failed; + } + } + SWSS_LOG_NOTICE("IPC: Successfully added %s to port channel %s", member.c_str(), lag.c_str()); + } + else + { + // Set admin down LAG member (required by teamd) and enslave it + // ip link set dev down; + // teamdctl port config update { "lacp_key": , "link_watch": { "name": "ethtool" } }; + // teamdctl port add ; + cmd << IP_CMD << " link set dev " << shellquote(member) << " down; "; + cmd << TEAMDCTL_CMD << " " << shellquote(lag) << " port config update " << shellquote(member) + << " '{\"lacp_key\":" << keyId << ",\"link_watch\": {\"name\": \"ethtool\"} }'; "; + SWSS_LOG_NOTICE("Add port update first %s to port channel %s", member.c_str(), lag.c_str()); + cmd << TEAMDCTL_CMD << " " << shellquote(lag) << " port add " << shellquote(member); + if (exec(cmd.str(), res) != 0) + { + // teamdctl port add command will fail when the member port is not + // set to admin status down; it is possible that some other processes + // or users (e.g. portmgrd) are executing the command to bring up the + // member port while adding this port into the port channel. This piece + // of code will check if the port is set to admin status up. If yes, + // it will retry to add the port into the port channel. + if (checkPortIffUp(member)) + { + SWSS_LOG_INFO("Failed to add %s to port channel %s, retry...", + member.c_str(), lag.c_str()); + return task_need_retry; + } + else + { + SWSS_LOG_ERROR("Failed to add %s to port channel %s", + member.c_str(), lag.c_str()); + return task_failed; + } } } @@ -838,9 +1069,15 @@ bool TeamMgr::removeLagMember(const string &lag, const string &member) stringstream cmd; string res; - - // teamdctl port remove ; - cmd << TEAMDCTL_CMD << " " << lag << " port remove " << member << "; "; + if (m_teamdUnifiedProcMode) + { + sendIpcToTeamd("PortRemove", { lag, member }); + } + else + { + // teamdctl port remove ; + cmd << TEAMDCTL_CMD << " " << lag << " port remove " << member << "; "; + } vector fvs; m_cfgPortTable.get(member, fvs); diff --git a/cfgmgr/teammgr.h b/cfgmgr/teammgr.h index 3c98f87dc5e..79d49014437 100644 --- a/cfgmgr/teammgr.h +++ b/cfgmgr/teammgr.h @@ -18,13 +18,18 @@ class TeamMgr : public Orch const std::vector &tables); using Orch::doTask; + void ipcInitTeamd(); + int sendIpcToTeamd(const std::string& command, + const std::vector& args); void cleanTeamProcesses(); + void ipcCleanup(); private: Table m_cfgMetadataTable; // To retrieve MAC address Table m_cfgPortTable; Table m_cfgLagTable; Table m_cfgLagMemberTable; + Table m_cfgModeTable; Table m_statePortTable; Table m_stateLagTable; Table m_stateMACsecIngressSATable; @@ -33,6 +38,8 @@ class TeamMgr : public Orch ProducerStateTable m_appLagTable; std::set m_lagList; + bool m_teamdUnifiedProcMode = false; + int sockfd; MacAddress m_mac; @@ -61,4 +68,7 @@ class TeamMgr : public Orch uint16_t generateLacpKey(const std::string&); }; +#define TEAMD_MULTI_SOCK_PATH "/var/run/teamd/teamd-unified.sock" +#define TEAMD_IPC_REQ "REQUEST" + } diff --git a/cfgmgr/teammgrd.cpp b/cfgmgr/teammgrd.cpp index a18838c959e..06d3fe81f99 100644 --- a/cfgmgr/teammgrd.cpp +++ b/cfgmgr/teammgrd.cpp @@ -54,11 +54,13 @@ int main(int argc, char **argv) TableConnector conf_lag_table(&conf_db, CFG_LAG_TABLE_NAME); TableConnector conf_lag_member_table(&conf_db, CFG_LAG_MEMBER_TABLE_NAME); + TableConnector conf_teamd_mode_table(&conf_db, CFG_TEAMD_MODE_TABLE_NAME); TableConnector state_port_table(&state_db, STATE_PORT_TABLE_NAME); vector tables = { conf_lag_table, conf_lag_member_table, + conf_teamd_mode_table, state_port_table }; @@ -93,6 +95,7 @@ int main(int argc, char **argv) c->execute(); } teammgr.cleanTeamProcesses(); + teammgr.ipcCleanup(); SWSS_LOG_NOTICE("Exiting"); } catch (const exception &e) diff --git a/teamsyncd/teamsync.cpp b/teamsyncd/teamsync.cpp index e8dcb1fc55a..a4e223174ee 100644 --- a/teamsyncd/teamsync.cpp +++ b/teamsyncd/teamsync.cpp @@ -254,7 +254,13 @@ TeamSync::TeamPortSync::TeamPortSync(const string &lagName, int ifindex, { try { - m_team = team_alloc(); + m_nl = team_netlink_alloc(); + if (!m_nl) + { + throw system_error(make_error_code(errc::address_not_available), + "Unable to allocate team netlink socket"); + } + m_team = team_alloc(m_nl); if (!m_team) { throw system_error(make_error_code(errc::address_not_available), @@ -265,7 +271,9 @@ TeamSync::TeamPortSync::TeamPortSync(const string &lagName, int ifindex, if (err) { team_free(m_team); + team_netlink_free(m_nl); m_team = NULL; + m_nl = NULL; throw system_error(make_error_code(errc::address_not_available), "Unable to initialize team socket"); } @@ -274,6 +282,8 @@ TeamSync::TeamPortSync::TeamPortSync(const string &lagName, int ifindex, if (err) { team_free(m_team); + team_netlink_free(m_nl); + m_nl = NULL; m_team = NULL; throw system_error(make_error_code(errc::address_not_available), "Unable to register port change event"); @@ -307,6 +317,7 @@ TeamSync::TeamPortSync::~TeamPortSync() { team_change_handler_unregister(m_team, &gPortChangeHandler, this); team_free(m_team); + team_netlink_free(m_nl); } } @@ -382,11 +393,11 @@ int TeamSync::TeamPortSync::teamdHandler(struct team_handle *team, void *arg, int TeamSync::TeamPortSync::getFd() { - return team_get_event_fd(m_team); + return team_get_event_fd(m_team, NULL); } uint64_t TeamSync::TeamPortSync::readData() { - team_handle_events(m_team); + team_handle_events(m_team, NULL); return 0; } diff --git a/teamsyncd/teamsync.h b/teamsyncd/teamsync.h index 536a4e96511..a75cd6286c1 100644 --- a/teamsyncd/teamsync.h +++ b/teamsyncd/teamsync.h @@ -53,6 +53,7 @@ class TeamSync : public NetMsg static const struct team_change_handler gPortChangeHandler; private: ProducerStateTable *m_lagMemberTable; + struct team_netlink *m_nl; struct team_handle *m_team; std::string m_lagName; int m_ifindex; diff --git a/tests/mock_tests/teammgrd/teamd_ipc.h b/tests/mock_tests/teammgrd/teamd_ipc.h new file mode 100644 index 00000000000..e3848239146 --- /dev/null +++ b/tests/mock_tests/teammgrd/teamd_ipc.h @@ -0,0 +1,7 @@ +#pragma once +#include +#include +#include "orch.h" + +// Weak declaration +__attribute__((weak)) int send_ipc_to_teamd(const std::string &method, const std::vector &args); diff --git a/tests/mock_tests/teammgrd/teammgr_ut.cpp b/tests/mock_tests/teammgrd/teammgr_ut.cpp index a40f39f4841..8ebfd156533 100644 --- a/tests/mock_tests/teammgrd/teammgr_ut.cpp +++ b/tests/mock_tests/teammgrd/teammgr_ut.cpp @@ -1,7 +1,17 @@ #include "gtest/gtest.h" +#include #include "../mock_table.h" #include "teammgr.h" #include +#include "teamd_ipc.h" + +using ::testing::_; +using ::testing::Return; + +using namespace testing; // <-- Needed to use EXPECT_CALL, DoAll, Return, etc. +using namespace boost::mpl::placeholders; // Optional, if using Boost placeholders + +std::function &)> g_mock_send_ipc_to_teamd; extern int (*callback)(const std::string &cmd, std::string &stdout); extern std::vector mockCallArgs; @@ -11,6 +21,11 @@ static std::map pidFiles; static int (*callback_kill)(pid_t pid, int sig) = NULL; static std::pair (*callback_fopen)(const char *pathname, const char *mode) = NULL; +int send_ipc_to_teamd(const std::string &method, const std::vector &args) +{ + return g_mock_send_ipc_to_teamd(method, args); +} + static int cb_kill(pid_t pid, int sig) { mockKillCommands.push_back(std::make_pair(pid, sig)); @@ -146,6 +161,8 @@ namespace teammgr_ut std::vector vec; vec.emplace_back("mac", "01:23:45:67:89:ab"); metadata_table.set("localhost", vec); + swss::Table cfg_mode_table = swss::Table(m_config_db.get(), CFG_TEAMD_MODE_TABLE_NAME); + cfg_mode_table.set("GLOBAL",{ {"mode","multi-process"} }); TableConnector conf_lag_table(m_config_db.get(), CFG_LAG_TABLE_NAME); TableConnector conf_lag_member_table(m_config_db.get(), CFG_LAG_MEMBER_TABLE_NAME); @@ -185,6 +202,7 @@ namespace teammgr_ut teammgr.addExistingData(&cfg_lag_table); teammgr.doTask(); ASSERT_NE(mockCallArgs.size(), 0); + ASSERT_FALSE(mockCallArgs.empty()); EXPECT_NE(mockCallArgs.front().find("/usr/bin/teamd -r -t PortChannel382"), std::string::npos); EXPECT_EQ(mockCallArgs.size(), 1); EXPECT_EQ(mockKillCommands.size(), 1); @@ -264,4 +282,29 @@ namespace teammgr_ut EXPECT_EQ(mockKillCommands.size(), 0); EXPECT_GE(std::chrono::duration_cast(end - begin).count(), 200); } + + TEST_F(TeamMgrTest, testIpcSendRetry) + { + swss::Table cfg_mode_table(m_config_db.get(), CFG_TEAMD_MODE_TABLE_NAME); + swss::Table cfg_lag_table(m_config_db.get(), CFG_LAG_TABLE_NAME); + cfg_mode_table.del("GLOBAL"); + cfg_lag_table.set("PortChannel123", { + {"admin_status", "up"}, + {"mtu", "9100"}, + {"lacp_key", "auto"}, + {"min_links", "1"} + }); + + g_mock_send_ipc_to_teamd = [](const std::string &method, const std::vector &args) -> int { + if (method == "PortChannelAdd" && !args.empty() && args[0]== "PortChannel123") + { + return task_need_retry; + } + return task_success; + }; + + swss::TeamMgr teammgr(m_config_db.get(), m_app_db.get(), m_state_db.get(), cfg_lag_tables); + teammgr.addExistingData(&cfg_lag_table); + teammgr.doTask(); + } } diff --git a/tlm_teamd/main.cpp b/tlm_teamd/main.cpp index 291b044eda1..edafe9c4f8c 100644 --- a/tlm_teamd/main.cpp +++ b/tlm_teamd/main.cpp @@ -89,7 +89,6 @@ int main() swss::Logger::linkToDbNative("tlm_teamd"); SWSS_LOG_NOTICE("Starting"); swss::DBConnector db("STATE_DB", 0); - ValuesStore values_store(&db); TeamdCtlMgr teamdctl_mgr; @@ -97,6 +96,32 @@ int main() swss::Selectable * event; swss::SubscriberStateTable sst_lag(&db, STATE_LAG_TABLE_NAME); s.addSelectable(&sst_lag); + swss::DBConnector config_db("CONFIG_DB", 0); + + swss::Table table(&config_db, "TEAMD"); + std::vector values; + + std::string m_teamdMultiProcMode = "unified"; + bool key_exists = table.get("GLOBAL", values); + + if (key_exists && !values.empty()) + { + for (const auto& fv : values) + { + if (fv.first == "mode" && fv.second == "multi-process") + + { + m_teamdMultiProcMode = fv.second; + break; + } + } + } + if (m_teamdMultiProcMode == "multi-process") { + teamdctl_mgr.m_teamdUnifiedProcMode = false; + } else { + teamdctl_mgr.m_teamdUnifiedProcMode = true; + } + while (g_run && rc == 0) { diff --git a/tlm_teamd/teamdctl_mgr.cpp b/tlm_teamd/teamdctl_mgr.cpp index 6a7a36f7400..f3eeb3b3892 100644 --- a/tlm_teamd/teamdctl_mgr.cpp +++ b/tlm_teamd/teamdctl_mgr.cpp @@ -4,9 +4,14 @@ #include #include "teamdctl_mgr.h" +#include +#include +#include +#include +#include +#include #define MAX_RETRY 3 - /// /// Custom function for libteamdctl logger. IT is empty to prevent libteamdctl to spam us with the error messages /// @param tdc teamdctl descriptor @@ -30,13 +35,15 @@ void teamdctl_log_function(struct teamdctl *tdc, int priority, /// TeamdCtlMgr::~TeamdCtlMgr() { - for (const auto & p: m_handlers) - { - const auto & lag_name = p.first; - const auto & tdc = m_handlers[lag_name]; - teamdctl_disconnect(tdc); - teamdctl_free(tdc); - SWSS_LOG_NOTICE("Exiting. Disconnecting from teamd. LAG '%s'", lag_name.c_str()); + if (!m_teamdUnifiedProcMode) { + for (const auto & p: m_handlers) + { + const auto & lag_name = p.first; + const auto & tdc = m_handlers[lag_name]; + teamdctl_disconnect(tdc); + teamdctl_free(tdc); + SWSS_LOG_NOTICE("Exiting. Disconnecting from teamd. LAG '%s'", lag_name.c_str()); + } } } @@ -77,40 +84,45 @@ bool TeamdCtlMgr::add_lag(const std::string & lag_name) /// bool TeamdCtlMgr::try_add_lag(const std::string & lag_name) { - if (m_lags_to_add.find(lag_name) == m_lags_to_add.end()) - { - m_lags_to_add[lag_name] = 0; - } - int attempt = m_lags_to_add[lag_name]; - - auto tdc = teamdctl_alloc(); - if (!tdc) + if (m_teamdUnifiedProcMode) { - SWSS_LOG_ERROR("Can't allocate memory for teamdctl handler. LAG='%s'. attempt=%d", lag_name.c_str(), attempt); - m_lags_to_add[lag_name]++; - return false; + SWSS_LOG_NOTICE("In default. LAG='%s' will be handled via IPC.", lag_name.c_str()); + m_handlers.emplace(lag_name, nullptr); + return true; } - - teamdctl_set_log_fn(tdc, &teamdctl_log_function); - - int err = teamdctl_connect(tdc, lag_name.c_str(), nullptr, "usock"); - if (err) + else { - if (attempt != 0) + if (m_lags_to_add.find(lag_name) == m_lags_to_add.end()) { - SWSS_LOG_WARN("Can't connect to teamd LAG='%s', error='%s'. attempt=%d", lag_name.c_str(), strerror(-err), attempt); + m_lags_to_add[lag_name] = 0; } - teamdctl_free(tdc); - m_lags_to_add[lag_name]++; - return false; - } - - m_handlers.emplace(lag_name, tdc); - m_lags_to_add.erase(lag_name); - SWSS_LOG_NOTICE("The LAG '%s' has been added.", lag_name.c_str()); - return true; + int attempt = m_lags_to_add[lag_name]; + auto tdc = teamdctl_alloc(); + if (!tdc) + { + SWSS_LOG_ERROR("Can't allocate memory for teamdctl handler. LAG='%s'. attempt=%d", lag_name.c_str(), attempt); + m_lags_to_add[lag_name]++; + return false; + } + teamdctl_set_log_fn(tdc, &teamdctl_log_function); + int err = teamdctl_connect(tdc, lag_name.c_str(), nullptr, "usock"); + if (err) + { + if (attempt != 0) + { + SWSS_LOG_WARN("Can't connect to teamd LAG='%s', error='%s'. attempt=%d", lag_name.c_str(), strerror(-err), attempt); + } + teamdctl_free(tdc); + m_lags_to_add[lag_name]++; + return false; + } + m_handlers.emplace(lag_name, tdc); + m_lags_to_add.erase(lag_name); + SWSS_LOG_NOTICE("The LAG '%s' has been added.", lag_name.c_str()); + return true; + } } /// @@ -121,13 +133,21 @@ bool TeamdCtlMgr::try_add_lag(const std::string & lag_name) /// bool TeamdCtlMgr::remove_lag(const std::string & lag_name) { + if (has_key(lag_name)) { - auto tdc = m_handlers[lag_name]; - teamdctl_disconnect(tdc); - teamdctl_free(tdc); - m_handlers.erase(lag_name); - SWSS_LOG_NOTICE("The LAG '%s' has been removed.", lag_name.c_str()); + if (m_teamdUnifiedProcMode) + { + m_handlers.erase(lag_name); + } + else + { + auto tdc = m_handlers[lag_name]; + teamdctl_disconnect(tdc); + teamdctl_free(tdc); + m_handlers.erase(lag_name); + } + SWSS_LOG_NOTICE("The LAG '%s' has been removed from db.", lag_name.c_str()); } else if (m_lags_to_add.find(lag_name) != m_lags_to_add.end()) { @@ -184,47 +204,94 @@ TeamdCtlDump TeamdCtlMgr::get_dump(const std::string & lag_name, bool to_retry) TeamdCtlDump res = { false, "" }; if (has_key(lag_name)) { - auto tdc = m_handlers[lag_name]; - char * dump; - int r = teamdctl_state_get_raw_direct(tdc, &dump); - if (r == 0) + if (m_teamdUnifiedProcMode) { - res = { true, std::string(dump) }; - - // If this lag interface errored last time, remove the entry - if (m_lags_err_retry.find(lag_name) != m_lags_err_retry.end()) + std::string ipc_response; + int ret = sendIpcToTeamd("StateDump", {lag_name}, ipc_response); // Send the IPC request + if (ret == 0) { - SWSS_LOG_NOTICE("The LAG '%s' had errored in get_dump earlier, removing it", lag_name.c_str()); - m_lags_err_retry.erase(lag_name); + auto pos = ipc_response.find('{'); + if (pos != std::string::npos) + { + ipc_response.erase(0, pos); + } + res = {true, ipc_response}; // Get the response from teamd via IPC + m_lags_err_retry.erase(lag_name); // Clear retry count on success + } + else + { + // In case of failure and retry flag is set, check if it fails for MAX_RETRY times. + if (to_retry) + { + if (m_lags_err_retry.find(lag_name) != m_lags_err_retry.end()) + { + if (m_lags_err_retry[lag_name] == MAX_RETRY) + { + SWSS_LOG_ERROR("Can't get dump for LAG '%s'. Skipping", lag_name.c_str()); + m_lags_err_retry.erase(lag_name); + } + else + { + m_lags_err_retry[lag_name]++; + } + } + else + { + // This time a different lag interface errored out. + m_lags_err_retry[lag_name] = 1; + } + } + else + { + // No need to retry if the flag is not set. + SWSS_LOG_ERROR("Can't get dump for LAG '%s'. Skipping", lag_name.c_str()); + } } } else { - // In case of failure and retry flag is set, check if it fails for MAX_RETRY times. - if (to_retry) + auto tdc = m_handlers[lag_name]; + char * dump; + int r = teamdctl_state_get_raw_direct(tdc, &dump); + if (r == 0) { + res = { true, std::string(dump) }; + // If this lag interface errored last time, remove the entry if (m_lags_err_retry.find(lag_name) != m_lags_err_retry.end()) { - if (m_lags_err_retry[lag_name] == MAX_RETRY) + SWSS_LOG_NOTICE("The LAG '%s' had errored in get_dump earlier, removing it", lag_name.c_str()); + m_lags_err_retry.erase(lag_name); + } + } + else + { + // In case of failure and retry flag is set, check if it fails for MAX_RETRY times. + if (to_retry) + { + if (m_lags_err_retry.find(lag_name) != m_lags_err_retry.end()) { - SWSS_LOG_ERROR("Can't get dump for LAG '%s'. Skipping", lag_name.c_str()); - m_lags_err_retry.erase(lag_name); + if (m_lags_err_retry[lag_name] == MAX_RETRY) + { + SWSS_LOG_ERROR("Can't get dump for LAG '%s'. Skipping", lag_name.c_str()); + m_lags_err_retry.erase(lag_name); + } + else + { + m_lags_err_retry[lag_name]++; + } } else - m_lags_err_retry[lag_name]++; + { + // This time a different lag interface errored out. + m_lags_err_retry[lag_name] = 1; + } } else { - - // This time a different lag interface errored out. - m_lags_err_retry[lag_name] = 1; + // No need to retry if the flag is not set. + SWSS_LOG_ERROR("Can't get dump for LAG '%s'. Skipping", lag_name.c_str()); } } - else - { - // No need to retry if the flag is not set. - SWSS_LOG_ERROR("Can't get dump for LAG '%s'. Skipping", lag_name.c_str()); - } } } else @@ -258,3 +325,63 @@ TeamdCtlDumps TeamdCtlMgr::get_dumps(bool to_retry) return res; } + +int TeamdCtlMgr::sendIpcToTeamd(const std::string& command, + const std::vector& args, + std::string& response_out) +{ + int sockfd = socket(AF_UNIX, SOCK_SEQPACKET, 0); + if (sockfd < 0) + { + SWSS_LOG_ERROR("Failed to create socket: %s", strerror(errno)); + return -1; + } + + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, TEAMD_MULTI_SOCK_PATH, sizeof(addr.sun_path) - 1); + + if (connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) + { + close(sockfd); + return -1; + } + + std::ostringstream message; + message << TEAMD_IPC_REQ << "\n" << command << "\n"; + + for (size_t i = 0; i < args.size(); ++i) + { + message << args[i] << "\n"; + } + + std::string final_msg = message.str(); + SWSS_LOG_NOTICE("Sending IPC message to teamd:\n%s", final_msg.c_str()); + + ssize_t sent = send(sockfd, final_msg.c_str(), final_msg.length(), 0); + if (sent < 0) + { + SWSS_LOG_ERROR("Failed to send message to teamd: %s", strerror(errno)); + close(sockfd); + return -1; + } + + char buffer[65536]; + ssize_t received = recv(sockfd, buffer, sizeof(buffer) - 1, 0); + if (received > 0) + { + buffer[received] = '\0'; + response_out = std::string(buffer); + SWSS_LOG_NOTICE("Response from teamd to teammgrd: %s", buffer); + close(sockfd); + return 0; + } + else + { + SWSS_LOG_WARN("No response from teamd or recv failed: %s", strerror(errno)); + close(sockfd); + return -1; + } +} + diff --git a/tlm_teamd/teamdctl_mgr.h b/tlm_teamd/teamdctl_mgr.h index f6a5dd204f5..027dfc2fead 100644 --- a/tlm_teamd/teamdctl_mgr.h +++ b/tlm_teamd/teamdctl_mgr.h @@ -21,6 +21,11 @@ class TeamdCtlMgr // Retry logic added to prevent incorrect error reporting in dump API's TeamdCtlDump get_dump(const std::string & lag_name, bool to_retry); TeamdCtlDumps get_dumps(bool to_retry); + int sendIpcToTeamd(const std::string& command, + const std::vector& args, + std::string& response_out); + bool m_teamdUnifiedProcMode = false; + private: bool has_key(const std::string & lag_name) const; @@ -29,6 +34,11 @@ class TeamdCtlMgr std::unordered_map m_handlers; std::unordered_map m_lags_to_add; std::unordered_map m_lags_err_retry; + int sockfd; const int max_attempts_to_add = 10; }; + + +#define TEAMD_MULTI_SOCK_PATH "/var/run/teamd/teamd-unified.sock" +#define TEAMD_IPC_REQ "REQUEST"