Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c87ff47
Improve orchagent to use ZMQ feature flag
liuh-80 May 6, 2025
1d77cdc
Update main.cpp
liuh-80 May 6, 2025
b6da170
Update conftest.py
liuh-80 May 7, 2025
f4eb45a
Merge branch 'master' into dev/liuh/improve_zmq_parameter
liuh-80 May 7, 2025
5d4aebc
Update conftest.py
liuh-80 May 9, 2025
ddc0592
Update main.cpp
liuh-80 May 9, 2025
a5a78e5
Merge remote-tracking branch 'origin' into dev/liuh/improve_zmq_param…
liuh-80 May 10, 2025
50509b3
Improve code
liuh-80 May 10, 2025
c1b6cf6
Improve code
liuh-80 May 10, 2025
eae6a3d
Update main.cpp
liuh-80 May 15, 2025
6483a84
Update orch_zmq_config.h
liuh-80 May 26, 2025
2733d11
Update orch_zmq_config.cpp
liuh-80 May 26, 2025
a45c1be
Update orch_zmq_config.h
liuh-80 May 26, 2025
806e076
Remove unecessary code
liuh-80 May 26, 2025
b1d9ed2
Merge branch 'master' into dev/liuh/improve_zmq_parameter
prsunny May 28, 2025
7116ece
Merge branch 'master' into dev/liuh/improve_zmq_parameter
liuh-80 May 30, 2025
520e4a6
Merge branch 'master' into dev/liuh/improve_zmq_parameter
liuh-80 Jun 3, 2025
40222a2
Merge branch 'master' into dev/liuh/improve_zmq_parameter
liuh-80 Jun 9, 2025
e29c922
Merge branch 'master' into dev/liuh/improve_zmq_parameter
liuh-80 Jun 11, 2025
cf8640a
Improve code coverage
liuh-80 Jun 12, 2025
ab312bd
Merge branch 'master' into dev/liuh/improve_zmq_parameter
liuh-80 Jun 12, 2025
039a7f7
Fix code
liuh-80 Jun 12, 2025
7fa614a
Fix mock dbconnector
liuh-80 Jun 12, 2025
cea92ce
Fix PR comments
liuh-80 Jun 13, 2025
46a165f
Fix PR comments
liuh-80 Jun 16, 2025
965f236
Update orch_zmq_config.h
liuh-80 Jun 16, 2025
e9ef477
Update orchdaemon.cpp
liuh-80 Jun 16, 2025
001f9a6
Fix PR comments
liuh-80 Jun 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions lib/orch_zmq_config.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
#include <iostream>
#include <fstream>
#include <regex>

#include "dbconnector.h"
#include "logger.h"
#include "orch_zmq_config.h"
#include <stdio.h>

#define ZMQ_TABLE_CONFIGFILE "/etc/swss/orch_zmq_tables.conf"

// ZMQ none IPV6 address with port, for example: tcp://127.0.0.1:5555 tcp://localhost:5555
const std::regex ZMQ_NONE_IPV6_ADDRESS_WITH_PORT("\\w+:\\/\\/[^:]+:\\d+");

// ZMQ IPV6 address with port, for example: tcp://[fe80::fb7:c6df:9d3a:3d7b]:5555
const std::regex ZMQ_IPV6_ADDRESS_WITH_PORT("\\w+:\\/\\/\\[.*\\]+:\\d+");

std::set<std::string> swss::load_zmq_tables()
{
std::set<std::string> tables;
Expand All @@ -22,16 +32,62 @@ std::set<std::string> swss::load_zmq_tables()
return tables;
}


std::shared_ptr<swss::ZmqClient> swss::create_zmq_client(std::string zmq_address)
int swss::get_zmq_port()
{
// swssconfig running inside swss contianer, so need get ZMQ port according to namespace ID.
auto zmq_port = ORCH_ZMQ_PORT;
if (const char* nsid = std::getenv("NAMESPACE_ID"))
{
// namespace start from 0, using original ZMQ port for global namespace
zmq_port += atoi(nsid) + 1;
}

return std::make_shared<ZmqClient>(zmq_address + ":" + std::to_string(zmq_port));
return zmq_port;
}

