From 766ddba3cec01c4297738f87658c2b8821af6255 Mon Sep 17 00:00:00 2001 From: Vivek Reddy Date: Wed, 14 Jan 2026 07:59:46 +0000 Subject: [PATCH 1/3] [DPU] [HA] Add support for Flow API Signed-off-by: Vivek Reddy --- orchagent/Makefile.am | 1 + orchagent/dash/dashhafloworch.cpp | 1047 ++++++++++++++++++++++++ orchagent/dash/dashhafloworch.h | 178 ++++ orchagent/notifications.cpp | 14 + orchagent/notifications.h | 1 + orchagent/orchdaemon.cpp | 8 + orchagent/orchdaemon.h | 1 + orchagent/saihelper.cpp | 4 +- tests/mock_tests/Makefile.am | 2 + tests/mock_tests/dashhafloworch_ut.cpp | 753 +++++++++++++++++ tests/mock_tests/mock_orchagent_main.h | 1 + tests/mock_tests/ut_saihelper.cpp | 2 + 12 files changed, 2011 insertions(+), 1 deletion(-) create mode 100644 orchagent/dash/dashhafloworch.cpp create mode 100644 orchagent/dash/dashhafloworch.h create mode 100644 tests/mock_tests/dashhafloworch_ut.cpp diff --git a/orchagent/Makefile.am b/orchagent/Makefile.am index a504f41ab2c..1a524031aae 100644 --- a/orchagent/Makefile.am +++ b/orchagent/Makefile.am @@ -129,6 +129,7 @@ orchagent_SOURCES = \ dash/dashtunnelorch.cpp \ dash/pbutils.cpp \ dash/dashhaorch.cpp \ + dash/dashhafloworch.cpp \ dash/dashportmaporch.cpp \ twamporch.cpp \ stporch.cpp \ diff --git a/orchagent/dash/dashhafloworch.cpp b/orchagent/dash/dashhafloworch.cpp new file mode 100644 index 00000000000..56696fc6236 --- /dev/null +++ b/orchagent/dash/dashhafloworch.cpp @@ -0,0 +1,1047 @@ +#include "dashhafloworch.h" + +#include "orch.h" +#include "sai.h" +#include "saiextensions.h" +#include "saihelper.h" +#include "table.h" +#include "taskworker.h" +#include "converter.h" +#include "ipaddress.h" +#include "macaddress.h" +#include "swssnet.h" +#include "schema.h" +#include "schema.h" + +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace swss; + +extern sai_object_id_t gSwitchId; +extern sai_switch_api_t* sai_switch_api; +extern sai_dash_flow_api_t* sai_dash_flow_api; + +constexpr const char* DashHaFlowOrch::SESSION_TYPE_BULK_SYNC; +constexpr const char* DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP; + +static const map filter_key_map = { + { "eni_addr", SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_ENI_ADDR }, + { "ip_protocol", SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_IP_PROTOCOL }, + { "src_ip_addr", SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_SRC_IP_ADDR }, + { "dst_ip_addr", SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_DST_IP_ADDR }, + { "src_l4_port", SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_SRC_L4_PORT }, + { "dst_l4_port", SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_DST_L4_PORT }, + { "key_version", SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_KEY_VERSION } +}; + +static const map filter_op_map = { + { "equal_to", SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_OP_KEY_FILTER_OP_EQUAL_TO }, + { "greater_than", SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_OP_KEY_FILTER_OP_GREATER_THAN }, + { "greater_than_or_equal_to", SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_OP_KEY_FILTER_OP_GREATER_THAN_OR_EQUAL_TO }, + { "less_than", SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_OP_KEY_FILTER_OP_LESS_THAN }, + { "less_than_or_equal_to", SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_OP_KEY_FILTER_OP_LESS_THAN_OR_EQUAL_TO } +}; + +task_process_status FlowDumpFilterManager::addFilter(const string &key, const vector &attrs) +{ + SWSS_LOG_ENTER(); + + FlowDumpFilterEntry entry; + entry.filter_id = SAI_NULL_OBJECT_ID; + + for (auto i = attrs.begin(); i != attrs.end(); i++) + { + const auto &attr = fvField(*i); + const auto &value = fvValue(*i); + + if (attr == "key") + { + entry.key = value; + } + else if (attr == "op") + { + entry.op = value; + } + else if (attr == "value") + { + entry.value = value; + } + } + + if (entry.key.empty() || entry.op.empty() || entry.value.empty()) + { + SWSS_LOG_ERROR("Missing required fields for flow dump filter %s", key.c_str()); + return task_failed; + } + + sai_object_id_t filter_id = createFilterSAI(entry); + if (filter_id == SAI_NULL_OBJECT_ID) + { + SWSS_LOG_ERROR("Failed to create flow dump filter %s", key.c_str()); + return task_failed; + } + + entry.filter_id = filter_id; + m_filter_cache[key] = entry; + + SWSS_LOG_NOTICE("Created flow dump filter %s with filter_id 0x%lx", key.c_str(), filter_id); + + return task_success; +} + +task_process_status FlowDumpFilterManager::removeFilter(const string &key) +{ + SWSS_LOG_ENTER(); + + auto it = m_filter_cache.find(key); + if (it == m_filter_cache.end()) + { + SWSS_LOG_WARN("Flow dump filter %s not found in cache", key.c_str()); + return task_success; + } + + if (it->second.filter_id != SAI_NULL_OBJECT_ID) + { + deleteFilterSAI(it->second.filter_id); + } + + m_filter_cache.erase(it); + + SWSS_LOG_NOTICE("Removed flow dump filter %s from cache", key.c_str()); + + return task_success; +} + +vector FlowDumpFilterManager::getFilterIds(const vector &required_filter_keys) const +{ + SWSS_LOG_ENTER(); + + vector filter_ids; + + for (const auto &filter_key : required_filter_keys) + { + auto filter_it = m_filter_cache.find(filter_key); + if (filter_it != m_filter_cache.end() && filter_it->second.filter_id != SAI_NULL_OBJECT_ID) + { + filter_ids.push_back(filter_it->second.filter_id); + } + } + + return filter_ids; +} + +sai_object_id_t FlowDumpFilterManager::createFilterSAI(const FlowDumpFilterEntry &filter) +{ + SWSS_LOG_ENTER(); + + try + { + sai_attribute_t attrs[3]; + uint32_t attr_count = 0; + + auto filter_key_it = filter_key_map.find(filter.key); + if (filter_key_it == filter_key_map.end()) + { + SWSS_LOG_ERROR("Invalid filter key: %s", filter.key.c_str()); + return SAI_NULL_OBJECT_ID; + } + sai_dash_flow_entry_bulk_get_session_filter_key_t filter_key = filter_key_it->second; + + attrs[attr_count].id = SAI_FLOW_ENTRY_BULK_GET_SESSION_FILTER_ATTR_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY; + attrs[attr_count].value.s32 = filter_key; + attr_count++; + + auto filter_op_it = filter_op_map.find(filter.op); + if (filter_op_it == filter_op_map.end()) + { + SWSS_LOG_ERROR("Invalid filter op: %s", filter.op.c_str()); + return SAI_NULL_OBJECT_ID; + } + sai_dash_flow_entry_bulk_get_session_op_key_t filter_op = filter_op_it->second; + + attrs[attr_count].id = SAI_FLOW_ENTRY_BULK_GET_SESSION_FILTER_ATTR_DASH_FLOW_ENTRY_BULK_GET_SESSION_OP_KEY; + attrs[attr_count].value.s32 = filter_op; + attr_count++; + + if (filter_key == SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_ENI_ADDR) + { + MacAddress mac(filter.value); + attrs[attr_count].id = SAI_FLOW_ENTRY_BULK_GET_SESSION_FILTER_ATTR_MAC_VALUE; + memcpy(attrs[attr_count].value.mac, mac.getMac(), 6); + attr_count++; + } + else if (filter_key == SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_IP_PROTOCOL || + filter_key == SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_SRC_L4_PORT || + filter_key == SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_DST_L4_PORT || + filter_key == SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_KEY_VERSION) + { + attrs[attr_count].id = SAI_FLOW_ENTRY_BULK_GET_SESSION_FILTER_ATTR_INT_VALUE; + attrs[attr_count].value.u32 = static_cast(stoul(filter.value)); + attr_count++; + } + else if (filter_key == SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_SRC_IP_ADDR || + filter_key == SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_KEY_DST_IP_ADDR) + { + IpAddress ip(filter.value); + attrs[attr_count].id = SAI_FLOW_ENTRY_BULK_GET_SESSION_FILTER_ATTR_IP_VALUE; + swss::copy(attrs[attr_count].value.ipaddr, ip); + attr_count++; + } + + sai_object_id_t filter_id = SAI_NULL_OBJECT_ID; + sai_status_t status = sai_dash_flow_api->create_flow_entry_bulk_get_session_filter(&filter_id, gSwitchId, attr_count, attrs); + + if (status != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to create flow bulk get session filter, status: %d", status); + return SAI_NULL_OBJECT_ID; + } + + return filter_id; + } + catch (const exception &e) + { + SWSS_LOG_ERROR("Exception in FlowDumpFilterManager::createFilterSAI for filter key %s, op %s, value %s: %s", + filter.key.c_str(), filter.op.c_str(), filter.value.c_str(), e.what()); + return SAI_NULL_OBJECT_ID; + } + catch (...) + { + SWSS_LOG_ERROR("Unknown exception in FlowDumpFilterManager::createFilterSAI for filter key %s, op %s, value %s", + filter.key.c_str(), filter.op.c_str(), filter.value.c_str()); + return SAI_NULL_OBJECT_ID; + } +} + +bool FlowDumpFilterManager::deleteFilterSAI(sai_object_id_t filter_id) +{ + SWSS_LOG_ENTER(); + + sai_status_t status = sai_dash_flow_api->remove_flow_entry_bulk_get_session_filter(filter_id); + + if (status != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to delete flow bulk get session filter 0x%lx, status: %d", filter_id, status); + return false; + } + + return true; +} + +FlowSyncHandler::FlowSyncHandler(DBConnector *dpu_state_db, SelectableTimer *timer) : + m_session_id(SAI_NULL_OBJECT_ID), + m_timer(timer) +{ + m_state_table = make_shared(dpu_state_db, STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); +} + +void FlowSyncHandler::deleteSession() +{ + SWSS_LOG_ENTER(); + + if (m_timer != nullptr) + { + m_timer->stop(); + } + + if (m_session_id != SAI_NULL_OBJECT_ID) + { + if (deleteSessionSAI()) + { + m_session_id = SAI_NULL_OBJECT_ID; + } + } +} + +bool FlowSyncHandler::deleteSessionSAI() +{ + SWSS_LOG_ENTER(); + + sai_status_t status = sai_dash_flow_api->remove_flow_entry_bulk_get_session(m_session_id); + + if (status != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to delete flow bulk get session 0x%lx, status: %d", m_session_id, status); + return false; + } + + return true; +} + +void FlowSyncHandler::updateState(const string &state, const string &key, vector fvs) +{ + chrono::steady_clock::time_point creation_time; + chrono::steady_clock::time_point last_state_time = chrono::steady_clock::now(); + + // Only update internal state if updating for the current session key + if (key == m_key) + { + m_last_state_time = last_state_time; + creation_time = m_creation_time; + } + else + { + // For different key, use current time for timestamps + creation_time = chrono::steady_clock::now(); + } + + auto creation_time_ms = chrono::duration_cast(creation_time.time_since_epoch()).count(); + auto last_state_time_ms = chrono::duration_cast(last_state_time.time_since_epoch()).count(); + + fvs.push_back(FieldValueTuple("state", state)); + fvs.push_back(FieldValueTuple("creation_time_in_ms", to_string(creation_time_ms))); + fvs.push_back(FieldValueTuple("last_state_start_time_in_ms", to_string(last_state_time_ms))); + + m_state_table->set(key, fvs); +} + +BulkSyncHandler::BulkSyncHandler(DBConnector *dpu_state_db, SelectableTimer *timer) : + FlowSyncHandler(dpu_state_db, timer) +{ +} + +bool BulkSyncHandler::initialize(const string &key, const vector &attrs) +{ + SWSS_LOG_ENTER(); + + if (isActive()) + { + SWSS_LOG_ERROR("BulkSyncHandler already active: %s. Cannot create new session: %s", m_key.c_str(), key.c_str()); + return false; + } + + try + { + // m_key is already set in handleSet + m_ha_set_id = SAI_NULL_OBJECT_ID; + m_target_server_ip = ""; + m_target_server_port = 0; + m_timeout_sec = DEFAULT_TIMEOUT_SEC; + + for (auto i = attrs.begin(); i != attrs.end(); i++) + { + const auto &attr = fvField(*i); + const auto &value = fvValue(*i); + + if (attr == "target_server_ip") + { + m_target_server_ip = value; + } + else if (attr == "target_server_port") + { + m_target_server_port = static_cast(stoul(value)); + } + else if (attr == "timeout") + { + m_timeout_sec = static_cast(stoul(value)); + } + } + return true; + } + catch (const exception &e) + { + SWSS_LOG_ERROR("Exception in BulkSyncHandler::initialize for key %s: %s", key.c_str(), e.what()); + return false; + } + catch (...) + { + SWSS_LOG_ERROR("Unknown exception in BulkSyncHandler::initialize for key %s", key.c_str()); + return false; + } +} + +void BulkSyncHandler::reset() +{ + SWSS_LOG_ENTER(); + m_key = ""; + m_ha_set_id = SAI_NULL_OBJECT_ID; + m_target_server_ip = ""; + m_target_server_port = 0; + m_timeout_sec = BulkSyncHandler::DEFAULT_TIMEOUT_SEC; +} + +task_process_status BulkSyncHandler::handleSet(const string &table_name, const string &key, const vector &attrs) +{ + SWSS_LOG_ENTER(); + + if (table_name != APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME) + { + SWSS_LOG_ERROR("BulkSyncHandler::handleSet called with unknown table: %s", table_name.c_str()); + return task_failed; + } + + if (isActive()) + { + SWSS_LOG_ERROR("Flow sync session already exists: %s. Cannot create new session: %s", m_key.c_str(), key.c_str()); + vector fvs; + fvs.push_back(FieldValueTuple("type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC)); + FlowSyncHandler::updateState("failed", key, fvs); + return task_failed; + } + + // Set key before initialize so state updates work correctly + m_key = key; + if (!initialize(key, attrs)) + { + SWSS_LOG_ERROR("Failed to initialize BulkSyncHandler for key %s", key.c_str()); + vector fvs; + fvs.push_back(FieldValueTuple("type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC)); + FlowSyncHandler::updateState("failed", key, fvs); + clearKey(); + return task_failed; + } + + task_process_status status = createSession(); + if (status != task_success && status != task_need_retry) + { + clearKey(); + } + + return status; +} + +task_process_status BulkSyncHandler::handleDel(const string &table_name, const string &key) +{ + SWSS_LOG_ENTER(); + SWSS_LOG_WARN("Deleting session %s is not supported, will be auto deleted when session is finished", key.c_str()); + return task_failed; +} + +BulkSyncHandler::~BulkSyncHandler() +{ + deleteSession(); +} + +task_process_status BulkSyncHandler::createSession() +{ + SWSS_LOG_ENTER(); + + if (m_target_server_ip.empty()) + { + SWSS_LOG_ERROR("Missing target_server_ip for flow sync session %s", m_key.c_str()); + FlowSyncHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + return task_failed; + } + + if (m_target_server_port == 0) + { + SWSS_LOG_ERROR("Missing or invalid target_server_port for flow sync session %s", m_key.c_str()); + FlowSyncHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + return task_failed; + } + + sai_object_id_t session_id = createSessionSAI(); + if (session_id == SAI_NULL_OBJECT_ID) + { + SWSS_LOG_ERROR("Failed to create flow sync session %s", m_key.c_str()); + FlowSyncHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + return task_failed; + } + + m_session_id = session_id; + m_creation_time = chrono::steady_clock::now(); + m_last_state_time = m_creation_time; + + auto interval = timespec { .tv_sec = static_cast(m_timeout_sec), .tv_nsec = 0 }; + m_timer->setInterval(interval); + m_timer->start(); + + FlowSyncHandler::updateState("created", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + SWSS_LOG_NOTICE("Created flow sync session %s with session_id 0x%lx, timeout %u sec", m_key.c_str(), session_id, m_timeout_sec); + + return task_success; +} + +void BulkSyncHandler::handleFinished() +{ + SWSS_LOG_ENTER(); + FlowSyncHandler::updateState("completed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + SWSS_LOG_NOTICE("Flow sync session %s completed successfully", m_key.c_str()); + deleteSession(); + reset(); +} + +void BulkSyncHandler::handleTimeout() +{ + SWSS_LOG_ENTER(); + FlowSyncHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + SWSS_LOG_WARN("Flow sync session %s timed out", m_key.c_str()); + deleteSession(); + reset(); +} + +sai_object_id_t BulkSyncHandler::createSessionSAI() +{ + SWSS_LOG_ENTER(); + + sai_attribute_t attrs[4]; + uint32_t attr_count = 0; + + attrs[attr_count].id = SAI_FLOW_ENTRY_BULK_GET_SESSION_ATTR_DASH_FLOW_ENTRY_BULK_GET_SESSION_MODE; + attrs[attr_count].value.s32 = SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_MODE_SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_MODE_VENDOR; + attr_count++; + + IpAddress server_ip(m_target_server_ip); + attrs[attr_count].id = SAI_FLOW_ENTRY_BULK_GET_SESSION_ATTR_BULK_GET_SESSION_SERVER_IP; + swss::copy(attrs[attr_count].value.ipaddr, server_ip); + attr_count++; + + attrs[attr_count].id = SAI_FLOW_ENTRY_BULK_GET_SESSION_ATTR_BULK_GET_SESSION_SERVER_PORT; + attrs[attr_count].value.u16 = m_target_server_port; + attr_count++; + + sai_object_id_t session_id = SAI_NULL_OBJECT_ID; + sai_status_t status = sai_dash_flow_api->create_flow_entry_bulk_get_session(&session_id, gSwitchId, attr_count, attrs); + + if (status != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to create flow sync session, status: %d", status); + return SAI_NULL_OBJECT_ID; + } + + return session_id; +} + +FlowDumpHandler::FlowDumpHandler(DBConnector *dpu_state_db, SelectableTimer *timer, std::shared_ptr filter_manager) : + FlowSyncHandler(dpu_state_db, timer), + m_filter_manager(filter_manager) +{ +} + +bool FlowDumpHandler::initialize(const string &key, const vector &attrs) +{ + SWSS_LOG_ENTER(); + + try + { + // m_key is already set in handleSet + m_flow_state = false; + m_max_flows = FlowDumpHandler::MAX_FLOWS_DEFAULT; + m_timeout_sec = FlowDumpHandler::DEFAULT_TIMEOUT_SEC; + m_output_file = ""; + m_required_filter_keys.clear(); + + for (auto i = attrs.begin(); i != attrs.end(); i++) + { + const auto &attr = fvField(*i); + const auto &value = fvValue(*i); + + if (attr == "flow_state") + { + string lower_value = value; + transform(lower_value.begin(), lower_value.end(), lower_value.begin(), ::tolower); + m_flow_state = (lower_value == "true"); + } + else if (attr == "filter_1" || attr == "filter_2" || attr == "filter_3" || attr == "filter_4" || attr == "filter_5") + { + m_required_filter_keys.push_back(value); + } + else if (attr == "max_flows") + { + m_max_flows = static_cast(stoul(value)); + } + else if (attr == "timeout") + { + m_timeout_sec = static_cast(stoul(value)); + } + } + return true; + } + catch (const exception &e) + { + SWSS_LOG_ERROR("Exception in FlowDumpHandler::initialize for key %s: %s", key.c_str(), e.what()); + return false; + } + catch (...) + { + SWSS_LOG_ERROR("Unknown exception in FlowDumpHandler::initialize for key %s", key.c_str()); + return false; + } +} + +void FlowDumpHandler::reset() +{ + SWSS_LOG_ENTER(); + + m_key = ""; + m_flow_state = true; + m_required_filter_keys.clear(); + m_max_flows = 0; + m_timeout_sec = DEFAULT_TIMEOUT_SEC; + m_output_file = ""; +} + +FlowDumpHandler::~FlowDumpHandler() +{ + deleteSession(); +} + +task_process_status FlowDumpHandler::createSession() +{ + SWSS_LOG_ENTER(); + + vector filter_ids = m_filter_manager->getFilterIds(m_required_filter_keys); + + if (filter_ids.size() != m_required_filter_keys.size()) + { + SWSS_LOG_INFO("Flow dump session %s waiting for filters to become available (%zu/%zu)", m_key.c_str(), filter_ids.size(), m_required_filter_keys.size()); + FlowSyncHandler::updateState("pending", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); + return task_need_retry; + } + + sai_object_id_t session_id = createSessionSAI(); + if (session_id == SAI_NULL_OBJECT_ID) + { + SWSS_LOG_ERROR("Failed to create flow dump session %s", m_key.c_str()); + FlowSyncHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); + return task_failed; + } + + m_session_id = session_id; + + auto interval = timespec { .tv_sec = static_cast(m_timeout_sec), .tv_nsec = 0 }; + m_timer->setInterval(interval); + m_timer->start(); + + FlowSyncHandler::updateState("created", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); + + SWSS_LOG_NOTICE("Created flow dump session %s with session_id 0x%lx, timeout %u sec", m_key.c_str(), session_id, m_timeout_sec); + + return task_success; +} + + +task_process_status FlowDumpHandler::handleSet(const string &table_name, const string &key, const vector &attrs) +{ + SWSS_LOG_ENTER(); + + if (table_name == APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME) + { + if (isActive()) + { + SWSS_LOG_ERROR("Flow dump session already exists: %s. Cannot create new session: %s", m_key.c_str(), key.c_str()); + FlowSyncHandler::updateState("failed", key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); + return task_failed; + } + + // Set key before initialize so state updates work correctly + m_key = key; + if (!initialize(key, attrs)) + { + SWSS_LOG_ERROR("Failed to initialize FlowDumpHandler for key %s", key.c_str()); + FlowSyncHandler::updateState("failed", key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); + clearKey(); + return task_failed; + } + + task_process_status status = createSession(); + if (status != task_success && status != task_need_retry) + { + clearKey(); + } + return status; + } + else + { + SWSS_LOG_ERROR("FlowDumpHandler::handleSet called with unknown table: %s", table_name.c_str()); + return task_failed; + } +} + +task_process_status FlowDumpHandler::handleDel(const string &table_name, const string &key) +{ + SWSS_LOG_ENTER(); + + if (table_name == APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME) + { + SWSS_LOG_WARN("Deleting session %s is not supported, will be auto deleted when session is finished", key.c_str()); + return task_failed; + } + else + { + SWSS_LOG_ERROR("FlowDumpHandler::handleDel called with unknown table: %s", table_name.c_str()); + return task_failed; + } +} + + +void FlowDumpHandler::handleFinished() +{ + SWSS_LOG_ENTER(); + + ostringstream oss; + oss << "/var/dump/flows/flow_dump_0x" << hex << setfill('0') << setw(16) << m_session_id << ".jsonl.gz"; + m_output_file = oss.str(); + + vector fvs; + fvs.push_back(FieldValueTuple("type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP)); + fvs.push_back(FieldValueTuple("output_file", m_output_file)); + FlowSyncHandler::updateState("completed", m_key, fvs); + SWSS_LOG_NOTICE("Flow dump session %s completed successfully, output file: %s", m_key.c_str(), m_output_file.c_str()); + + deleteSession(); + reset(); +} + +void FlowDumpHandler::handleTimeout() +{ + SWSS_LOG_ENTER(); + + FlowSyncHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); + deleteSession(); + reset(); + + SWSS_LOG_WARN("Flow dump session %s timed out", m_key.c_str()); +} + +sai_object_id_t FlowDumpHandler::createSessionSAI() +{ + SWSS_LOG_ENTER(); + + vector attrs; + + sai_dash_flow_entry_bulk_get_session_mode_t mode; + if (m_flow_state) + { + mode = SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_MODE_SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_MODE_EVENT; + } + else + { + mode = SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_MODE_SAI_DASH_FLOW_ENTRY_BULK_GET_SESSION_MODE_EVENT_WITHOUT_FLOW_STATE; + } + + sai_attribute_t attr; + attr.id = SAI_FLOW_ENTRY_BULK_GET_SESSION_ATTR_DASH_FLOW_ENTRY_BULK_GET_SESSION_MODE; + attr.value.s32 = mode; + attrs.push_back(attr); + + if (m_max_flows > 0) + { + attr.id = SAI_FLOW_ENTRY_BULK_GET_SESSION_ATTR_BULK_GET_ENTRY_LIMITATION; + attr.value.u32 = m_max_flows; + attrs.push_back(attr); + } + + sai_attr_id_t filter_attr_ids[] = { + SAI_FLOW_ENTRY_BULK_GET_SESSION_ATTR_FIRST_FLOW_ENTRY_BULK_GET_SESSION_FILTER_ID, + SAI_FLOW_ENTRY_BULK_GET_SESSION_ATTR_SECOND_FLOW_ENTRY_BULK_GET_SESSION_FILTER_ID, + SAI_FLOW_ENTRY_BULK_GET_SESSION_ATTR_THIRD_FLOW_ENTRY_BULK_GET_SESSION_FILTER_ID, + SAI_FLOW_ENTRY_BULK_GET_SESSION_ATTR_FOURTH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_ID, + SAI_FLOW_ENTRY_BULK_GET_SESSION_ATTR_FIFTH_FLOW_ENTRY_BULK_GET_SESSION_FILTER_ID + }; + + vector filter_ids = m_filter_manager->getFilterIds(m_required_filter_keys); + for (size_t i = 0; i < filter_ids.size() && i < 5; i++) + { + attr.id = filter_attr_ids[i]; + attr.value.oid = filter_ids[i]; + attrs.push_back(attr); + } + + sai_object_id_t session_id = SAI_NULL_OBJECT_ID; + sai_status_t status = sai_dash_flow_api->create_flow_entry_bulk_get_session(&session_id, gSwitchId, static_cast(attrs.size()), attrs.data()); + + if (status != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to create flow dump session, status: %d", status); + return SAI_NULL_OBJECT_ID; + } + + return session_id; +} + +DashHaFlowOrch::DashHaFlowOrch(DBConnector *db, const vector &tableNames, DBConnector *app_state_db, ZmqServer *zmqServer) : + ZmqOrch(db, tableNames, zmqServer) +{ + SWSS_LOG_ENTER(); + + m_dpuStateDb = make_unique("DPU_STATE_DB", 0, true); + + auto sync_interval = timespec { .tv_sec = 0, .tv_nsec = 0 }; + m_sync_timer = new SelectableTimer(sync_interval); + m_sync_executor = new ExecutableTimer(m_sync_timer, this, "FLOW_SYNC_SESSION_TIMER"); + Orch::addExecutor(m_sync_executor); + + auto dump_interval = timespec { .tv_sec = 0, .tv_nsec = 0 }; + m_dump_timer = new SelectableTimer(dump_interval); + m_dump_executor = new ExecutableTimer(m_dump_timer, this, "FLOW_DUMP_SESSION_TIMER"); + Orch::addExecutor(m_dump_executor); + + m_filter_manager = std::make_shared(); + m_handlers[SESSION_TYPE_BULK_SYNC] = make_shared(m_dpuStateDb.get(), m_sync_timer); + m_handlers[SESSION_TYPE_FLOW_DUMP] = make_shared(m_dpuStateDb.get(), m_dump_timer, m_filter_manager); + + DBConnector *notificationsDb = new DBConnector("ASIC_DB", 0); + m_flowBulkGetSessionNotificationConsumer = new NotificationConsumer(notificationsDb, "NOTIFICATIONS"); + auto flowBulkGetSessionNotifier = new Notifier(m_flowBulkGetSessionNotificationConsumer, this, SAI_SWITCH_NOTIFICATION_NAME_FLOW_BULK_GET_SESSION_EVENT); + + Orch::addExecutor(flowBulkGetSessionNotifier); + + registerFlowBulkGetSessionNotifier(); +} + +bool DashHaFlowOrch::registerFlowBulkGetSessionNotifier() +{ + SWSS_LOG_ENTER(); + + sai_attribute_t attr; + sai_status_t status; + sai_attr_capability_t capability; + + status = sai_query_attribute_capability(gSwitchId, SAI_OBJECT_TYPE_SWITCH, + SAI_SWITCH_ATTR_FLOW_BULK_GET_SESSION_EVENT_NOTIFY, + &capability); + if (status != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Unable to query the Flow Bulk Get Session event notification capability"); + return false; + } + + if (!capability.set_implemented) + { + SWSS_LOG_INFO("Flow Bulk Get Session event notification not supported"); + return false; + } + + attr.id = SAI_SWITCH_ATTR_FLOW_BULK_GET_SESSION_EVENT_NOTIFY; + attr.value.ptr = (void *)on_flow_bulk_get_session_event; + + status = sai_switch_api->set_switch_attribute(gSwitchId, &attr); + if (status != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to register Flow Bulk Get Session event notification"); + return false; + } + + return true; +} + +void DashHaFlowOrch::doTask(ConsumerBase &consumer) +{ + SWSS_LOG_ENTER(); + + const auto& tn = consumer.getTableName(); + + if (tn == APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME) + { + doTaskFlowSyncSessionTable(consumer); + } + else if (tn == APP_DASH_FLOW_DUMP_FILTER_TABLE_NAME) + { + doTaskFlowDumpFilterTable(consumer); + } + else + { + SWSS_LOG_ERROR("Unknown table: %s", tn.c_str()); + } +} + +void DashHaFlowOrch::doTask(NotificationConsumer &consumer) +{ + SWSS_LOG_ENTER(); + + std::string notification_name; + std::string data; + std::vector values; + + consumer.pop(notification_name, data, values); + + if (notification_name == SAI_SWITCH_NOTIFICATION_NAME_FLOW_BULK_GET_SESSION_EVENT) + { + handleSessionNotification(notification_name, data, values); + } + else + { + SWSS_LOG_WARN("Unknown notification: %s", notification_name.c_str()); + } +} + +void DashHaFlowOrch::doTask(SelectableTimer &timer) +{ + SWSS_LOG_ENTER(); + + handleTimerExpired(&timer); +} + +void DashHaFlowOrch::doTaskFlowSyncSessionTable(ConsumerBase &consumer) +{ + SWSS_LOG_ENTER(); + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + auto t = it->second; + string key = kfvKey(t); + string op = kfvOp(t); + task_process_status status = task_failed; + + if (op == SET_COMMAND) + { + string type = getTypeFromAttrs(kfvFieldsValues(t)); + auto h_it = m_handlers.find(type); + if (h_it == m_handlers.end()) + { + SWSS_LOG_ERROR("Invalid or missing type field in session %s. Expected '%s' or '%s'", key.c_str(), SESSION_TYPE_BULK_SYNC, SESSION_TYPE_FLOW_DUMP); + status = task_failed; + } + else + { + status = h_it->second->handleSet(APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME, key, kfvFieldsValues(t)); + } + } + else if (op == DEL_COMMAND) + { + // Find which handler has this key + bool found = false; + for (auto &h_pair : m_handlers) + { + if (h_pair.second->getKey() == key) + { + status = h_pair.second->handleDel(APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME, key); + found = true; + break; + } + } + if (!found) + { + SWSS_LOG_WARN("Session %s not found in any handler", key.c_str()); + status = task_success; + } + } + else + { + SWSS_LOG_ERROR("Unknown operation %s", op.c_str()); + } + + if (status == task_need_retry) + { + it++; + } + else + { + it = consumer.m_toSync.erase(it); + } + } +} + +void DashHaFlowOrch::doTaskFlowDumpFilterTable(ConsumerBase &consumer) +{ + SWSS_LOG_ENTER(); + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + auto t = it->second; + string key = kfvKey(t); + string op = kfvOp(t); + task_process_status status = task_failed; + + if (op == SET_COMMAND) + { + status = m_filter_manager->addFilter(key, kfvFieldsValues(t)); + } + else if (op == DEL_COMMAND) + { + status = m_filter_manager->removeFilter(key); + } + else + { + SWSS_LOG_ERROR("Unknown operation %s", op.c_str()); + } + + if (status == task_need_retry) + { + it++; + } + else + { + it = consumer.m_toSync.erase(it); + } + } +} + +string DashHaFlowOrch::getTypeFromAttrs(const vector &attrs) +{ + SWSS_LOG_ENTER(); + + for (const auto &attr : attrs) + { + if (fvField(attr) == "type") + { + return fvValue(attr); + } + } + + return ""; +} + +void DashHaFlowOrch::handleSessionNotification(const string ¬ification_name, const string &data, const vector &values) +{ + SWSS_LOG_ENTER(); + + sai_object_id_t flow_bulk_session_id; + uint32_t count; + sai_flow_bulk_get_session_event_data_t* event_data; + + sai_deserialize_flow_bulk_get_session_event_ntf(data, flow_bulk_session_id, count, &event_data); + + for (uint32_t i = 0; i < count; i++) + { + if (event_data[i].event_type == SAI_FLOW_BULK_GET_SESSION_EVENT_FINISHED) + { + handleSessionFinished(flow_bulk_session_id); + } + } + + sai_deserialize_free_flow_bulk_get_session_event_ntf(count, event_data); +} + +void DashHaFlowOrch::handleSessionFinished(sai_object_id_t session_id) +{ + SWSS_LOG_ENTER(); + + bool found = false; + for (auto &h_pair : m_handlers) + { + if (h_pair.second->getSessionId() == session_id) + { + h_pair.second->handleFinished(); + SWSS_LOG_NOTICE("Session ID 0x%lx finished (type: %s)", session_id, h_pair.first.c_str()); + found = true; + break; + } + } + + if (!found) + { + SWSS_LOG_WARN("Received FINISHED notification for unknown session 0x%lx", session_id); + } +} + +void DashHaFlowOrch::handleTimerExpired(SelectableTimer *timer) +{ + SWSS_LOG_ENTER(); + + bool found = false; + for (auto &h_pair : m_handlers) + { + if (h_pair.second->getTimer() == timer) + { + h_pair.second->handleTimeout(); + SWSS_LOG_NOTICE("Timer expired for handler (type: %s)", h_pair.first.c_str()); + found = true; + break; + } + } + + if (!found) + { + SWSS_LOG_WARN("Timer not found in any handler"); + } +} diff --git a/orchagent/dash/dashhafloworch.h b/orchagent/dash/dashhafloworch.h new file mode 100644 index 00000000000..4d348aa4117 --- /dev/null +++ b/orchagent/dash/dashhafloworch.h @@ -0,0 +1,178 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "dbconnector.h" +#include "zmqorch.h" +#include "zmqserver.h" +#include "saitypes.h" +#include "notifier.h" +#include "directory.h" +#include "sai_serialize.h" +#include "notifications.h" +#include "timer.h" +#include "orch.h" + +extern sai_dash_flow_api_t* sai_dash_flow_api; +extern sai_object_id_t gSwitchId; + +class DashHaFlowOrch; + +class FlowDumpFilterManager +{ + friend class DashHaFlowOrch; + + struct FlowDumpFilterEntry + { + sai_object_id_t filter_id; + std::string key; + std::string op; + std::string value; + }; + +public: + FlowDumpFilterManager() = default; + ~FlowDumpFilterManager() = default; + + std::vector getFilterIds(const std::vector &required_filter_keys) const; + +protected: + task_process_status addFilter(const std::string &key, const std::vector &attrs); + task_process_status removeFilter(const std::string &key); + +private: + sai_object_id_t createFilterSAI(const FlowDumpFilterEntry &filter); + bool deleteFilterSAI(sai_object_id_t filter_id); + + std::map m_filter_cache; +}; + +class FlowSyncHandler +{ +public: + FlowSyncHandler(swss::DBConnector *dpu_state_db, swss::SelectableTimer *timer); + virtual ~FlowSyncHandler() = default; + + virtual bool initialize(const std::string &key, const std::vector &attrs) = 0; + virtual task_process_status handleSet(const std::string &table_name, const std::string &key, const std::vector &attrs) = 0; + virtual task_process_status handleDel(const std::string &table_name, const std::string &key) = 0; + + virtual void handleFinished() = 0; + virtual void handleTimeout() = 0; + virtual sai_object_id_t getSessionId() const { return m_session_id; } + virtual std::string getKey() const { return m_key; } + virtual bool isActive() const { return m_session_id != SAI_NULL_OBJECT_ID; } + virtual swss::SelectableTimer* getTimer() const { return m_timer; } + virtual void clearKey() { m_key = ""; } + +protected: + virtual task_process_status createSession() = 0; + virtual sai_object_id_t createSessionSAI() = 0; + virtual void reset() = 0; + + void deleteSession(); + bool deleteSessionSAI(); + void updateState(const std::string &state, const std::string &key, std::vector fvs); + + std::string m_key; + sai_object_id_t m_session_id; + swss::SelectableTimer* m_timer; + std::chrono::steady_clock::time_point m_creation_time; + std::chrono::steady_clock::time_point m_last_state_time; + std::shared_ptr m_state_table; +}; + +class BulkSyncHandler : public FlowSyncHandler +{ +public: + BulkSyncHandler(swss::DBConnector *dpu_state_db, swss::SelectableTimer *timer); + virtual ~BulkSyncHandler(); + + bool initialize(const std::string &key, const std::vector &attrs) override; + task_process_status handleSet(const std::string &table_name, const std::string &key, const std::vector &attrs) override; + task_process_status handleDel(const std::string &table_name, const std::string &key) override; + void handleFinished() override; + void handleTimeout() override; + +protected: + task_process_status createSession() override; + sai_object_id_t createSessionSAI() override; + void reset() override; + +private: + sai_object_id_t m_ha_set_id; + std::string m_target_server_ip; + uint16_t m_target_server_port; + uint32_t m_timeout_sec; + + static constexpr uint32_t DEFAULT_TIMEOUT_SEC = 120; +}; + +class FlowDumpHandler : public FlowSyncHandler +{ +public: + FlowDumpHandler(swss::DBConnector *dpu_state_db, swss::SelectableTimer *timer, std::shared_ptr filter_manager); + virtual ~FlowDumpHandler(); + + bool initialize(const std::string &key, const std::vector &attrs) override; + task_process_status handleSet(const std::string &table_name, const std::string &key, const std::vector &attrs) override; + task_process_status handleDel(const std::string &table_name, const std::string &key) override; + void handleFinished() override; + void handleTimeout() override; + +protected: + task_process_status createSession() override; + sai_object_id_t createSessionSAI() override; + void reset() override; + + bool m_flow_state; + std::vector m_required_filter_keys; + uint32_t m_max_flows; + uint32_t m_timeout_sec; + std::string m_output_file; + +private: + std::shared_ptr m_filter_manager; + + static constexpr uint32_t DEFAULT_TIMEOUT_SEC = 300; + static constexpr uint32_t MAX_FLOWS_DEFAULT = 1000; +}; + +class DashHaFlowOrch : public ZmqOrch +{ +public: + static constexpr const char* SESSION_TYPE_BULK_SYNC = "bulk_sync"; + static constexpr const char* SESSION_TYPE_FLOW_DUMP = "flow_dump"; + + DashHaFlowOrch(swss::DBConnector *db, const std::vector &tableNames, swss::DBConnector *app_state_db, swss::ZmqServer *zmqServer); + +protected: + std::map> m_handlers; + + std::unique_ptr m_dpuStateDb; + swss::SelectableTimer* m_sync_timer; + swss::SelectableTimer* m_dump_timer; + swss::ExecutableTimer* m_sync_executor; + swss::ExecutableTimer* m_dump_executor; + + swss::NotificationConsumer* m_flowBulkGetSessionNotificationConsumer; + std::shared_ptr m_filter_manager; + + void doTask(ConsumerBase &consumer); + void doTask(swss::NotificationConsumer &consumer); + void doTask(swss::SelectableTimer &timer); + void doTaskFlowSyncSessionTable(ConsumerBase &consumer); + void doTaskFlowDumpFilterTable(ConsumerBase &consumer); + + std::string getTypeFromAttrs(const std::vector &attrs); + + void handleSessionNotification(const std::string ¬ification_name, const std::string &data, const std::vector &values); + void handleSessionFinished(sai_object_id_t session_id); + void handleTimerExpired(swss::SelectableTimer *timer); + + bool registerFlowBulkGetSessionNotifier(); +}; diff --git a/orchagent/notifications.cpp b/orchagent/notifications.cpp index 978c702e1e4..96c79fd1dfd 100644 --- a/orchagent/notifications.cpp +++ b/orchagent/notifications.cpp @@ -80,6 +80,20 @@ void on_ha_scope_event(uint32_t count, sai_ha_scope_event_data_t *data) } } +void on_flow_bulk_get_session_event(sai_object_id_t flow_bulk_session_id, uint32_t count, sai_flow_bulk_get_session_event_data_t *data) +{ + if (gRedisCommunicationMode == SAI_REDIS_COMMUNICATION_MODE_ZMQ_SYNC) + { + swss::DBConnector db("ASIC_DB", 0); + swss::NotificationProducer flow_bulk_get_session_event(&db, "NOTIFICATIONS"); + std::string sdata = sai_serialize_flow_bulk_get_session_event_ntf(flow_bulk_session_id, count, data); + std::vector values; + + // Forward flow_bulk_get_session_event notification to be handled in orchagent doTask() + flow_bulk_get_session_event.send(SAI_SWITCH_NOTIFICATION_NAME_FLOW_BULK_GET_SESSION_EVENT, sdata, values); + } +} + void on_switch_shutdown_request(sai_object_id_t switch_id) { SWSS_LOG_ENTER(); diff --git a/orchagent/notifications.h b/orchagent/notifications.h index b81efe9c590..af511212880 100644 --- a/orchagent/notifications.h +++ b/orchagent/notifications.h @@ -11,6 +11,7 @@ void on_bfd_session_state_change(uint32_t count, sai_bfd_session_state_notificat void on_twamp_session_event(uint32_t count, sai_twamp_session_event_notification_data_t *data); void on_ha_set_event(uint32_t count, sai_ha_set_event_data_t *data); void on_ha_scope_event(uint32_t count, sai_ha_scope_event_data_t *data); +void on_flow_bulk_get_session_event(sai_object_id_t flow_bulk_session_id, uint32_t count, sai_flow_bulk_get_session_event_data_t *data); // The function prototype information can be found here: // https://github.com/sonic-net/sonic-sairedis/blob/master/meta/NotificationSwitchShutdownRequest.cpp#L49 diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp index 61646b8f4c0..4e49c616129 100644 --- a/orchagent/orchdaemon.cpp +++ b/orchagent/orchdaemon.cpp @@ -1360,6 +1360,13 @@ bool DpuOrchDaemon::init() DashPortMapOrch *dash_port_map_orch = new DashPortMapOrch(m_dpu_appDb, dash_port_map_tables, m_dpu_appstateDb, dash_zmq_server); gDirectory.set(dash_port_map_orch); + vector dash_ha_flow_tables = { + APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME, + APP_DASH_FLOW_DUMP_FILTER_TABLE_NAME + }; + DashHaFlowOrch *dash_ha_flow_orch = new DashHaFlowOrch(m_dpu_appDb, dash_ha_flow_tables, m_dpu_appstateDb, dash_zmq_server); + gDirectory.set(dash_ha_flow_orch); + addOrchList(dash_acl_orch); addOrchList(dash_vnet_orch); addOrchList(dash_route_orch); @@ -1368,6 +1375,7 @@ bool DpuOrchDaemon::init() addOrchList(dash_meter_orch); addOrchList(dash_ha_orch); addOrchList(dash_port_map_orch); + addOrchList(dash_ha_flow_orch); return true; } diff --git a/orchagent/orchdaemon.h b/orchagent/orchdaemon.h index 2041c6bfff3..e83d442b9eb 100644 --- a/orchagent/orchdaemon.h +++ b/orchagent/orchdaemon.h @@ -56,6 +56,7 @@ #include "dash/dashtunnelorch.h" #include "dash/dashvnetorch.h" #include "dash/dashhaorch.h" +#include "dash/dashhafloworch.h" #include "dash/dashmeterorch.h" #include "dash/dashportmaporch.h" #include "high_frequency_telemetry/hftelorch.h" diff --git a/orchagent/saihelper.cpp b/orchagent/saihelper.cpp index 40251d9019d..27d720b07aa 100644 --- a/orchagent/saihelper.cpp +++ b/orchagent/saihelper.cpp @@ -96,6 +96,7 @@ sai_stp_api_t* sai_stp_api; sai_dash_meter_api_t* sai_dash_meter_api; sai_dash_outbound_port_map_api_t* sai_dash_outbound_port_map_api; sai_dash_trusted_vni_api_t* sai_dash_trusted_vni_api; +sai_dash_flow_api_t* sai_dash_flow_api; extern sai_object_id_t gSwitchId; extern bool gTraditionalFlexCounter; @@ -255,7 +256,8 @@ void initSaiApi() sai_api_query((sai_api_t)SAI_API_DASH_TUNNEL, (void**)&sai_dash_tunnel_api); sai_api_query((sai_api_t)SAI_API_DASH_HA, (void**)&sai_dash_ha_api); sai_api_query((sai_api_t)SAI_API_DASH_OUTBOUND_PORT_MAP, (void**)&sai_dash_outbound_port_map_api); - sai_api_query((sai_api_t)SAI_API_DASH_TRUSTED_VNI, (void**)&sai_dash_trusted_vni_api); + sai_api_query((sai_api_t)SAI_API_DASH_TRUSTED_VNI, (void**)&sai_dash_trusted_vni_api); + sai_api_query((sai_api_t)SAI_API_DASH_FLOW, (void**)&sai_dash_flow_api); sai_api_query(SAI_API_TWAMP, (void **)&sai_twamp_api); sai_api_query(SAI_API_TAM, (void **)&sai_tam_api); sai_api_query(SAI_API_STP, (void **)&sai_stp_api); diff --git a/tests/mock_tests/Makefile.am b/tests/mock_tests/Makefile.am index 9f080fd4743..cf32d75c7a6 100644 --- a/tests/mock_tests/Makefile.am +++ b/tests/mock_tests/Makefile.am @@ -71,6 +71,7 @@ tests_SOURCES = aclorch_ut.cpp \ dashorch_ut.cpp \ dashvnetorch_ut.cpp \ dashhaorch_ut.cpp \ + dashhafloworch_ut.cpp \ dashrouteorch_ut.cpp \ dashportmaporch_ut.cpp \ twamporch_ut.cpp \ @@ -163,6 +164,7 @@ tests_SOURCES = aclorch_ut.cpp \ $(top_srcdir)/orchagent/dash/dashtunnelorch.cpp \ $(top_srcdir)/orchagent/dash/dashvnetorch.cpp \ $(top_srcdir)/orchagent/dash/dashhaorch.cpp \ + $(top_srcdir)/orchagent/dash/dashhafloworch.cpp \ $(top_srcdir)/orchagent/dash/dashmeterorch.cpp \ $(top_srcdir)/orchagent/dash/dashportmaporch.cpp \ $(top_srcdir)/cfgmgr/buffermgrdyn.cpp \ diff --git a/tests/mock_tests/dashhafloworch_ut.cpp b/tests/mock_tests/dashhafloworch_ut.cpp new file mode 100644 index 00000000000..a5bc593a39e --- /dev/null +++ b/tests/mock_tests/dashhafloworch_ut.cpp @@ -0,0 +1,753 @@ +#include "mock_orch_test.h" +#include "mock_table.h" +#include "mock_sai_api.h" +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "dash/dashhafloworch.h" +using namespace ::testing; +using ::testing::DoAll; +using ::testing::SetArgPointee; + +extern redisReply *mockReply; +extern sai_redis_communication_mode_t gRedisCommunicationMode; + +EXTERN_MOCK_FNS + +namespace dashhafloworch_ut +{ + DEFINE_SAI_GENERIC_APIS_MOCK(dash_flow, flow_entry_bulk_get_session, flow_entry_bulk_get_session_filter); + + using namespace mock_orch_test; + + class DashHaFlowOrchTestable : public DashHaFlowOrch + { + public: + DashHaFlowOrchTestable(swss::DBConnector *db, const std::vector &tableNames, swss::DBConnector *app_state_db, swss::ZmqServer *zmqServer) + : DashHaFlowOrch(db, tableNames, app_state_db, zmqServer) {} + void doTask(swss::NotificationConsumer &consumer) { DashHaFlowOrch::doTask(consumer); } + void doTask(swss::SelectableTimer &timer) { DashHaFlowOrch::doTask(timer); } + void handleSessionFinished(sai_object_id_t session_id) { DashHaFlowOrch::handleSessionFinished(session_id); } + void handleTimerExpired(swss::SelectableTimer *timer) { DashHaFlowOrch::handleTimerExpired(timer); } + swss::SelectableTimer* getSyncTimer() { return m_sync_timer; } + swss::SelectableTimer* getDumpTimer() { return m_dump_timer; } + }; + + class DashHaFlowOrchTest : public MockOrchTest + { + protected: + DashHaFlowOrchTestable *m_dashHaFlowOrch; + shared_ptr m_dpu_state_db; + + void PostSetUp() override + { + m_dpu_state_db = make_shared("DPU_STATE_DB", 0); + vector dash_ha_flow_tables = { + APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME, + APP_DASH_FLOW_DUMP_FILTER_TABLE_NAME + }; + m_dashHaFlowOrch = new DashHaFlowOrchTestable(m_dpu_app_db.get(), dash_ha_flow_tables, m_dpu_app_state_db.get(), nullptr); + gDirectory.set(m_dashHaFlowOrch); + ut_orch_list.push_back((Orch **)&m_dashHaFlowOrch); + } + + void ApplySaiMock() + { + INIT_SAI_API_MOCK(dash_flow); + MockSaiApis(); + } + + void PreTearDown() override + { + RestoreSaiApis(); + DEINIT_SAI_API_MOCK(dash_flow); + } + + void CreateFlowSyncSession(const string &key, const string &ha_set_id, const string &target_server_ip, const string &target_server_port, const string &timeout = "120") + { + auto consumer = unique_ptr(new Consumer( + new swss::ConsumerStateTable(m_dpu_app_db.get(), APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME, 1, 1), + m_dashHaFlowOrch, APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME)); + + consumer->addToSync( + deque( + { + { + key, + SET_COMMAND, + { + {"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}, + {"ha_set_id", ha_set_id}, + {"target_server_ip", target_server_ip}, + {"target_server_port", target_server_port}, + {"timeout", timeout} + } + } + } + ) + ); + static_cast(m_dashHaFlowOrch)->doTask(*consumer.get()); + } + + void RemoveFlowSyncSession(const string &key) + { + auto consumer = unique_ptr(new Consumer( + new swss::ConsumerStateTable(m_dpu_app_db.get(), APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME, 1, 1), + m_dashHaFlowOrch, APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME)); + + consumer->addToSync( + deque( + { + { + key, + DEL_COMMAND, + { } + } + } + ) + ); + static_cast(m_dashHaFlowOrch)->doTask(*consumer.get()); + } + + void CreateFlowDumpFilter(const string &key, const string &filter_key, const string &filter_op, const string &filter_value) + { + auto consumer = unique_ptr(new Consumer( + new swss::ConsumerStateTable(m_dpu_app_db.get(), APP_DASH_FLOW_DUMP_FILTER_TABLE_NAME, 1, 1), + m_dashHaFlowOrch, APP_DASH_FLOW_DUMP_FILTER_TABLE_NAME)); + + consumer->addToSync( + deque( + { + { + key, + SET_COMMAND, + { + {"key", filter_key}, + {"op", filter_op}, + {"value", filter_value} + } + } + } + ) + ); + static_cast(m_dashHaFlowOrch)->doTask(*consumer.get()); + } + + void RemoveFlowDumpFilter(const string &key) + { + auto consumer = unique_ptr(new Consumer( + new swss::ConsumerStateTable(m_dpu_app_db.get(), APP_DASH_FLOW_DUMP_FILTER_TABLE_NAME, 1, 1), + m_dashHaFlowOrch, APP_DASH_FLOW_DUMP_FILTER_TABLE_NAME)); + + consumer->addToSync( + deque( + { + { + key, + DEL_COMMAND, + { } + } + } + ) + ); + static_cast(m_dashHaFlowOrch)->doTask(*consumer.get()); + } + + void CreateFlowDumpSession(const string &key, const string &flow_state = "true", const string &max_flows = "1000", const string &timeout = "300", const vector &filters = {}) + { + auto consumer = unique_ptr(new Consumer( + new swss::ConsumerStateTable(m_dpu_app_db.get(), APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME, 1, 1), + m_dashHaFlowOrch, APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME)); + + vector fvs = { + {"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}, + {"flow_state", flow_state}, + {"max_flows", max_flows}, + {"timeout", timeout} + }; + + for (size_t i = 0; i < filters.size() && i < 5; i++) + { + string filter_attr = "filter_" + to_string(i + 1); + fvs.push_back({filter_attr, filters[i]}); + } + + consumer->addToSync( + deque( + { + { + key, + SET_COMMAND, + fvs + } + } + ) + ); + static_cast(m_dashHaFlowOrch)->doTask(*consumer.get()); + } + + void RemoveFlowDumpSession(const string &key) + { + auto consumer = unique_ptr(new Consumer( + new swss::ConsumerStateTable(m_dpu_app_db.get(), APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME, 1, 1), + m_dashHaFlowOrch, APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME)); + + consumer->addToSync( + deque( + { + { + key, + DEL_COMMAND, + { } + } + } + ) + ); + static_cast(m_dashHaFlowOrch)->doTask(*consumer.get()); + } + }; + + TEST_F(DashHaFlowOrchTest, CreateRemoveFlowSyncSession) + { + sai_object_id_t session_id = 0x1000000000000001; + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(session_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowSyncSession("SYNC_SESSION_1", "", "192.168.1.1", "8080"); + + // Verify STATE_DB update after creation + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector fvs; + ASSERT_TRUE(state_table.get("SYNC_SESSION_1", fvs)); + bool found_state = false; + for (const auto &fv : fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "created") + { + found_state = true; + break; + } + } + ASSERT_TRUE(found_state) << "STATE_DB should have state=created after session creation"; + } + + TEST_F(DashHaFlowOrchTest, CreateFlowSyncSessionMissingFields) + { + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(0); + + // Missing target_server_ip + auto consumer = unique_ptr(new Consumer( + new swss::ConsumerStateTable(m_dpu_app_db.get(), APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME, 1, 1), + m_dashHaFlowOrch, APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME)); + + consumer->addToSync( + deque( + { + { + "SYNC_SESSION_1", + SET_COMMAND, + { + {"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}, + {"ha_set_id", ""}, + {"target_server_port", "8080"} + } + } + } + ) + ); + static_cast(m_dashHaFlowOrch)->doTask(*consumer.get()); + + // Verify STATE_DB update shows failed state + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector fvs; + ASSERT_TRUE(state_table.get("SYNC_SESSION_1", fvs)); + bool found_failed = false; + for (const auto &fv : fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "failed") + { + found_failed = true; + break; + } + } + ASSERT_TRUE(found_failed) << "STATE_DB should have state=failed when session creation fails"; + } + + TEST_F(DashHaFlowOrchTest, CreateFlowDumpSessionWithFilters) + { + sai_object_id_t filter_id_1 = 0x2000000000000001; + sai_object_id_t filter_id_2 = 0x2000000000000002; + sai_object_id_t session_id = 0x1000000000000001; + + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(2) + .WillOnce(DoAll(SetArgPointee<0>(filter_id_1), Return(SAI_STATUS_SUCCESS))) + .WillOnce(DoAll(SetArgPointee<0>(filter_id_2), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpFilter("FILTER_1", "eni_addr", "equal_to", "00:11:22:33:44:55"); + CreateFlowDumpFilter("FILTER_2", "ip_protocol", "equal_to", "6"); + + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(session_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpSession("DUMP_SESSION_1", "true", "1000", "300", {"FILTER_1", "FILTER_2"}); + } + + TEST_F(DashHaFlowOrchTest, CreateFlowDumpSessionWaitingForFilters) + { + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(0); + + // Create session before filters are available - should return task_need_retry + auto consumer = unique_ptr(new Consumer( + new swss::ConsumerStateTable(m_dpu_app_db.get(), APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME, 1, 1), + m_dashHaFlowOrch, APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME)); + + vector fvs = { + {"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}, + {"flow_state", "true"}, + {"max_flows", "1000"}, + {"timeout", "300"}, + {"filter_1", "FILTER_1"} + }; + + consumer->addToSync( + deque( + { + { + "DUMP_SESSION_1", + SET_COMMAND, + fvs + } + } + ) + ); + static_cast(m_dashHaFlowOrch)->doTask(*consumer.get()); + + // Verify session is in pending state + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector state_fvs; + ASSERT_TRUE(state_table.get("DUMP_SESSION_1", state_fvs)); + bool found_pending = false; + for (const auto &fv : state_fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "pending") + { + found_pending = true; + break; + } + } + ASSERT_TRUE(found_pending) << "Session should be in pending state when filters are missing"; + + // Now add the filter + sai_object_id_t filter_id = 0x2000000000000001; + sai_object_id_t session_id = 0x1000000000000001; + + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(filter_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpFilter("FILTER_1", "eni_addr", "equal_to", "00:11:22:33:44:55"); + + // Retry the session creation - now filters are available, session should be created + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(session_id), Return(SAI_STATUS_SUCCESS))); + + // Retry by calling doTask again on the same consumer (simulating orchagent retry mechanism) + static_cast(m_dashHaFlowOrch)->doTask(*consumer.get()); + + // Verify session was created successfully + ASSERT_TRUE(state_table.get("DUMP_SESSION_1", state_fvs)); + bool found_created = false; + for (const auto &fv : state_fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "created") + { + found_created = true; + break; + } + } + ASSERT_TRUE(found_created) << "Session should be created after filters are available and retry"; + } + + TEST_F(DashHaFlowOrchTest, DuplicateSessionCreation) + { + sai_object_id_t session_id = 0x1000000000000001; + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(session_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowSyncSession("SYNC_SESSION_1", "", "192.168.1.1", "8080"); + + // Try to create another sync session - should fail because first session is still active + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(0); + + CreateFlowSyncSession("SYNC_SESSION_2", "", "192.168.1.2", "8081"); + + // Verify second session failed (should have failed state in STATE_DB) + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector fvs; + if (state_table.get("SYNC_SESSION_2", fvs)) + { + bool found_failed = false; + for (const auto &fv : fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "failed") + { + found_failed = true; + break; + } + } + ASSERT_TRUE(found_failed) << "Second session should have failed state"; + } + } + + TEST_F(DashHaFlowOrchTest, CreateFlowSyncSessionSAIFailure) + { + // Test that when SAI create fails, state is updated to failed + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(Return(SAI_STATUS_FAILURE)); + + CreateFlowSyncSession("SYNC_SESSION_1", "", "192.168.1.1", "8080"); + + // Verify STATE_DB update shows failed state + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector fvs; + ASSERT_TRUE(state_table.get("SYNC_SESSION_1", fvs)); + bool found_failed = false; + for (const auto &fv : fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "failed") + { + found_failed = true; + break; + } + } + ASSERT_TRUE(found_failed) << "STATE_DB should have state=failed when SAI create fails"; + } + + TEST_F(DashHaFlowOrchTest, CreateFlowDumpSessionSAIFailure) + { + sai_object_id_t filter_id = 0x2000000000000001; + + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(filter_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpFilter("FILTER_1", "eni_addr", "equal_to", "00:11:22:33:44:55"); + + // SAI session create fails + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(Return(SAI_STATUS_FAILURE)); + + CreateFlowDumpSession("DUMP_SESSION_1", "true", "1000", "300", {"FILTER_1"}); + + // Verify STATE_DB update shows failed state + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector fvs; + ASSERT_TRUE(state_table.get("DUMP_SESSION_1", fvs)); + bool found_failed = false; + for (const auto &fv : fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "failed") + { + found_failed = true; + break; + } + } + ASSERT_TRUE(found_failed) << "STATE_DB should have state=failed when SAI create fails"; + } + + TEST_F(DashHaFlowOrchTest, CreateFlowDumpFilterInvalidKey) + { + // Test that invalid filter key results in failure + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(0); + + CreateFlowDumpFilter("FILTER_1", "invalid_key", "equal_to", "value"); + } + + TEST_F(DashHaFlowOrchTest, CreateFlowDumpFilterInvalidOp) + { + // Test that invalid filter op results in failure + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(0); + + CreateFlowDumpFilter("FILTER_1", "eni_addr", "invalid_op", "00:11:22:33:44:55"); + } + + TEST_F(DashHaFlowOrchTest, RemoveFlowDumpFilter) + { + sai_object_id_t filter_id = 0x2000000000000001; + + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(filter_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpFilter("FILTER_1", "eni_addr", "equal_to", "00:11:22:33:44:55"); + + EXPECT_CALL(*mock_sai_dash_flow_api, remove_flow_entry_bulk_get_session_filter) + .Times(1) + .WillOnce(Return(SAI_STATUS_SUCCESS)); + + RemoveFlowDumpFilter("FILTER_1"); + } + + TEST_F(DashHaFlowOrchTest, HandleFinishedBulkSyncSession) + { + sai_object_id_t session_id = 0x1000000000000001; + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(session_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowSyncSession("SYNC_SESSION_1", "", "192.168.1.1", "8080"); + + // Verify session was created + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector fvs; + ASSERT_TRUE(state_table.get("SYNC_SESSION_1", fvs)); + + // Verify SAI object deletion is called when handleFinished is called + EXPECT_CALL(*mock_sai_dash_flow_api, remove_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(Return(SAI_STATUS_SUCCESS)); + + // Call handleFinished via handleSessionFinished + m_dashHaFlowOrch->handleSessionFinished(session_id); + + // Verify state DB shows completed state + ASSERT_TRUE(state_table.get("SYNC_SESSION_1", fvs)); + bool found_completed = false; + for (const auto &fv : fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "completed") + { + found_completed = true; + break; + } + } + ASSERT_TRUE(found_completed) << "STATE_DB should have state=completed after handleFinished"; + + // Verify that after handleFinished, a new session can be created + sai_object_id_t new_session_id = 0x1000000000000002; + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(new_session_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowSyncSession("SYNC_SESSION_2", "", "192.168.1.2", "8081"); + + // Verify new session was created successfully + ASSERT_TRUE(state_table.get("SYNC_SESSION_2", fvs)); + bool found_created = false; + for (const auto &fv : fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "created") + { + found_created = true; + break; + } + } + ASSERT_TRUE(found_created) << "New session should be created after previous session finished"; + } + + TEST_F(DashHaFlowOrchTest, HandleTimeoutBulkSyncSession) + { + sai_object_id_t session_id = 0x1000000000000001; + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(session_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowSyncSession("SYNC_SESSION_1", "", "192.168.1.1", "8080"); + + // Verify session was created + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector fvs; + ASSERT_TRUE(state_table.get("SYNC_SESSION_1", fvs)); + + // Verify SAI object deletion is called when handleTimeout is called + EXPECT_CALL(*mock_sai_dash_flow_api, remove_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(Return(SAI_STATUS_SUCCESS)); + + // Call handleTimeout via handleTimerExpired + m_dashHaFlowOrch->handleTimerExpired(m_dashHaFlowOrch->getSyncTimer()); + + // Verify state DB shows failed state + ASSERT_TRUE(state_table.get("SYNC_SESSION_1", fvs)); + bool found_failed = false; + for (const auto &fv : fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "failed") + { + found_failed = true; + break; + } + } + ASSERT_TRUE(found_failed) << "STATE_DB should have state=failed after handleTimeout"; + } + + TEST_F(DashHaFlowOrchTest, HandleFinishedFlowDumpSession) + { + sai_object_id_t filter_id = 0x2000000000000001; + sai_object_id_t session_id = 0x1000000000000001; + + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(filter_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpFilter("FILTER_1", "eni_addr", "equal_to", "00:11:22:33:44:55"); + + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(session_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpSession("DUMP_SESSION_1", "true", "1000", "300", {"FILTER_1"}); + + // Verify session was created + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector fvs; + ASSERT_TRUE(state_table.get("DUMP_SESSION_1", fvs)); + + // Verify SAI object deletion is called when handleFinished is called + // Note: Filters are user-managed and not deleted when session finishes + EXPECT_CALL(*mock_sai_dash_flow_api, remove_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(Return(SAI_STATUS_SUCCESS)); + + // Call handleFinished via handleSessionFinished + m_dashHaFlowOrch->handleSessionFinished(session_id); + + // Verify state DB shows completed state with output_file + ASSERT_TRUE(state_table.get("DUMP_SESSION_1", fvs)); + bool found_completed = false; + bool found_output_file = false; + for (const auto &fv : fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "completed") + { + found_completed = true; + } + if (fvField(fv) == "output_file" && !fvValue(fv).empty()) + { + found_output_file = true; + } + } + ASSERT_TRUE(found_completed) << "STATE_DB should have state=completed after handleFinished"; + ASSERT_TRUE(found_output_file) << "STATE_DB should have output_file field after handleFinished"; + } + + TEST_F(DashHaFlowOrchTest, HandleTimeoutFlowDumpSession) + { + sai_object_id_t filter_id = 0x2000000000000001; + sai_object_id_t session_id = 0x1000000000000001; + + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(filter_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpFilter("FILTER_1", "eni_addr", "equal_to", "00:11:22:33:44:55"); + + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(session_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpSession("DUMP_SESSION_1", "true", "1000", "300", {"FILTER_1"}); + + // Verify session was created + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector fvs; + ASSERT_TRUE(state_table.get("DUMP_SESSION_1", fvs)); + + // Verify SAI object deletion is called when handleTimeout is called + // Note: Filters are user-managed and not deleted when session times out + EXPECT_CALL(*mock_sai_dash_flow_api, remove_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(Return(SAI_STATUS_SUCCESS)); + + // Call handleTimeout via handleTimerExpired + m_dashHaFlowOrch->handleTimerExpired(m_dashHaFlowOrch->getDumpTimer()); + + // Verify state DB shows failed state + ASSERT_TRUE(state_table.get("DUMP_SESSION_1", fvs)); + bool found_failed = false; + for (const auto &fv : fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "failed") + { + found_failed = true; + break; + } + } + ASSERT_TRUE(found_failed) << "STATE_DB should have state=failed after handleTimeout"; + } + + TEST_F(DashHaFlowOrchTest, CreateBothFlowDumpAndBulkSyncSessions) + { + // Create flow dump session with filters + sai_object_id_t filter_id = 0x2000000000000001; + sai_object_id_t dump_session_id = 0x1000000000000001; + + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(filter_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpFilter("FILTER_1", "eni_addr", "equal_to", "00:11:22:33:44:55"); + + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(2) + .WillOnce(DoAll(SetArgPointee<0>(dump_session_id), Return(SAI_STATUS_SUCCESS))) + .WillOnce(DoAll(SetArgPointee<0>(0x1000000000000002), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpSession("DUMP_SESSION_1", "true", "1000", "300", {"FILTER_1"}); + + // Create bulk sync session + CreateFlowSyncSession("SYNC_SESSION_1", "", "192.168.1.1", "8080"); + + // Verify both sessions are created successfully in STATE_DB + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector fvs; + + // Verify flow dump session + ASSERT_TRUE(state_table.get("DUMP_SESSION_1", fvs)) << "Flow dump session should exist in STATE_DB"; + bool found_dump_created = false; + bool found_dump_type = false; + for (const auto &fv : fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "created") + { + found_dump_created = true; + } + if (fvField(fv) == "type" && fvValue(fv) == DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP) + { + found_dump_type = true; + } + } + ASSERT_TRUE(found_dump_created) << "Flow dump session should have state=created"; + ASSERT_TRUE(found_dump_type) << "Flow dump session should have type=flow_dump"; + + // Verify bulk sync session + ASSERT_TRUE(state_table.get("SYNC_SESSION_1", fvs)) << "Bulk sync session should exist in STATE_DB"; + bool found_sync_created = false; + bool found_sync_type = false; + for (const auto &fv : fvs) + { + if (fvField(fv) == "state" && fvValue(fv) == "created") + { + found_sync_created = true; + } + if (fvField(fv) == "type" && fvValue(fv) == DashHaFlowOrch::SESSION_TYPE_BULK_SYNC) + { + found_sync_type = true; + } + } + ASSERT_TRUE(found_sync_created) << "Bulk sync session should have state=created"; + ASSERT_TRUE(found_sync_type) << "Bulk sync session should have type=bulk_sync"; + } +} + diff --git a/tests/mock_tests/mock_orchagent_main.h b/tests/mock_tests/mock_orchagent_main.h index b087066fdd5..be7f10826f4 100644 --- a/tests/mock_tests/mock_orchagent_main.h +++ b/tests/mock_tests/mock_orchagent_main.h @@ -111,6 +111,7 @@ extern sai_dash_vip_api_t* sai_dash_vip_api; extern sai_dash_direction_lookup_api_t* sai_dash_direction_lookup_api; extern sai_dash_eni_api_t* sai_dash_eni_api; extern sai_dash_ha_api_t* sai_dash_ha_api; +extern sai_dash_flow_api_t* sai_dash_flow_api; extern sai_stp_api_t* sai_stp_api; extern sai_dash_outbound_ca_to_pa_api_t* sai_dash_outbound_ca_to_pa_api; extern sai_dash_pa_validation_api_t* sai_dash_pa_validation_api; diff --git a/tests/mock_tests/ut_saihelper.cpp b/tests/mock_tests/ut_saihelper.cpp index bb7d376b191..1ba0c48bf95 100644 --- a/tests/mock_tests/ut_saihelper.cpp +++ b/tests/mock_tests/ut_saihelper.cpp @@ -95,6 +95,7 @@ namespace ut_helper sai_api_query((sai_api_t)SAI_API_DASH_DIRECTION_LOOKUP, (void**)&sai_dash_direction_lookup_api); sai_api_query((sai_api_t)SAI_API_DASH_ENI, (void**)&sai_dash_eni_api); sai_api_query((sai_api_t)SAI_API_DASH_HA, (void**)&sai_dash_ha_api); + sai_api_query((sai_api_t)SAI_API_DASH_FLOW, (void**)&sai_dash_flow_api); sai_api_query((sai_api_t)SAI_API_DASH_OUTBOUND_CA_TO_PA, (void**)&sai_dash_outbound_ca_to_pa_api); sai_api_query((sai_api_t)SAI_API_DASH_PA_VALIDATION, (void**)&sai_dash_pa_validation_api); sai_api_query((sai_api_t)SAI_API_DASH_VNET, (void**)&sai_dash_vnet_api); @@ -140,6 +141,7 @@ namespace ut_helper sai_dash_direction_lookup_api = nullptr; sai_dash_eni_api = nullptr; sai_dash_ha_api = nullptr; + sai_dash_flow_api = nullptr; sai_stp_api = nullptr; sai_dash_meter_api = nullptr; From 583b45206ab0e4ba58efd328782a8c439f01bdda Mon Sep 17 00:00:00 2001 From: Vivek Reddy Date: Thu, 12 Feb 2026 10:08:49 +0200 Subject: [PATCH 2/3] Refactor and handle comments Signed-off-by: Vivek Reddy --- orchagent/dash/dashhafloworch.cpp | 54 ++++++++++++++----------------- orchagent/dash/dashhafloworch.h | 13 ++++---- 2 files changed, 31 insertions(+), 36 deletions(-) diff --git a/orchagent/dash/dashhafloworch.cpp b/orchagent/dash/dashhafloworch.cpp index 56696fc6236..8f97a0e93c9 100644 --- a/orchagent/dash/dashhafloworch.cpp +++ b/orchagent/dash/dashhafloworch.cpp @@ -234,14 +234,14 @@ bool FlowDumpFilterManager::deleteFilterSAI(sai_object_id_t filter_id) return true; } -FlowSyncHandler::FlowSyncHandler(DBConnector *dpu_state_db, SelectableTimer *timer) : +FlowApiHandler::FlowApiHandler(DBConnector *dpu_state_db, SelectableTimer *timer) : m_session_id(SAI_NULL_OBJECT_ID), m_timer(timer) { m_state_table = make_shared
(dpu_state_db, STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); } -void FlowSyncHandler::deleteSession() +void FlowApiHandler::deleteSession() { SWSS_LOG_ENTER(); @@ -259,7 +259,7 @@ void FlowSyncHandler::deleteSession() } } -bool FlowSyncHandler::deleteSessionSAI() +bool FlowApiHandler::deleteSessionSAI() { SWSS_LOG_ENTER(); @@ -274,7 +274,7 @@ bool FlowSyncHandler::deleteSessionSAI() return true; } -void FlowSyncHandler::updateState(const string &state, const string &key, vector fvs) +void FlowApiHandler::updateState(const string &state, const string &key, vector fvs) { chrono::steady_clock::time_point creation_time; chrono::steady_clock::time_point last_state_time = chrono::steady_clock::now(); @@ -302,7 +302,7 @@ void FlowSyncHandler::updateState(const string &state, const string &key, vector } BulkSyncHandler::BulkSyncHandler(DBConnector *dpu_state_db, SelectableTimer *timer) : - FlowSyncHandler(dpu_state_db, timer) + FlowApiHandler(dpu_state_db, timer) { } @@ -379,9 +379,7 @@ task_process_status BulkSyncHandler::handleSet(const string &table_name, const s if (isActive()) { SWSS_LOG_ERROR("Flow sync session already exists: %s. Cannot create new session: %s", m_key.c_str(), key.c_str()); - vector fvs; - fvs.push_back(FieldValueTuple("type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC)); - FlowSyncHandler::updateState("failed", key, fvs); + FlowApiHandler::updateState("failed", key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); return task_failed; } @@ -390,17 +388,15 @@ task_process_status BulkSyncHandler::handleSet(const string &table_name, const s if (!initialize(key, attrs)) { SWSS_LOG_ERROR("Failed to initialize BulkSyncHandler for key %s", key.c_str()); - vector fvs; - fvs.push_back(FieldValueTuple("type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC)); - FlowSyncHandler::updateState("failed", key, fvs); - clearKey(); + FlowApiHandler::updateState("failed", key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + reset(); return task_failed; } task_process_status status = createSession(); if (status != task_success && status != task_need_retry) { - clearKey(); + reset(); } return status; @@ -425,14 +421,14 @@ task_process_status BulkSyncHandler::createSession() if (m_target_server_ip.empty()) { SWSS_LOG_ERROR("Missing target_server_ip for flow sync session %s", m_key.c_str()); - FlowSyncHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + FlowApiHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); return task_failed; } if (m_target_server_port == 0) { SWSS_LOG_ERROR("Missing or invalid target_server_port for flow sync session %s", m_key.c_str()); - FlowSyncHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + FlowApiHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); return task_failed; } @@ -440,7 +436,7 @@ task_process_status BulkSyncHandler::createSession() if (session_id == SAI_NULL_OBJECT_ID) { SWSS_LOG_ERROR("Failed to create flow sync session %s", m_key.c_str()); - FlowSyncHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + FlowApiHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); return task_failed; } @@ -452,7 +448,7 @@ task_process_status BulkSyncHandler::createSession() m_timer->setInterval(interval); m_timer->start(); - FlowSyncHandler::updateState("created", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + FlowApiHandler::updateState("created", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); SWSS_LOG_NOTICE("Created flow sync session %s with session_id 0x%lx, timeout %u sec", m_key.c_str(), session_id, m_timeout_sec); return task_success; @@ -461,7 +457,7 @@ task_process_status BulkSyncHandler::createSession() void BulkSyncHandler::handleFinished() { SWSS_LOG_ENTER(); - FlowSyncHandler::updateState("completed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + FlowApiHandler::updateState("completed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); SWSS_LOG_NOTICE("Flow sync session %s completed successfully", m_key.c_str()); deleteSession(); reset(); @@ -470,7 +466,7 @@ void BulkSyncHandler::handleFinished() void BulkSyncHandler::handleTimeout() { SWSS_LOG_ENTER(); - FlowSyncHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); + FlowApiHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); SWSS_LOG_WARN("Flow sync session %s timed out", m_key.c_str()); deleteSession(); reset(); @@ -509,7 +505,7 @@ sai_object_id_t BulkSyncHandler::createSessionSAI() } FlowDumpHandler::FlowDumpHandler(DBConnector *dpu_state_db, SelectableTimer *timer, std::shared_ptr filter_manager) : - FlowSyncHandler(dpu_state_db, timer), + FlowApiHandler(dpu_state_db, timer), m_filter_manager(filter_manager) { } @@ -591,7 +587,7 @@ task_process_status FlowDumpHandler::createSession() if (filter_ids.size() != m_required_filter_keys.size()) { SWSS_LOG_INFO("Flow dump session %s waiting for filters to become available (%zu/%zu)", m_key.c_str(), filter_ids.size(), m_required_filter_keys.size()); - FlowSyncHandler::updateState("pending", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); + FlowApiHandler::updateState("pending", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); return task_need_retry; } @@ -599,7 +595,7 @@ task_process_status FlowDumpHandler::createSession() if (session_id == SAI_NULL_OBJECT_ID) { SWSS_LOG_ERROR("Failed to create flow dump session %s", m_key.c_str()); - FlowSyncHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); + FlowApiHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); return task_failed; } @@ -609,7 +605,7 @@ task_process_status FlowDumpHandler::createSession() m_timer->setInterval(interval); m_timer->start(); - FlowSyncHandler::updateState("created", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); + FlowApiHandler::updateState("created", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); SWSS_LOG_NOTICE("Created flow dump session %s with session_id 0x%lx, timeout %u sec", m_key.c_str(), session_id, m_timeout_sec); @@ -626,7 +622,7 @@ task_process_status FlowDumpHandler::handleSet(const string &table_name, const s if (isActive()) { SWSS_LOG_ERROR("Flow dump session already exists: %s. Cannot create new session: %s", m_key.c_str(), key.c_str()); - FlowSyncHandler::updateState("failed", key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); + FlowApiHandler::updateState("failed", key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); return task_failed; } @@ -635,15 +631,15 @@ task_process_status FlowDumpHandler::handleSet(const string &table_name, const s if (!initialize(key, attrs)) { SWSS_LOG_ERROR("Failed to initialize FlowDumpHandler for key %s", key.c_str()); - FlowSyncHandler::updateState("failed", key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); - clearKey(); + FlowApiHandler::updateState("failed", key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); + reset(); return task_failed; } task_process_status status = createSession(); if (status != task_success && status != task_need_retry) { - clearKey(); + reset(); } return status; } @@ -682,7 +678,7 @@ void FlowDumpHandler::handleFinished() vector fvs; fvs.push_back(FieldValueTuple("type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP)); fvs.push_back(FieldValueTuple("output_file", m_output_file)); - FlowSyncHandler::updateState("completed", m_key, fvs); + FlowApiHandler::updateState("completed", m_key, fvs); SWSS_LOG_NOTICE("Flow dump session %s completed successfully, output file: %s", m_key.c_str(), m_output_file.c_str()); deleteSession(); @@ -693,7 +689,7 @@ void FlowDumpHandler::handleTimeout() { SWSS_LOG_ENTER(); - FlowSyncHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); + FlowApiHandler::updateState("failed", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); deleteSession(); reset(); diff --git a/orchagent/dash/dashhafloworch.h b/orchagent/dash/dashhafloworch.h index 4d348aa4117..8bbd0d07ef7 100644 --- a/orchagent/dash/dashhafloworch.h +++ b/orchagent/dash/dashhafloworch.h @@ -51,11 +51,11 @@ class FlowDumpFilterManager std::map m_filter_cache; }; -class FlowSyncHandler +class FlowApiHandler { public: - FlowSyncHandler(swss::DBConnector *dpu_state_db, swss::SelectableTimer *timer); - virtual ~FlowSyncHandler() = default; + FlowApiHandler(swss::DBConnector *dpu_state_db, swss::SelectableTimer *timer); + virtual ~FlowApiHandler() = default; virtual bool initialize(const std::string &key, const std::vector &attrs) = 0; virtual task_process_status handleSet(const std::string &table_name, const std::string &key, const std::vector &attrs) = 0; @@ -67,7 +67,6 @@ class FlowSyncHandler virtual std::string getKey() const { return m_key; } virtual bool isActive() const { return m_session_id != SAI_NULL_OBJECT_ID; } virtual swss::SelectableTimer* getTimer() const { return m_timer; } - virtual void clearKey() { m_key = ""; } protected: virtual task_process_status createSession() = 0; @@ -86,7 +85,7 @@ class FlowSyncHandler std::shared_ptr m_state_table; }; -class BulkSyncHandler : public FlowSyncHandler +class BulkSyncHandler : public FlowApiHandler { public: BulkSyncHandler(swss::DBConnector *dpu_state_db, swss::SelectableTimer *timer); @@ -112,7 +111,7 @@ class BulkSyncHandler : public FlowSyncHandler static constexpr uint32_t DEFAULT_TIMEOUT_SEC = 120; }; -class FlowDumpHandler : public FlowSyncHandler +class FlowDumpHandler : public FlowApiHandler { public: FlowDumpHandler(swss::DBConnector *dpu_state_db, swss::SelectableTimer *timer, std::shared_ptr filter_manager); @@ -151,7 +150,7 @@ class DashHaFlowOrch : public ZmqOrch DashHaFlowOrch(swss::DBConnector *db, const std::vector &tableNames, swss::DBConnector *app_state_db, swss::ZmqServer *zmqServer); protected: - std::map> m_handlers; + std::map> m_handlers; std::unique_ptr m_dpuStateDb; swss::SelectableTimer* m_sync_timer; From 777d521a7be2acda8574b44eac7c7ccce9881107 Mon Sep 17 00:00:00 2001 From: Vivek Reddy Date: Wed, 18 Mar 2026 03:09:23 +0200 Subject: [PATCH 3/3] Fix armhf build failure Signed-off-by: Vivek Reddy --- orchagent/dash/dashhafloworch.cpp | 15 ++-- tests/mock_tests/dashhafloworch_ut.cpp | 116 +++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 7 deletions(-) diff --git a/orchagent/dash/dashhafloworch.cpp b/orchagent/dash/dashhafloworch.cpp index 8f97a0e93c9..7f37d154b92 100644 --- a/orchagent/dash/dashhafloworch.cpp +++ b/orchagent/dash/dashhafloworch.cpp @@ -14,6 +14,7 @@ #include "schema.h" #include +#include #include #include #include @@ -90,7 +91,7 @@ task_process_status FlowDumpFilterManager::addFilter(const string &key, const ve entry.filter_id = filter_id; m_filter_cache[key] = entry; - SWSS_LOG_NOTICE("Created flow dump filter %s with filter_id 0x%lx", key.c_str(), filter_id); + SWSS_LOG_NOTICE("Created flow dump filter %s with filter_id 0x%" PRIx64, key.c_str(), static_cast(filter_id)); return task_success; } @@ -227,7 +228,7 @@ bool FlowDumpFilterManager::deleteFilterSAI(sai_object_id_t filter_id) if (status != SAI_STATUS_SUCCESS) { - SWSS_LOG_ERROR("Failed to delete flow bulk get session filter 0x%lx, status: %d", filter_id, status); + SWSS_LOG_ERROR("Failed to delete flow bulk get session filter 0x%" PRIx64 ", status: %d", static_cast(filter_id), status); return false; } @@ -267,7 +268,7 @@ bool FlowApiHandler::deleteSessionSAI() if (status != SAI_STATUS_SUCCESS) { - SWSS_LOG_ERROR("Failed to delete flow bulk get session 0x%lx, status: %d", m_session_id, status); + SWSS_LOG_ERROR("Failed to delete flow bulk get session 0x%" PRIx64 ", status: %d", static_cast(m_session_id), status); return false; } @@ -449,7 +450,7 @@ task_process_status BulkSyncHandler::createSession() m_timer->start(); FlowApiHandler::updateState("created", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_BULK_SYNC}}); - SWSS_LOG_NOTICE("Created flow sync session %s with session_id 0x%lx, timeout %u sec", m_key.c_str(), session_id, m_timeout_sec); + SWSS_LOG_NOTICE("Created flow sync session %s with session_id 0x%" PRIx64 ", timeout %u sec", m_key.c_str(), static_cast(session_id), m_timeout_sec); return task_success; } @@ -607,7 +608,7 @@ task_process_status FlowDumpHandler::createSession() FlowApiHandler::updateState("created", m_key, {{"type", DashHaFlowOrch::SESSION_TYPE_FLOW_DUMP}}); - SWSS_LOG_NOTICE("Created flow dump session %s with session_id 0x%lx, timeout %u sec", m_key.c_str(), session_id, m_timeout_sec); + SWSS_LOG_NOTICE("Created flow dump session %s with session_id 0x%" PRIx64 ", timeout %u sec", m_key.c_str(), static_cast(session_id), m_timeout_sec); return task_success; } @@ -1008,7 +1009,7 @@ void DashHaFlowOrch::handleSessionFinished(sai_object_id_t session_id) if (h_pair.second->getSessionId() == session_id) { h_pair.second->handleFinished(); - SWSS_LOG_NOTICE("Session ID 0x%lx finished (type: %s)", session_id, h_pair.first.c_str()); + SWSS_LOG_NOTICE("Session ID 0x%" PRIx64 " finished (type: %s)", static_cast(session_id), h_pair.first.c_str()); found = true; break; } @@ -1016,7 +1017,7 @@ void DashHaFlowOrch::handleSessionFinished(sai_object_id_t session_id) if (!found) { - SWSS_LOG_WARN("Received FINISHED notification for unknown session 0x%lx", session_id); + SWSS_LOG_WARN("Received FINISHED notification for unknown session 0x%" PRIx64, static_cast(session_id)); } } diff --git a/tests/mock_tests/dashhafloworch_ut.cpp b/tests/mock_tests/dashhafloworch_ut.cpp index a5bc593a39e..2768b79dd75 100644 --- a/tests/mock_tests/dashhafloworch_ut.cpp +++ b/tests/mock_tests/dashhafloworch_ut.cpp @@ -30,6 +30,16 @@ namespace dashhafloworch_ut void handleTimerExpired(swss::SelectableTimer *timer) { DashHaFlowOrch::handleTimerExpired(timer); } swss::SelectableTimer* getSyncTimer() { return m_sync_timer; } swss::SelectableTimer* getDumpTimer() { return m_dump_timer; } + // For testing: invoke handler's handleDel with arbitrary table_name to cover branches + task_process_status invokeHandlerHandleDel(const std::string &key, const std::string &table_name) + { + for (auto &h_pair : m_handlers) + { + if (h_pair.second->getKey() == key) + return h_pair.second->handleDel(table_name, key); + } + return task_failed; + } }; class DashHaFlowOrchTest : public MockOrchTest @@ -482,6 +492,112 @@ namespace dashhafloworch_ut CreateFlowDumpFilter("FILTER_1", "eni_addr", "invalid_op", "00:11:22:33:44:55"); } + TEST_F(DashHaFlowOrchTest, CreateFlowDumpFilterSrcIpAddr) + { + // Cover IP address filter key branch (src_ip_addr) in createFilterSAI + sai_object_id_t filter_id = 0x2000000000000001; + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(filter_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpFilter("FILTER_SRC_IP", "src_ip_addr", "equal_to", "10.0.0.1"); + } + + TEST_F(DashHaFlowOrchTest, CreateFlowDumpFilterDstIpAddr) + { + // Cover IP address filter key branch (dst_ip_addr) in createFilterSAI + sai_object_id_t filter_id = 0x2000000000000002; + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(filter_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpFilter("FILTER_DST_IP", "dst_ip_addr", "equal_to", "192.168.1.100"); + } + + TEST_F(DashHaFlowOrchTest, BulkSyncHandleDelReturnsFailed) + { + // Cover BulkSyncHandler::handleDel - delete not supported, returns task_failed + sai_object_id_t session_id = 0x1000000000000001; + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(session_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowSyncSession("SYNC_SESSION_1", "", "192.168.1.1", "8080"); + + // DEL should not call SAI remove - handleDel returns task_failed and does not delete + EXPECT_CALL(*mock_sai_dash_flow_api, remove_flow_entry_bulk_get_session).Times(0); + RemoveFlowSyncSession("SYNC_SESSION_1"); + + // Session should still exist in state (handler still holds it) + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector fvs; + ASSERT_TRUE(state_table.get("SYNC_SESSION_1", fvs)); + } + + TEST_F(DashHaFlowOrchTest, FlowDumpHandleDelKnownTableReturnsFailed) + { + // Cover FlowDumpHandler::handleDel when table_name == APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME + sai_object_id_t filter_id = 0x2000000000000001; + sai_object_id_t session_id = 0x1000000000000001; + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(filter_id), Return(SAI_STATUS_SUCCESS))); + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(session_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpFilter("FILTER_1", "eni_addr", "equal_to", "00:11:22:33:44:55"); + CreateFlowDumpSession("DUMP_SESSION_1", "true", "1000", "300", {"FILTER_1"}); + + // DEL should not call SAI remove - handleDel returns task_failed + EXPECT_CALL(*mock_sai_dash_flow_api, remove_flow_entry_bulk_get_session).Times(0); + RemoveFlowDumpSession("DUMP_SESSION_1"); + + swss::Table state_table(m_dpu_state_db.get(), STATE_DASH_FLOW_SYNC_SESSION_STATE_TABLE_NAME); + vector fvs; + ASSERT_TRUE(state_table.get("DUMP_SESSION_1", fvs)); + } + + TEST_F(DashHaFlowOrchTest, FlowDumpHandleDelUnknownTableReturnsFailed) + { + // Cover FlowDumpHandler::handleDel else branch (unknown table) + sai_object_id_t filter_id = 0x2000000000000001; + sai_object_id_t session_id = 0x1000000000000001; + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session_filter) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(filter_id), Return(SAI_STATUS_SUCCESS))); + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(session_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowDumpFilter("FILTER_1", "eni_addr", "equal_to", "00:11:22:33:44:55"); + CreateFlowDumpSession("DUMP_SESSION_1", "true", "1000", "300", {"FILTER_1"}); + + task_process_status status = m_dashHaFlowOrch->invokeHandlerHandleDel("DUMP_SESSION_1", "UNKNOWN_TABLE"); + ASSERT_EQ(status, task_failed); + } + + TEST_F(DashHaFlowOrchTest, InvokeHandlerHandleDelKnownTable) + { + // invokeHandlerHandleDel with known table: delegates to handler, returns task_failed (delete not supported) + sai_object_id_t session_id = 0x1000000000000001; + EXPECT_CALL(*mock_sai_dash_flow_api, create_flow_entry_bulk_get_session) + .Times(1) + .WillOnce(DoAll(SetArgPointee<0>(session_id), Return(SAI_STATUS_SUCCESS))); + + CreateFlowSyncSession("SYNC_SESSION_1", "", "192.168.1.1", "8080"); + + task_process_status status = m_dashHaFlowOrch->invokeHandlerHandleDel("SYNC_SESSION_1", APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME); + ASSERT_EQ(status, task_failed); + } + + TEST_F(DashHaFlowOrchTest, InvokeHandlerHandleDelKeyNotFound) + { + // invokeHandlerHandleDel with key not in any handler: returns task_failed + task_process_status status = m_dashHaFlowOrch->invokeHandlerHandleDel("NONEXISTENT_SESSION", APP_DASH_FLOW_SYNC_SESSION_TABLE_NAME); + ASSERT_EQ(status, task_failed); + } + TEST_F(DashHaFlowOrchTest, RemoveFlowDumpFilter) { sai_object_id_t filter_id = 0x2000000000000001;