std::shared_ptr<swss::ZmqClient> swss::create_zmq_client(std::string zmq_address, std::string vrf)
{
// swssconfig running inside swss contianer, so need get ZMQ port according to namespace ID.
auto zmq_port = get_zmq_port();
zmq_address = zmq_address + ":" + std::to_string(zmq_port);
SWSS_LOG_NOTICE("Create ZMQ server with address: %s, vrf: %s", zmq_address.c_str(), vrf.c_str());
return std::make_shared<ZmqClient>(zmq_address, vrf);
}

std::shared_ptr<swss::ZmqServer> swss::create_zmq_server(std::string zmq_address, std::string vrf)
{
// TODO: remove this check after orchagent.sh migrate to pass ZMQ address without port
if (!std::regex_search(zmq_address, ZMQ_NONE_IPV6_ADDRESS_WITH_PORT)
&& !std::regex_search(zmq_address, ZMQ_IPV6_ADDRESS_WITH_PORT))
{
auto zmq_port = get_zmq_port();
zmq_address = zmq_address + ":" + std::to_string(zmq_port);
}

SWSS_LOG_NOTICE("Create ZMQ server with address: %s, vrf: %s", zmq_address.c_str(), vrf.c_str());
return std::make_shared<ZmqServer>(zmq_address, vrf);
}

bool swss::get_feature_status(std::string feature, bool default_value)
{
std::shared_ptr<std::string> enabled = nullptr;

try
{
swss::DBConnector config_db("CONFIG_DB", 0);
enabled = config_db.hget("DEVICE_METADATA|localhost", feature);
}
catch (const std::runtime_error &e)
{
SWSS_LOG_ERROR("Not found feature %s failed with exception: %s", feature.c_str(), e.what());
return default_value;
}

if (!enabled)
{
SWSS_LOG_NOTICE("Not found feature %s status, return default value.", feature.c_str());
return default_value;
}

SWSS_LOG_NOTICE("Get feature %s status: %s", feature.c_str(), enabled->c_str());
return *enabled == "true";
}
16 changes: 15 additions & 1 deletion lib/orch_zmq_config.h
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
#ifndef SWSS_ORCH_ZMQ_CONFIG_H
#define SWSS_ORCH_ZMQ_CONFIG_H

#include <memory>
#include <string.h>
#include <set>

#include "dbconnector.h"
#include "zmqclient.h"
#include "zmqserver.h"
#include "zmqproducerstatetable.h"

/*
* swssconfig will only connect to local orchagent ZMQ endpoint.
*/
#define ZMQ_LOCAL_ADDRESS "tcp://localhost"

/*
* Feature flag to enable the gNMI service to send DASH events to orchagent via the ZMQ channel.
*/
#define ORCH_NORTHBOND_DASH_ZMQ_ENABLED "orch_northbond_dash_zmq_enabled"

namespace swss {

std::set<std::string> load_zmq_tables();

std::shared_ptr<ZmqClient> create_zmq_client(std::string zmq_address);
int get_zmq_port();

std::shared_ptr<ZmqClient> create_zmq_client(std::string zmq_address, std::string vrf="");

std::shared_ptr<ZmqServer> create_zmq_server(std::string zmq_address, std::string vrf="");

bool get_feature_status(std::string feature, bool default_value);

}

Expand Down
1 change: 1 addition & 0 deletions orchagent/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ orchagent_SOURCES = \
$(top_srcdir)/lib/gearboxutils.cpp \
$(top_srcdir)/lib/subintf.cpp \
$(top_srcdir)/lib/recorder.cpp \
$(top_srcdir)/lib/orch_zmq_config.cpp \
orchdaemon.cpp \
orch.cpp \
notifications.cpp \
Expand Down
13 changes: 6 additions & 7 deletions orchagent/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ extern "C" {
#include <logger.h>

#include "orchdaemon.h"
#include "orch_zmq_config.h"
#include "sai_serialize.h"
#include "saihelper.h"
#include "notifications.h"
Expand Down Expand Up @@ -361,9 +362,8 @@ int main(int argc, char **argv)
string record_location = Recorder::DEFAULT_DIR;
string swss_rec_filename = Recorder::SWSS_FNAME;
string sairedis_rec_filename = Recorder::SAIREDIS_FNAME;
string zmq_server_address = "tcp://127.0.0.1:" + to_string(ORCH_ZMQ_PORT);
string zmq_server_address = "";
string vrf;
bool enable_zmq = false;
string responsepublisher_rec_filename = Recorder::RESPPUB_FNAME;
int record_type = 3; // Only swss and sairedis recordings enabled by default.
long heartBeatInterval = HEART_BEAT_INTERVAL_MSECS_DEFAULT;
Expand Down Expand Up @@ -456,7 +456,6 @@ int main(int argc, char **argv)
if (optarg)
{
zmq_server_address = optarg;
enable_zmq = true;
}
break;
case 't':
Expand Down Expand Up @@ -529,14 +528,14 @@ int main(int argc, char **argv)

// Instantiate ZMQ server
shared_ptr<ZmqServer> zmq_server = nullptr;
if (enable_zmq)
if (zmq_server_address.empty())
{
SWSS_LOG_NOTICE("Instantiate ZMQ server : %s, %s", zmq_server_address.c_str(), vrf.c_str());
zmq_server = make_shared<ZmqServer>(zmq_server_address.c_str(), vrf.c_str());
SWSS_LOG_NOTICE("The ZMQ channel on the northbound side of orchagent has been disabled.");
}
else
{
SWSS_LOG_NOTICE("ZMQ disabled");
SWSS_LOG_NOTICE("The ZMQ channel on the northbound side of orchagent has been initialized: %s, %s", zmq_server_address.c_str(), vrf.c_str());
zmq_server = create_zmq_server(zmq_server_address, vrf);
}

// Get switch_type
Expand Down
22 changes: 16 additions & 6 deletions orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <sairedis.h>
#include "warm_restart.h"
#include <iostream>
#include "orch_zmq_config.h"

#define SAI_SWITCH_ATTR_CUSTOM_RANGE_BASE SAI_SWITCH_ATTR_CUSTOM_RANGE_START
#include "sairedis.h"
Expand Down Expand Up @@ -1222,11 +1223,20 @@ bool DpuOrchDaemon::init()
{
SWSS_LOG_NOTICE("DpuOrchDaemon init...");
OrchDaemon::init();

// Enable the gNMI service to send DASH events to orchagent via the ZMQ channel.
ZmqServer *dash_zmq_server = nullptr;
if (get_feature_status(ORCH_NORTHBOND_DASH_ZMQ_ENABLED, true))
{
SWSS_LOG_NOTICE("Enable the gNMI service to send DASH events to orchagent via the ZMQ channel.");
dash_zmq_server = m_zmqServer;
}

vector<string> dash_vnet_tables = {
APP_DASH_VNET_TABLE_NAME,
APP_DASH_VNET_MAPPING_TABLE_NAME
};
DashVnetOrch *dash_vnet_orch = new DashVnetOrch(m_applDb, dash_vnet_tables, m_dpu_appstateDb, m_zmqServer);
DashVnetOrch *dash_vnet_orch = new DashVnetOrch(m_applDb, dash_vnet_tables, m_dpu_appstateDb, dash_zmq_server);
gDirectory.set(dash_vnet_orch);

vector<string> dash_tables = {
Expand All @@ -1237,7 +1247,7 @@ bool DpuOrchDaemon::init()
APP_DASH_QOS_TABLE_NAME
};

DashOrch *dash_orch = new DashOrch(m_applDb, dash_tables, m_dpu_appstateDb, m_zmqServer);
DashOrch *dash_orch = new DashOrch(m_applDb, dash_tables, m_dpu_appstateDb, dash_zmq_server);
gDirectory.set(dash_orch);

vector<string> dash_ha_tables = {
Expand All @@ -1254,7 +1264,7 @@ bool DpuOrchDaemon::init()
APP_DASH_ROUTE_GROUP_TABLE_NAME
};

DashRouteOrch *dash_route_orch = new DashRouteOrch(m_applDb, dash_route_tables, dash_orch, m_dpu_appstateDb, m_zmqServer);
DashRouteOrch *dash_route_orch = new DashRouteOrch(m_applDb, dash_route_tables, dash_orch, m_dpu_appstateDb, dash_zmq_server);
gDirectory.set(dash_route_orch);

vector<string> dash_acl_tables = {
Expand All @@ -1264,20 +1274,20 @@ bool DpuOrchDaemon::init()
APP_DASH_ACL_GROUP_TABLE_NAME,
APP_DASH_ACL_RULE_TABLE_NAME
};
DashAclOrch *dash_acl_orch = new DashAclOrch(m_applDb, dash_acl_tables, dash_orch, m_dpu_appstateDb, m_zmqServer);
DashAclOrch *dash_acl_orch = new DashAclOrch(m_applDb, dash_acl_tables, dash_orch, m_dpu_appstateDb, dash_zmq_server);
gDirectory.set(dash_acl_orch);

vector<string> dash_tunnel_tables = {
APP_DASH_TUNNEL_TABLE_NAME
};
DashTunnelOrch *dash_tunnel_orch = new DashTunnelOrch(m_applDb, dash_tunnel_tables, m_dpu_appstateDb, m_zmqServer);
DashTunnelOrch *dash_tunnel_orch = new DashTunnelOrch(m_applDb, dash_tunnel_tables, m_dpu_appstateDb, dash_zmq_server);
gDirectory.set(dash_tunnel_orch);

vector<string> dash_meter_tables = {
APP_DASH_METER_POLICY_TABLE_NAME,
APP_DASH_METER_RULE_TABLE_NAME
};
DashMeterOrch *dash_meter_orch = new DashMeterOrch(m_applDb, dash_meter_tables, dash_orch, m_dpu_appstateDb, m_zmqServer);
DashMeterOrch *dash_meter_orch = new DashMeterOrch(m_applDb, dash_meter_tables, dash_orch, m_dpu_appstateDb, dash_zmq_server);
gDirectory.set(dash_meter_orch);

addOrchList(dash_acl_orch);
Expand Down
1 change: 1 addition & 0 deletions tests/mock_tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ tests_SOURCES = aclorch_ut.cpp \
$(top_srcdir)/lib/gearboxutils.cpp \
$(top_srcdir)/lib/subintf.cpp \
$(top_srcdir)/lib/recorder.cpp \
$(top_srcdir)/lib/orch_zmq_config.cpp \
$(top_srcdir)/orchagent/orchdaemon.cpp \
$(top_srcdir)/orchagent/orch.cpp \
$(top_srcdir)/orchagent/notifications.cpp \
Expand Down
52 changes: 52 additions & 0 deletions tests/mock_tests/mock_table.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "table.h"
#include "producerstatetable.h"
#include "producertable.h"
#include "mock_table.h"
#include <set>
#include <memory>

Expand Down Expand Up @@ -145,6 +146,12 @@ namespace swss
std::shared_ptr<std::string> DBConnector::hget(const std::string &key, const std::string &field)
{
std::string value;

if (field == HGET_THROW_EXCEPTION_FIELD_NAME)
{
throw std::runtime_error("HGET failed, unexpected reply type, memory exception");
}

if (_hget(getDbId(), key, "", field, value))
{
std::shared_ptr<std::string> ptr(new std::string(value));
Expand All @@ -156,6 +163,51 @@ namespace swss
}
}

int64_t DBConnector::hdel(const std::string &key, const std::string &field)
{
auto &table = gDB[getDbId()][key];
auto key_iter = table.find("");
if (key_iter == table.end())
{
return 0;
}

int removed = 0;
auto attrs = key_iter->second;
std::vector<FieldValueTuple> new_attrs;
for (auto attr_iter : attrs)
{
if (attr_iter.first == field)
{
removed += 1;
continue;
}

new_attrs.push_back(attr_iter);
}

table[""] = new_attrs;

return removed;
}

void DBConnector::hset(const std::string &key, const std::string &field, const std::string &value)
{
FieldValueTuple fvp(field, value);
std::vector<FieldValueTuple> attrs = { fvp };

auto &table = gDB[getDbId()][key];
auto iter = table.find("");
if (iter == table.end())
{
table[""] = attrs;
}
else
{
merge_values(iter->second, attrs);
}
}

void ProducerTable::set(const std::string &key,
const std::vector<FieldValueTuple> &values,
const std::string &op,
Expand Down
3 changes: 3 additions & 0 deletions tests/mock_tests/mock_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

#include "table.h"

// Use this field in the mock test to simulate an exception during hget.
#define HGET_THROW_EXCEPTION_FIELD_NAME "hget_throw_exception"

namespace testing_db
{
void reset();
Expand Down
Loading
Loading