From 32c3c480c8f30cb459b173cd14be40cdae291b29 Mon Sep 17 00:00:00 2001 From: Prabhu Sreenivasan Date: Tue, 15 Dec 2020 21:09:25 -0800 Subject: [PATCH] Threshold feature Signed-off-by: Prabhu Sreenivasan --- Makefile.am | 1 + autogen.sh | 6 + configure.ac | 88 ++ debian/changelog | 6 + debian/compat | 1 + debian/control | 18 + debian/rules | 40 + proto/Makefile.am | 11 + proto/sai_tam_buffer_stats.proto | 77 ++ proto/sai_tam_event.proto | 50 ++ proto/sai_tam_event_flow_learning.proto | 48 ++ proto/sai_tam_event_threshold_breach.proto | 45 + proto/sai_tam_main.proto | 63 ++ tam-buffers/Makefile.am | 12 + .../tam_proto_buffer_stats_decoder.cpp | 146 ++++ tam-buffers/tam_proto_buffer_stats_decoder.h | 104 +++ tam-buffers/tam_redis_adapter.cpp | 683 +++++++++++++++ tam-buffers/tam_redis_adapter.h | 145 ++++ tam-buffers/tam_socket_interface.cpp | 133 +++ tam-buffers/tam_socket_interface.h | 95 +++ thresholdmgr/Makefile.am | 20 + thresholdmgr/test_proto.cpp | 248 ++++++ thresholdmgr/thresholdmgr.cpp | 806 ++++++++++++++++++ thresholdmgr/thresholdmgr.h | 109 +++ thresholdmgr/thresholdmgr_main.cpp | 90 ++ 25 files changed, 3045 insertions(+) create mode 100644 Makefile.am create mode 100755 autogen.sh create mode 100644 configure.ac create mode 100644 debian/changelog create mode 100644 debian/compat create mode 100644 debian/control create mode 100755 debian/rules create mode 100644 proto/Makefile.am create mode 100644 proto/sai_tam_buffer_stats.proto create mode 100644 proto/sai_tam_event.proto create mode 100644 proto/sai_tam_event_flow_learning.proto create mode 100644 proto/sai_tam_event_threshold_breach.proto create mode 100644 proto/sai_tam_main.proto create mode 100644 tam-buffers/Makefile.am create mode 100644 tam-buffers/tam_proto_buffer_stats_decoder.cpp create mode 100644 tam-buffers/tam_proto_buffer_stats_decoder.h create mode 100644 tam-buffers/tam_redis_adapter.cpp create mode 100644 tam-buffers/tam_redis_adapter.h create mode 100644 tam-buffers/tam_socket_interface.cpp create mode 100644 tam-buffers/tam_socket_interface.h create mode 100644 thresholdmgr/Makefile.am create mode 100644 thresholdmgr/test_proto.cpp create mode 100644 thresholdmgr/thresholdmgr.cpp create mode 100644 thresholdmgr/thresholdmgr.h create mode 100644 thresholdmgr/thresholdmgr_main.cpp diff --git a/Makefile.am b/Makefile.am new file mode 100644 index 0000000..2d8f405 --- /dev/null +++ b/Makefile.am @@ -0,0 +1 @@ +SUBDIRS = proto tam-buffers thresholdmgr diff --git a/autogen.sh b/autogen.sh new file mode 100755 index 0000000..c8d0bbe --- /dev/null +++ b/autogen.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +libtoolize --force --copy && +autoreconf --force --install -I m4 +rm -Rf autom4te.cache + diff --git a/configure.ac b/configure.ac new file mode 100644 index 0000000..e2b7c07 --- /dev/null +++ b/configure.ac @@ -0,0 +1,88 @@ +AC_INIT([sonic-tam],[1.0.0]) +AC_CONFIG_SRCDIR([]) +AC_CONFIG_AUX_DIR(config) +AM_CONFIG_HEADER(config.h) +AM_INIT_AUTOMAKE([foreign]) +AC_LANG_C +AC_LANG([C++]) +AC_PROG_CC +AC_PROG_CXX +AC_PROG_LIBTOOL +AC_HEADER_STDC +PKG_CHECK_MODULES(PROTOBUF, protobuf >= 2.4.0) +AC_SUBST(PROTOBUF_LIBS) +AC_SUBST(PROTOBUF_CFLAGS) +AC_SUBST(PROTOBUF_VERSION) + +AC_CHECK_LIB([hiredis], [redisConnect],, + AC_MSG_ERROR([libhiredis is not installed.])) + +#AC_CHECK_LIB([nl-genl-3], [genl_connect]) + +AC_CHECK_PROG([PROTOC], [protoc], [protoc]) +AS_IF([test "x${PROTOC}" == "x"], + [AC_MSG_ERROR(["protoc" not found.])]) + +AC_ARG_ENABLE(debug, +[ --enable-debug Compile with debugging flags], +[case "${enableval}" in + yes) debug=true ;; + no) debug=false ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-debug) ;; +esac],[debug=false]) +AM_CONDITIONAL(DEBUG, test x$debug = xtrue) + +AC_ARG_ENABLE(gtest, +[ --enable-gtest Compile with googletest flags], +[case "${enableval}" in + yes) gtest=true ;; + no) gtest=false ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-gtest) ;; +esac],[gtest=false]) +AM_CONDITIONAL(GTEST, test x$gtest = xtrue) + +CFLAGS_COMMON="-std=c++11 -Wall -fPIC -Wno-write-strings -I/usr/include/libnl3 -I/usr/include/swss" + +CFLAGS_COMMON+=" -Werror" +CFLAGS_COMMON+=" -Wno-reorder" +CFLAGS_COMMON+=" -Wcast-align" +CFLAGS_COMMON+=" -Wcast-qual" +CFLAGS_COMMON+=" -Wdisabled-optimization" +CFLAGS_COMMON+=" -Wextra" +CFLAGS_COMMON+=" -Wfloat-equal" +CFLAGS_COMMON+=" -Wimport" +CFLAGS_COMMON+=" -Winit-self" +CFLAGS_COMMON+=" -Winvalid-pch" +CFLAGS_COMMON+=" -Wlong-long" +CFLAGS_COMMON+=" -Wmissing-field-initializers" +CFLAGS_COMMON+=" -Wno-aggregate-return" +CFLAGS_COMMON+=" -Wno-padded" +CFLAGS_COMMON+=" -Wno-switch-enum" +CFLAGS_COMMON+=" -Wno-unused-parameter" +CFLAGS_COMMON+=" -Wpacked" +CFLAGS_COMMON+=" -Wpointer-arith" +CFLAGS_COMMON+=" -Wredundant-decls" +CFLAGS_COMMON+=" -Wstack-protector" +CFLAGS_COMMON+=" -Wstrict-aliasing=3" +CFLAGS_COMMON+=" -Wswitch" +CFLAGS_COMMON+=" -Wswitch-default" +CFLAGS_COMMON+=" -Wunreachable-code" +CFLAGS_COMMON+=" -Wunused" +CFLAGS_COMMON+=" -Wvariadic-macros" +CFLAGS_COMMON+=" -Wno-switch-default" +CFLAGS_COMMON+=" -Wno-long-long" +CFLAGS_COMMON+=" -Wno-redundant-decls" +CFLAGS_COMMON+=" -Wno-misleading-indentation" + +PROTOBUF_CFLAGS = `pkg-config --cflags --libs protobuf` + +AC_SUBST(CFLAGS_COMMON) + +AC_CONFIG_FILES([ + proto/Makefile + tam-buffers/Makefile + thresholdmgr/Makefile + Makefile +]) + +AC_OUTPUT diff --git a/debian/changelog b/debian/changelog new file mode 100644 index 0000000..8f09fc3 --- /dev/null +++ b/debian/changelog @@ -0,0 +1,6 @@ +sonic (1.0.0) stable; urgency=medium + + * Initial release. + + -- TAM <@broadcom.com> Mon, 10 June 2019 12:00:00 -0800 + diff --git a/debian/compat b/debian/compat new file mode 100644 index 0000000..ec63514 --- /dev/null +++ b/debian/compat @@ -0,0 +1 @@ +9 diff --git a/debian/control b/debian/control new file mode 100644 index 0000000..93598ff --- /dev/null +++ b/debian/control @@ -0,0 +1,18 @@ +Source: sonic +Maintainer: Broadcom +Section: net +Priority: optional +Build-Depends: dh-exec (>=0.3), debhelper (>= 9), autotools-dev +Standards-Version: 1.0.0 + +Package: tam +Architecture: any +Depends: ${shlibs:Depends} +Description: This package contains TAM for SONiC project. + +Package: tam-dbg +Architecture: any +Section: debug +Priority: extra +Depends: tam (=${binary:Version}) +Description: debugging symbols for tam diff --git a/debian/rules b/debian/rules new file mode 100755 index 0000000..25857ed --- /dev/null +++ b/debian/rules @@ -0,0 +1,40 @@ +#!/usr/bin/make -f +# See debhelper(7) (uncomment to enable) +# output every command that modifies files on the build system. +#export DH_VERBOSE = 1 + +# see EXAMPLES in dpkg-buildflags(1) and read /usr/share/dpkg/* +DPKG_EXPORT_BUILDFLAGS = 1 +include /usr/share/dpkg/default.mk + +# see FEATURE AREAS in dpkg-buildflags(1) +#export DEB_BUILD_MAINT_OPTIONS = hardening=+all + +# see ENVIRONMENT in dpkg-buildflags(1) +# package maintainers to append CFLAGS +#export DEB_CFLAGS_MAINT_APPEND = -Wall -pedantic +# package maintainers to append LDFLAGS +#export DEB_LDFLAGS_MAINT_APPEND = -Wl,--as-needed + + +# main packaging script based on dh7 syntax +%: + dh $@ --with autotools-dev + +# dh_make generated override targets +# This is example for Cmake (See https://bugs.debian.org/641051 ) +#override_dh_auto_configure: +# dh_auto_configure -- \ +# -DCMAKE_LIBRARY_PATH=$(DEB_HOST_MULTIARCH) + +override_dh_auto_configure: + dh_auto_configure -- + +override_dh_auto_install: + dh_auto_install --destdir=debian/tam +ifeq ($(SONIC_COVERAGE_ON),y) + find . -name "*.gcno" | sed 's|.*|tar -uvf debian/gcov_tam_$(shell date '+%y%m%d_%H%M').tar &|g'|sh +endif + +override_dh_strip: + dh_strip --dbg-package=tam-dbg diff --git a/proto/Makefile.am b/proto/Makefile.am new file mode 100644 index 0000000..b9c6985 --- /dev/null +++ b/proto/Makefile.am @@ -0,0 +1,11 @@ + +%.pb.cc %.pb.h: %.proto + $(PROTOC) --proto_path=$(srcdir) -I /usr/include --cpp_out=$(srcdir) $^ + +dist_noinst_DATA = sai_tam_main.proto sai_tam_buffer_stats.proto sai_tam_event_threshold_breach.proto sai_tam_event.proto sai_tam_event_flow_learning.proto + +nodist_proto_SOURCES = $(srcdir)/proto/sai_tam_main.pb.cc $(srcdir)/proto/sai_tam_buffer_stats.pb.cc $(srcdir)/proto/sai_tam_event_threshold_breach.pb.cc $(srcdir)/proto/sai_tam_event.pb.cc $(srcdir)/proto/sai_tam_event_flow_learning.pb.cc + +BUILT_SOURCES = sai_tam_main.pb.h sai_tam_buffer_stats.pb.h sai_tam_event_threshold_breach.pb.h sai_tam_event.pb.h sai_tam_event_flow_learning.pb.h + +MOSTLYCLEANFILES = sai_tam_main.pb.h sai_tam_main.pb.cc sai_tam_buffer_stats.pb.h sai_tam_buffer_stats.pb.cc sai_tam_event_threshold_breach.pb.h sai_tam_event_threshold_breach.pb.cc sai_tam_event.pb.h sai_tam_event.pb.cc sai_tam_event_flow_learning.pb.h sai_tam_event_flow_learning.pb.cc diff --git a/proto/sai_tam_buffer_stats.proto b/proto/sai_tam_buffer_stats.proto new file mode 100644 index 0000000..69d5634 --- /dev/null +++ b/proto/sai_tam_buffer_stats.proto @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2017 Broadcom. The term "Broadcom" refers + * to Broadcom Limited and/or its subsidiaries. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto2"; + +import "sai_tam_main.proto"; + +message BufferStatistics { + // Counter: the total number of dropped bytes + optional uint64 drop_bytes = 1 [(telemetry_options).is_counter = true]; + + // Peak: the max measured buffer depth, in bytes, across all measurements since boot. + optional uint64 peak_buffer_occupancy_bytes = 2 [(telemetry_options).is_gauge = true]; + + // Peak: the max measured buffer depth, in percent, across all measurements since boot. + optional uint32 peak_buffer_occupancy_percent = 3 [(telemetry_options).is_gauge = true]; +} + +enum QueueType { + QUEUE_UNICAST = 1; //Unicast Queue + QUEUE_MULTICAST = 2; // Multicast Queue +} + +message QueueStatistics { + required uint64 queue_oid = 1; //Queue + required QueueType queue_type = 2; // Unicast or Multicast ? + required BufferStatistics stats = 3; // Buffer Statistics +} + +enum IPGType { + IPG_SHARED = 1; //IPG Shared + IPG_XOFF = 2; // IPG Headroom +} + +message IPGStatistics { + required uint64 ipg_oid = 1; // IPG Index + required IPGType ipg_type = 2; // Shared or Headroom + required BufferStatistics stats = 3; // Buffer Statistics +} + +enum BufferPoolType { + BUFFERPOOL_INGRESS = 1; // Ingress pool + BUFFERPOOL_EGRESS = 2; // Egress pool +} + +message BufferPoolStatistics { + required uint64 pool_oid = 1; // Pool index + required BufferPoolType pool_type = 2; // Ingress or Egress ? + required BufferStatistics stats = 3; // Buffer statistics +} + +message InterfaceBufferStatistics { + required uint64 port_oid = 1; // Interface Name as known in the system + repeated QueueStatistics queue_stats = 2; // Queue statistics for this port + repeated IPGStatistics ipg_stats = 3; // IPG statistics for this port + repeated BufferPoolStatistics pool_stats = 4; // Pool statistics for this port +} + +message SwitchBufferStats { + repeated BufferPoolStatistics pool_stats = 1; // Global Pool Statistics + repeated InterfaceBufferStatistics intf_buffer_stats = 2; // Per-port statistics +} + diff --git a/proto/sai_tam_event.proto b/proto/sai_tam_event.proto new file mode 100644 index 0000000..372aad2 --- /dev/null +++ b/proto/sai_tam_event.proto @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2017 Broadcom. The term "Broadcom" refers + * to Broadcom Limited and/or its subsidiaries. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto2"; + +import "sai_tam_event_flow_learning.proto"; +import "sai_tam_event_threshold_breach.proto"; +import "sai_tam_buffer_stats.proto"; + +message Event { + required uint64 timestamp = 1; + + oneof EventType { + Drop drop_event = 2; + FlowLearning flow_event = 3; + ThresholdBreach threshold_event = 4; + SwitchBufferStats switch_buffer_event = 5; + } +} + +message Drop { + optional uint32 dummy = 1; +} + +message GenEvent { + required string system_id = 1; + optional uint32 component_id = 2; + optional uint32 sub_component_id= 3; + repeated EventPair eventpair = 4; + optional string hostname = 5; +} + +message EventPair { + repeated Event event = 1; + repeated Flow flow = 2; +} diff --git a/proto/sai_tam_event_flow_learning.proto b/proto/sai_tam_event_flow_learning.proto new file mode 100644 index 0000000..c7a36d2 --- /dev/null +++ b/proto/sai_tam_event_flow_learning.proto @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2017 Broadcom. The term "Broadcom" refers + * to Broadcom Limited and/or its subsidiaries. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto2"; + +message FlowLearning { + enum FL_STATE { + FL_INVALID = 0; + FL_LEARN = 1; + FL_AGING = 2; + FL_EXPORT = 3; + FL_TABLE_FULL = 4; /* Atomic event, no other information need to be send */ + } + optional FL_STATE fl_state = 1; +} + +message Flow { + optional uint32 proto = 1; + optional uint32 sip = 2; + optional uint32 dip = 3; + + optional uint32 l4_sport = 4; + optional uint32 l4_dport = 5; + optional uint32 vnid = 6; + optional uint32 inner_proto = 7; + optional uint32 inner_sip = 8; + optional uint32 inner_dip = 9; + + optional uint32 inner_l4_sport = 10; + optional uint32 inner_l4_dport = 11; + optional bytes custom_key = 12; + optional uint32 group_id = 13; + optional bytes packet = 14; +} diff --git a/proto/sai_tam_event_threshold_breach.proto b/proto/sai_tam_event_threshold_breach.proto new file mode 100644 index 0000000..a011100 --- /dev/null +++ b/proto/sai_tam_event_threshold_breach.proto @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2017 Broadcom. The term "Broadcom" refers + * to Broadcom Limited and/or its subsidiaries. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto2"; + +import "sai_tam_main.proto"; +import "sai_tam_buffer_stats.proto"; + +enum ThresholdBreachSourceType { + THRESHOLD_BREACH_AT_QUEUE = 1; + THRESHOLD_BREACH_AT_GLOBAL_POOLS = 2; + THRESHOLD_BREACH_AT_IPG = 3; + THRESHOLD_BREACH_AT_PORT_POOL = 4; +} + +message ThresholdSource { + required ThresholdBreachSourceType type = 1; + optional uint64 port_oid = 2; + optional QueueType queue_type = 3; + optional uint64 queue_oid = 4; + optional BufferPoolType pool_type = 5; + optional uint64 pool_oid = 6; + optional IPGType ipg_type = 7; + optional uint64 ipg_oid = 8; +} + +message ThresholdBreach { + required ThresholdSource breach_source = 1; + optional SwitchBufferStats buffer_stats = 2; +} + diff --git a/proto/sai_tam_main.proto b/proto/sai_tam_main.proto new file mode 100644 index 0000000..bce0874 --- /dev/null +++ b/proto/sai_tam_main.proto @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2017 Broadcom. The term "Broadcom" refers + * to Broadcom Limited and/or its subsidiaries. + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto2"; +import "google/protobuf/descriptor.proto"; + +extend google.protobuf.FieldOptions { + optional TelemetryFieldOptions telemetry_options = 1024; +} + +message TelemetryFieldOptions { + optional bool is_key = 1; + optional bool is_timestamp = 2; + optional bool is_counter = 3; + optional bool is_gauge = 4; +} + + +message TelemetryStream { + required string system_id = 1 [(telemetry_options).is_key = true]; + optional uint32 component_id = 2 [(telemetry_options).is_key = true]; + optional uint32 sub_component_id = 3 [(telemetry_options).is_key = true]; + optional string sensor_name = 4 [(telemetry_options).is_key = true]; + optional uint32 sequence_number = 5; + // timestamp (milliseconds since 00:00:00 UTC 1/1/1970) + optional uint64 timestamp = 6 [(telemetry_options).is_timestamp = true]; + optional uint32 version_major = 7; + optional uint32 version_minor = 8; + + optional IETFSensors ietf = 100; + optional EnterpriseSensors enterprise = 101; +} + +message IETFSensors { + extensions 1 to max; +} + +message EnterpriseSensors { + extensions 1 to max; +} + +extend EnterpriseSensors { + // re-use IANA assigned numbers + optional SAISwitchSensors sai = 2636; +} + +message SAISwitchSensors { + extensions 1 to max; +} diff --git a/tam-buffers/Makefile.am b/tam-buffers/Makefile.am new file mode 100644 index 0000000..30e16b6 --- /dev/null +++ b/tam-buffers/Makefile.am @@ -0,0 +1,12 @@ +INCLUDES = -I $(top_srcdir)/tam -I $(top_srcdir)/proto + +noinst_LIBRARIES = libtambuffers.a + +if DEBUG +DBGFLAGS = -ggdb -DDEBUG +else +DBGFLAGS = -g -DNDEBUG +endif + +libtambuffers_a_SOURCES = tam_socket_interface.cpp tam_redis_adapter.cpp tam_proto_buffer_stats_decoder.cpp +libtambuffers_a_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(COV_CFLAGS) $(ASAN_CFLAGS) diff --git a/tam-buffers/tam_proto_buffer_stats_decoder.cpp b/tam-buffers/tam_proto_buffer_stats_decoder.cpp new file mode 100644 index 0000000..8e422a1 --- /dev/null +++ b/tam-buffers/tam_proto_buffer_stats_decoder.cpp @@ -0,0 +1,146 @@ +/* + * Copyright 2019 Broadcom Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "tam_proto_buffer_stats_decoder.h" + +TamProtoBufferStatsDecoder::TamProtoBufferStatsDecoder(SwitchBufferStats *bufferStats) +{ + m_protoDecoded = false; + m_bufferStatsTimestamp = 0; + + /* bufferStats in the incoming proto to be decoded. */ + if (bufferStats == NULL) + { + SWSS_LOG_ERROR("Invalid buffer stats proto provided to TamProtoBufferStatsDecoder."); + return; + } + + /* Decode and buffer stats proto and populate the decoded data. */ + m_protoDecoded = decodeTamBufferStatsProto(bufferStats); + if (m_protoDecoded != true) + { + SWSS_LOG_ERROR("Unable to decode Tam buffer stats protobuf."); + } +} + +bool TamProtoBufferStatsDecoder::decodeTamBufferStatsProto(SwitchBufferStats *bufferStats) +{ + int i = 0, j = 0; + BufferPoolStatistics poolStats; + BufferStatistics stats; + InterfaceBufferStatistics intfStats; + IPGStatistics ipgStats; + QueueStatistics queueStats; + + if (bufferStats == NULL) + { + SWSS_LOG_ERROR("Invalid buffer stats proto provided to decodeTamBufferStatsProto."); + return false; + } + + /* Decode global buffer pool stats */ + for (i = 0; i < bufferStats->pool_stats_size(); i++) + { + poolStats = bufferStats->pool_stats(i); + stats = poolStats.stats(); + poolType key = std::make_pair(poolStats.pool_oid(), poolStats.pool_type()); + statData data = std::make_pair(stats.peak_buffer_occupancy_bytes(), stats.peak_buffer_occupancy_percent()); + + /* Insert the global pool stats into the map. */ + m_globalPoolBufStats[key] = data; + } + + /* Decode per interface stats. */ + for (i = 0; i < bufferStats->intf_buffer_stats_size(); i++) + { + intfStats = bufferStats->intf_buffer_stats(i); + + /* Retrieve PG stats */ + for (j = 0; j < intfStats.ipg_stats_size(); j++) + { + ipgStats = intfStats.ipg_stats(j); + stats = ipgStats.stats(); + pgType key = std::make_pair(ipgStats.ipg_oid(), ipgStats.ipg_type()); + statData data = std::make_pair(stats.peak_buffer_occupancy_bytes(), stats.peak_buffer_occupancy_percent()); + + /* Insert entry to pg stats. */ + m_pgBufStats[key] = data; + } + + /* Retrieve queue stats */ + for (j = 0; j < intfStats.queue_stats_size(); j++) + { + queueStats = intfStats.queue_stats(j); + stats = queueStats.stats(); + queueType key = std::make_pair(queueStats.queue_oid(), queueStats.queue_type()); + statData data = std::make_pair(stats.peak_buffer_occupancy_bytes(), stats.peak_buffer_occupancy_percent()); + + /* Insert entry to queue stats. */ + m_queueBufStats[key] = data; + } + + /* Retrieve interface buffer pool stats */ + for (j = 0; j < intfStats.pool_stats_size(); j++) + { + poolStats = intfStats.pool_stats(j); + stats = poolStats.stats(); + interfacePoolType key = std::make_pair(poolStats.pool_oid(), poolStats.pool_type()); + statData data = std::make_pair(stats.peak_buffer_occupancy_bytes(), stats.peak_buffer_occupancy_percent()); + + /* Insert entry to interface pool stats. */ + m_interfacePoolBufStats[key] = data; + } + } + + /* Retrieve buffer stats proto timestamp */ + m_bufferStatsTimestamp = 0; + + return true; +} + +bufferStatsPg TamProtoBufferStatsDecoder::getPriorityGroupBufferStats() +{ + return m_pgBufStats; +} + +bufferStatsQueue TamProtoBufferStatsDecoder::getQueueBufferStats() +{ + return m_queueBufStats; +} + +bufferStatsGlobalPool TamProtoBufferStatsDecoder::getBufferPoolBufferStats() +{ + return m_globalPoolBufStats; +} + +bufferStatsInterfacePool TamProtoBufferStatsDecoder::getInterfaceBufferPoolBufferStats() +{ + return m_interfacePoolBufStats; +} + +uint64_t TamProtoBufferStatsDecoder::getBufferStatsTimestamp() +{ + return m_bufferStatsTimestamp; +} + +bool TamProtoBufferStatsDecoder::getProtoDecoded() +{ + return m_protoDecoded; +} diff --git a/tam-buffers/tam_proto_buffer_stats_decoder.h b/tam-buffers/tam_proto_buffer_stats_decoder.h new file mode 100644 index 0000000..05aefe3 --- /dev/null +++ b/tam-buffers/tam_proto_buffer_stats_decoder.h @@ -0,0 +1,104 @@ +/* + * Copyright 2019 Broadcom Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TAM_PROTO_BUFFER_STATS_DECODER_H_ +#define _TAM_PROTO_BUFFER_STATS_DECODER_H_ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "sai_tam_buffer_stats.pb.h" + +using namespace std; + +typedef enum _bufferStatType { + SNAPSHOTMGR_BUFFER_TYPE_PG, + SNAPSHOTMGR_BUFFER_TYPE_QUEUE, + SNAPSHOTMGR_BUFFER_TYPE_POOL +} bufferStatType; + +typedef std::pair pgType; + +typedef std::pair queueType; + +typedef std::pair poolType; + +typedef std::pair interfacePoolType; + +typedef std::pair statData; + +/* PG stats map: Key - (PG RID, PG type), Data - (64-bit stat in bytes, 64-bit stat in percent) */ +typedef std::map bufferStatsPg; + +/* Queue stats map: Key - (Queue RID, Queue type), Data - (64-bit stat in bytes, 64-bit stat in percent) */ +typedef std::map bufferStatsQueue; + +/* Global Buffer pool stats map: Key - (buffer pool RID, Pool Type), Data - (64-bit stat in bytes, 64-bit stat in percent) */ +typedef std::map bufferStatsGlobalPool; + +/* Per interface buffer pool stats map: Key - (buffer pool RID, Interface Pool Type), Data - (64-bit stat in bytes, 64-bit stat in percent) */ +typedef std::map bufferStatsInterfacePool; + +class TamProtoBufferStatsDecoder +{ +public: + TamProtoBufferStatsDecoder(SwitchBufferStats *bufferStats); + + ~TamProtoBufferStatsDecoder() + { + } + + /* -------------- Routines to decode and retrieve class data with snapshot information -------------- */ + + /* Routine to get PG buffer stats. */ + bufferStatsPg getPriorityGroupBufferStats(); + + /* Routine to get queue buffer stats. */ + bufferStatsQueue getQueueBufferStats(); + + /* Routine to get global buffer pool buffer stats. */ + bufferStatsGlobalPool getBufferPoolBufferStats(); + + /* Routine to get interface buffer pool buffer stats. */ + bufferStatsInterfacePool getInterfaceBufferPoolBufferStats(); + + /* Routine to get buffer stats timestamp. */ + uint64_t getBufferStatsTimestamp(); + + /* check if proto was decoded successfully. */ + bool getProtoDecoded(); + +private: + bool m_protoDecoded; /* True if proto decoded successfully. */ + bufferStatsPg m_pgBufStats; /* PG buffer stats */ + bufferStatsQueue m_queueBufStats; /* Queue buffer stats */ + bufferStatsGlobalPool m_globalPoolBufStats; /* Global buffer pool buffer stats */ + bufferStatsInterfacePool m_interfacePoolBufStats; /* Interface buffer pool buffer stats */ + uint64_t m_bufferStatsTimestamp; /* Buffer stats proto timestamp. */ + + /* Routine to decode the snapshot protobuf data and populate the class data. */ + bool decodeTamBufferStatsProto(SwitchBufferStats *bufferStats); +}; + +#endif /* _TAM_PROTO_BUFFER_STATS_DECODER_H_ */ diff --git a/tam-buffers/tam_redis_adapter.cpp b/tam-buffers/tam_redis_adapter.cpp new file mode 100644 index 0000000..8c0b69c --- /dev/null +++ b/tam-buffers/tam_redis_adapter.cpp @@ -0,0 +1,683 @@ +/* + * Copyright 2019 Broadcom Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "tam_redis_adapter.h" + +using namespace std; +using namespace swss; + +/* Constructor function for Redis interface. + * The redis interface is a placeholder for general routines + * used to convert data in REDIS DBs. + */ +TamRedisAdapter::TamRedisAdapter(DBConnector *appDb, DBConnector *stateDb, + DBConnector *counterDb, DBConnector *asicDb) : + m_queue_name_table(counterDb, COUNTERS_QUEUE_NAME_MAP), + m_queue_type_table(counterDb, COUNTERS_QUEUE_TYPE_MAP), + m_queue_port_table(counterDb, COUNTERS_QUEUE_PORT_MAP), + m_queue_map_table(counterDb, COUNTERS_QUEUE_MAP), + m_port_name_table(counterDb, COUNTERS_PORT_NAME_MAP), + m_pool_name_table(counterDb, COUNTERS_BUFFER_POOL_NAME_MAP), + m_pg_name_table(counterDb, COUNTERS_PG_NAME_MAP), + m_vid_rid_table(asicDb, "VIDTORID") +{ +// /* Initialize all system related maps */ +// init_rid_portname_map(); +// init_rid_queuenum_map(); +// init_rid_pgnum_map(); + + m_asicDb = asicDb; + + m_mapsInitialized = false; +} + +/* Validate if RID exists on the + * system. + */ +bool TamRedisAdapter::validateRID(string rid) +{ + Table tbl = Table(m_asicDb, "RIDTOVID"); + string val; + + tbl.hget("", rid, val); + if (val != "") + { + return true; + } + + return false; +} + + +/* Generate the RID to string maps for ports, PGs, queues. + * + */ +bool TamRedisAdapter::generateRedisOidMaps() +{ + bool rc = false; + + /* Initialize all system related maps */ + rc = init_rid_portname_map(); + if (rc != true) + { + SWSS_LOG_ERROR("Unable to generate RID-portname map."); + return false; + } + + rc = init_rid_queuenum_map(); + if (rc != true) + { + SWSS_LOG_ERROR("Unable to generate RID-queuenum map."); + return false; + } + + rc = init_rid_pgnum_map(); + if (rc != true) + { + SWSS_LOG_ERROR("Unable to generate RID-pgnum map."); + return false; + } + + rc = init_rid_vid_map(); + if (rc != true) + { + SWSS_LOG_ERROR("Unable to generate RID-VID map."); + return false; + } + + /*initialize buffer pool from VID to buffer pool name*/ + if(init_buffer_pool_map() != true) + { + /* buffer pools will be created only when user explicitly configures it. ignore this error */ + SWSS_LOG_INFO("Unable to create pool map,Buffer pools are not created"); + } + m_mapsInitialized = true; + SWSS_LOG_DEBUG("Initialized redis RID maps for proto processing."); + + return rc; +} + +/* Initialize RID (SAI object id) to port name map. + * + */ +bool TamRedisAdapter::init_rid_portname_map() +{ + SWSS_LOG_ENTER(); + vector vid_rid_values; + vector port_name_values; + + m_vid_rid_table.get("", vid_rid_values); + m_port_name_table.get("", port_name_values); + + if ((vid_rid_values.empty() == true) || + (port_name_values.empty() == true)) + { + SWSS_LOG_DEBUG("vid_rid_values size %d, port_name_values size %d", + vid_rid_values.size(), port_name_values.size()); + return false; + } + + /* Clear the MAP before inserting */ + m_rid_portname_map.clear(); + + for (auto fv: port_name_values) + { + /* Extract port name field. + * Port name field format - "Ethernet" + * Port name value format - "oid:0xAAAAAAAABBBBBBBB" + */ + string portStr = fvField(fv); + + if (portStr.empty()) + { + continue; + } + + /* Search through vid-rid table entries and get the rid + * corresponding to port name value(vid) and insert entry + * RID-Port map + */ + + for (auto fv1: vid_rid_values) + { + if (fvField(fv1) == fvValue(fv)) + { + string ridStr = fvValue(fv1); + if (!ridStr.empty()) + { + SWSS_LOG_DEBUG("Inserting entry in RID-Port with key %s and port %s", ridStr.c_str(), portStr.c_str()); + m_rid_portname_map.insert(pair(ridStr, portStr)); + break; + } + } + } + } + return true; +} + +/* Initialize RID (SAI object id) to queue num map. + * + */ +bool TamRedisAdapter::init_rid_queuenum_map() +{ + SWSS_LOG_ENTER(); + vector vid_rid_values; + vector queue_name_values; + vector queue_type_values; + + m_vid_rid_table.get("", vid_rid_values); + m_queue_name_table.get("", queue_name_values); + m_queue_type_table.get("", queue_type_values); + + if ((vid_rid_values.empty() == true) || + (queue_type_values.empty() == true) || + (queue_name_values.empty() == true)) + { + SWSS_LOG_DEBUG("vid_rid_values size %d, queue_name_values size %d", + vid_rid_values.size(), queue_name_values.size(), + queue_type_values.size()); + return false; + } + + /* Clear the MAP before inserting */ + m_rid_queuenum_map.clear(); + m_port_num_ucqueues_map.clear(); + + for (auto fv: queue_name_values) + { + /* Extract queue from queue name field. + * Queue name field format - "Ethernet:queue" + * Queue name value format - "oid:0xAAAAAAAABBBBBBBB" + */ + + int queue; + string queueStr = fvField(fv); + + /* Tokenize the string to extract the queue number. */ + vector fields = tokenize(queueStr, delimiter); + + /* fields[0] has "alias", fields[1] has the queue number. */ + if (!fields[1].empty()) + { + queue = stoi(fields[1]); + } + else + { + continue; + } + + /* Search through vid-rid table entries and get the rid + * corresponding to queue name value(vid) and insert entry + * RID-queue map + */ + + for (auto fv1: vid_rid_values) + { + if (fvField(fv1) == fvValue(fv)) + { + string ridStr = fvValue(fv1); + if (!ridStr.empty()) + { + SWSS_LOG_DEBUG("Inserting entry in RID-Queue with key %s and queue %d", ridStr.c_str(), queue); + m_rid_queuenum_map.insert(pair(ridStr, queue)); + break; + } + } + } + + /* Populate port-numUcQueues map. + * fields[0] has portname, fvValue(fv) has the queue OID. + * Find queue type and populate the map. + */ + if (fields[0].empty()) + { + continue; + } + + for (auto fv2: queue_type_values) + { + if (fvValue(fv) != fvField(fv2)) + { + /* Not the queue we are looking at. */ + continue; + } + + /* Check if queue is unicast. */ + if (fvValue(fv2) == "SAI_QUEUE_TYPE_UNICAST") + { + m_port_num_ucqueues_map[fields[0]]++; + break; + } + } + } + + return true; +} + +/* Initialize RID (SAI object id) to pg num map. + * + */ +bool TamRedisAdapter::init_rid_pgnum_map() +{ + SWSS_LOG_ENTER(); + vector vid_rid_values; + vector pg_name_values; + + m_vid_rid_table.get("", vid_rid_values); + m_pg_name_table.get("", pg_name_values); + + if ((vid_rid_values.empty() == true) || + (pg_name_values.empty() == true)) + { + SWSS_LOG_DEBUG("vid_rid_values size %d, pg_name_values size %d", + vid_rid_values.size(), pg_name_values.size()); + return false; + } + + /* Clear the MAP before inserting */ + m_rid_pgnum_map.clear(); + + for (auto fv: pg_name_values) + { + /* Extract queue from queue name field. + * PG name field format - "Ethernet:pg" + * PG name value format - "oid:0xAAAAAAAABBBBBBBB" + */ + + int pg; + string pgStr = fvField(fv); + + /* Tokenize the string to extract the queue number. */ + vector fields = tokenize(pgStr, delimiter); + + /* fields[0] has "alias", fields[1] has the queue number. */ + if (!fields[1].empty()) + { + pg = stoi(fields[1]); + } + else + { + continue; + } + + /* Search through vid-rid table entries and get the rid + * corresponding to priority group name value(vid) and + * insert entry RID-PG map + */ + + for (auto fv1: vid_rid_values) + { + if (fvField(fv1) == fvValue(fv)) + { + string ridStr = fvValue(fv1); + if (!ridStr.empty()) + { + SWSS_LOG_DEBUG("Inserting entry in RID-PG with key %s and pg %d", ridStr.c_str(), pg); + m_rid_pgnum_map.insert(pair(ridStr, pg)); + break; + } + } + } + } + return true; +} + +/* Initialize RID (SAI object id) to VID. + * + */ +bool TamRedisAdapter::init_rid_vid_map() +{ + SWSS_LOG_ENTER(); + vector vid_rid_values; + + m_vid_rid_table.get("", vid_rid_values); + + if (vid_rid_values.empty() == true) + { + SWSS_LOG_DEBUG("vid_rid_values size %d", + vid_rid_values.size()); + return false; + } + + /* Clear the MAP before inserting */ + m_rid_vid_map.clear(); + + /* Populate RID-VID map. */ + for (auto fv: vid_rid_values) + { + if (!fvField(fv).empty()) + { + SWSS_LOG_DEBUG("Inserting entry in RID-VID map with key %s and val %s", fvValue(fv).c_str(), fvField(fv).c_str()); + m_rid_vid_map.insert(pair(fvValue(fv), fvField(fv))); + } + } + + return true; +} + +/* Initialize buffer pool from VID to buffer pool name*/ +bool TamRedisAdapter::init_buffer_pool_map() +{ + SWSS_LOG_ENTER(); + vector pool_name_values; + + m_pool_name_table.get("", pool_name_values); + + if (pool_name_values.empty() == true) + { + SWSS_LOG_DEBUG("pool_name_values size %d", + pool_name_values.size()); + return false; + } + + /* Clear the MAP before inserting */ + m_pool_name_map.clear(); + + for (auto fv: pool_name_values) + { + if(!fvField(fv).empty()) + { + m_pool_name_map.insert(pair(fvField(fv), fvValue(fv))); + } + } + + return true; +} +/* Get the port name given an RID string. + * + */ +bool TamRedisAdapter::getPortNameFromRid(string rid, string *portName) +{ + SWSS_LOG_ENTER(); + ridPortNameMap::iterator it; + + it = m_rid_portname_map.find(rid); + if (it != m_rid_portname_map.end()) + { + *portName = it->second; + SWSS_LOG_DEBUG("Found port name %s corresponding to RID %s", portName->c_str(), rid.c_str()); + return true; + } + + SWSS_LOG_DEBUG("Port number corresponding to RID %s does not exist", rid.c_str()); + return false; +} + +/* Get the queue num given an RID string. + * + */ +bool TamRedisAdapter::getQueueNumFromRid(string rid, int *queueNum) +{ + SWSS_LOG_ENTER(); + ridQueueNumMap::iterator it; + + it = m_rid_queuenum_map.find(rid); + if (it != m_rid_queuenum_map.end()) + { + *queueNum = it->second; + SWSS_LOG_DEBUG("Found queue number %d corresponding to RID %s", *queueNum, rid.c_str()); + return true; + } + + SWSS_LOG_DEBUG("Queue number corresponding to RID %s does not exist", rid.c_str()); + return false; +} + +/* Get the priority-group num given an RID string. + * + */ +bool TamRedisAdapter::getPgNumFromRid(string rid, int *pgNum) +{ + SWSS_LOG_ENTER(); + ridPriorityGroupNumMap::iterator it; + + it = m_rid_pgnum_map.find(rid); + if (it != m_rid_pgnum_map.end()) + { + *pgNum = it->second; + SWSS_LOG_DEBUG("Found priority group %d corresponding to RID %s", *pgNum, rid.c_str()); + return true; + } + + SWSS_LOG_DEBUG("Priority group number corresponding to RID %s does not exist", rid.c_str()); + return false; +} + +/* Get the VID for an object RID. + * + */ +bool TamRedisAdapter::getVidFromRid(string rid, string *vid) +{ + SWSS_LOG_ENTER(); + ridVidMap::iterator it; + + it = m_rid_vid_map.find(rid); + if (it != m_rid_vid_map.end()) + { + *vid = it->second; + SWSS_LOG_DEBUG("Found VID %s corresponding to RID %s", vid->c_str(), rid.c_str()); + return true; + } + + SWSS_LOG_DEBUG("VID entry corresponding to RID %s not found.", rid.c_str()); + return false; +} + +/* Validate RID and see if we need to update maps. + * If so, update and re-attempt converting RID to VID. + */ +bool TamRedisAdapter::updateGetVidFromRid(string rid, string *vid) +{ + ridVidMap::iterator it; + + /* This API is called by caller when we couldn't find a mapping + * for the given RID. Validate if such an RID exists in the + * system. If it does, our maps may be stale. Refresh them. + */ + if (validateRID(rid) == true) + { + SWSS_LOG_NOTICE("Updating snapshot maps, RID %s found in system but not in local map.", rid.c_str()); + if (generateRedisOidMaps() != true) + { + SWSS_LOG_NOTICE("Unable to generate Redis maps, will re-attempt on next invocation."); + return false; + } + + /* Maps are re-generated. Attempt to find RID now in local map. */ + it = m_rid_vid_map.find(rid); + if (it != m_rid_vid_map.end()) + { + *vid = it->second; + SWSS_LOG_DEBUG("Found VID %s corresponding to RID %s", vid->c_str(), rid.c_str()); + return true; + } + + /* If we got here, there is an issue with our map creation technique. + * Log an error mentioning, a restart of snapshotmgr may be required to recover. + */ + SWSS_LOG_ERROR("Erroneous map generated on refresh, restart snapshotmgr to recover."); + } + else + { + SWSS_LOG_NOTICE("Received an erroneous RID in protobuf. No mapping for RID %s found on system.", rid.c_str()); + } + + SWSS_LOG_DEBUG("VID entry corresponding to RID %s not found.", rid.c_str()); + return false; +} + +/* Get the priority-group num given an RID string. + * + */ +bool TamRedisAdapter::getNumUcQueues(string if_name, int *numUcQueues) +{ + SWSS_LOG_ENTER(); + portNumUcQueuesMap::iterator it; + + it = m_port_num_ucqueues_map.find(if_name); + if (it != m_port_num_ucqueues_map.end()) + { + *numUcQueues = it->second; + SWSS_LOG_DEBUG("Found numUcQueues %d corresponding to port %s", *numUcQueues, if_name.c_str()); + return true; + } + + SWSS_LOG_DEBUG("numUcQueues for port %s does not exist", if_name.c_str()); + return false; +} + +/* Check if maps are generated. + * + */ +bool TamRedisAdapter::getMapsInitialized() +{ + return m_mapsInitialized; +} + +/* Check if maps are generated. + * + */ +bool TamRedisAdapter::getSystemMapReady() +{ + bool systemReady = false; + string mapDone = "false"; + + m_queue_map_table.hget("queuMapInitDone", "Done", mapDone); + + if(mapDone == "true") + { + systemReady = true; + } + + SWSS_LOG_DEBUG("System Ready is %d", systemReady); + return systemReady; +} + +/* Convert uint64_t sai object to an OID string + * for REDIS_DB format. + */ +string TamRedisAdapter::serializeOid(uint64_t oid) +{ + char buf[32]; + + snprintf(buf, sizeof(buf), "oid:0x%lx", oid); + + return buf; +} + +/* Get list of all PG OIDs(VID) in + * system. + */ +vector TamRedisAdapter::getPgOidList() +{ + vector pgList; + vector pg_name_values; + + m_pg_name_table.get("", pg_name_values); + + for (auto fv: pg_name_values) + { + /* fvValue(fv) returns the PG VID. + * Simply insert into pgList. + */ + pgList.push_back(fvValue(fv)); + } + + return pgList; +} + +/* Get list of all queue OIDs(VID) in + * system. + */ +vector TamRedisAdapter::getQueueOidList() +{ + vector queueList; + vector queue_name_values; + + m_queue_name_table.get("", queue_name_values); + + for (auto fv: queue_name_values) + { + /* fvValue(fv) returns the Queue VID. + * Simply insert into queueList. + */ + queueList.push_back(fvValue(fv)); + } + + return queueList; +} + +/* Get pool list OIDs(VID) in + * system. + */ +vector TamRedisAdapter::getPoolOidList() +{ + vector poolList; + vector pool_name_values; + + m_pool_name_table.get("", pool_name_values); + + for (auto fv: pool_name_values) + { + /* fvValue(fv) returns the Pool VID. + * Simply insert into poolList. + */ + poolList.push_back(fvValue(fv)); + } + + return poolList; +} + +/* Get pool name map in + * system. + */ +bool TamRedisAdapter::getBufferPoolName(string vid, string *buffer_name) +{ + SWSS_LOG_ENTER(); + bufferPoolNameMap::iterator it; + + it = m_pool_name_map.begin(); + for (;it != m_pool_name_map.end();it++) + { + if(it->second == vid) + { + *buffer_name = it->first; + return true; + } + } + return false; +} diff --git a/tam-buffers/tam_redis_adapter.h b/tam-buffers/tam_redis_adapter.h new file mode 100644 index 0000000..ed2b295 --- /dev/null +++ b/tam-buffers/tam_redis_adapter.h @@ -0,0 +1,145 @@ +/* + * Copyright 2019 Broadcom Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TAM_REDIS_ADAPTER_H +#define _TAM_REDIS_ADAPTER_H + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +const char delimiter = ':'; + +using namespace std; + +namespace swss { + +/* RID-Port Map: Key - RID, Data - Port name*/ +typedef map ridPortNameMap; + +/* RID-Queue Map: Key - RID, Data - Queue number */ +typedef map ridQueueNumMap; + +/* RID-PG Map: Key - RID, Data - priority group number */ +typedef map ridPriorityGroupNumMap; + +/* Portname-numUcQueues Map: Key - Port name, Data - numUcQueues */ +typedef map portNumUcQueuesMap; + +/* Cache the RIDTOVID table since this is referenced a lot for snapshot. */ +/* RID-VID Nap: Key - Object RID, Data - Object VID */ +typedef map ridVidMap; + +typedef map bufferPoolNameMap; + +class TamRedisAdapter +{ +public: + TamRedisAdapter(DBConnector *appDb, DBConnector *stateDb, DBConnector *counterDb, DBConnector *asicDb); + ~TamRedisAdapter() + { + } + + /* API to get port name corresponding to RID */ + bool getPortNameFromRid(string rid, string *portName); + + /* API to get queue number corresponding to RID */ + bool getQueueNumFromRid(string rid, int *queueNum); + + /* API to get priority group corresponding to RID */ + bool getPgNumFromRid(string rid, int *pgNum); + + /* API to get VID for RID of an object. */ + bool getVidFromRid(string rid, string *vid); + + /* API to validate RID and update maps if needed. */ + bool updateGetVidFromRid(string rid, string *vid); + + /* API to get buffer pool name from VID. */ + bool getBufferPoolName(string vid, string *buffer_name); + + /* API to get numUcQueues corresponding to portName */ + bool getNumUcQueues(string port, int *numUcQueues); + + /* API to generate PG, queue and port OID maps */ + bool generateRedisOidMaps(); + + /* API to find if maps are initialized. Return boolean. */ + bool getMapsInitialized(); + + /* API to find if all system maps are ready. */ + bool getSystemMapReady(); + + /* API to get list of all PGs. */ + vector getPgOidList(); + + /* API to get list of all queues. */ + vector getQueueOidList(); + + /* API to get list of all pools. */ + vector getPoolOidList(); + + /* API to generate the OID string from uint64_t SAI object ID */ + string serializeOid(uint64_t oid); + +private: + Table m_queue_name_table; /* Table to store queue name entries */ + Table m_queue_type_table; /* Table to store queue type entries */ + Table m_queue_port_table; /* Table to store queue port entries */ + Table m_queue_map_table; /* Table to store queue map entries */ + Table m_port_name_table; /* Table to store port name entries */ + Table m_pg_name_table; /* Table to store priority group name entries */ + Table m_vid_rid_table; /* Table to store VID RID entries */ + Table m_pool_name_table; /* Table to store buffer pool entries */ + + ridPortNameMap m_rid_portname_map; /* RID-Port Map */ + ridQueueNumMap m_rid_queuenum_map; /* RID-Queue Map */ + ridPriorityGroupNumMap m_rid_pgnum_map; /* RID- PG Map */ + ridVidMap m_rid_vid_map; /* RID-VID Map */ + bufferPoolNameMap m_pool_name_map; /* VID-POOL name map*/ + portNumUcQueuesMap m_port_num_ucqueues_map; /* Port-NumUcQueues Map */ + bool m_mapsInitialized; /* Are RID maps initialized. */ + DBConnector *m_asicDb; /* ASIC DB Connector */ + + /* Initializes RID-Port map */ + bool init_rid_portname_map(); + + /* Initializes RID-Queue map */ + bool init_rid_queuenum_map(); + + /* Initializes RID-PG map */ + bool init_rid_pgnum_map(); + + /* Initializes RID-VID map */ + bool init_rid_vid_map(); + + /* Initializes VID-POOL name map */ + bool init_buffer_pool_map(); + + /* Validate RID from DB */ + bool validateRID(string rid); +}; + +} + +#endif /* _TAM_REDIS_ADAPTER_H */ diff --git a/tam-buffers/tam_socket_interface.cpp b/tam-buffers/tam_socket_interface.cpp new file mode 100644 index 0000000..0d17946 --- /dev/null +++ b/tam-buffers/tam_socket_interface.cpp @@ -0,0 +1,133 @@ +/* + * Copyright 2019 Broadcom Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include "tam_socket_interface.h" + +using namespace std; + +TamSocketInterface::TamSocketInterface(uint16_t port, uint32_t saddr) : + m_bufSize(TAM_MAX_MSG_LEN), + m_messageBuffer(NULL), + m_recv_timeout(TAM_SOCKET_TIMEOUT) +{ + struct sockaddr_in addr; + int val = 1; + + /* Create a UDP socket. */ + m_tam_socket = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (m_tam_socket < 0) + throw system_error(errno, system_category()); + + if (setsockopt(m_tam_socket, SOL_SOCKET, SO_REUSEADDR, &val, + sizeof(val)) < 0) + { + close(m_tam_socket); + throw system_error(errno, system_category()); + } + + /* Bind socket to server address. */ + memset (&addr, 0, sizeof (addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = htonl(saddr); + + if (bind(m_tam_socket, (struct sockaddr *)&addr, sizeof(addr)) < 0) + { + close(m_tam_socket); + throw system_error(errno, system_category()); + } + + m_tam_socket_up = true; + m_messageBuffer = new char[m_bufSize]; +} + +TamSocketInterface::~TamSocketInterface() +{ + m_tam_socket_up = false; + + /* Close the socket and return. */ + delete m_messageBuffer; + close(m_tam_socket); +} + +int TamSocketInterface::getFd() +{ + return m_tam_socket; +} + +/* Setup select to timeout if data is not + * available on socket after m_recv_timeout. + * Return false on timeout or select error. + */ +bool TamSocketInterface::fdSelect() +{ + struct timeval timeout; + fd_set readfds; + int maxFd; + + /* Use select to timeout after pre-configured timeout. */ + timeout.tv_sec = m_recv_timeout; + timeout.tv_usec = 0; + FD_ZERO(&readfds); + FD_SET(m_tam_socket, &readfds); + maxFd = m_tam_socket + 1; + + /* Wait on data. */ + if (select(maxFd, &readfds, NULL, + NULL, &timeout) < 0) + { + return false; + } + + if (!FD_ISSET(m_tam_socket, &readfds)) + { + return false; + } + + return true; +} + +/* Receive processing thread. Read data from socket. + * Returns size of data read. + */ +ssize_t TamSocketInterface::readData() +{ + socklen_t len; + ssize_t dataRead; + struct sockaddr_in cliaddr; + + /* Clean up buffer. */ + memset(m_messageBuffer, 0, m_bufSize); + + /* At this point, data should be available on socket. + * Read the data. + */ + len = sizeof(cliaddr); + dataRead = recvfrom(m_tam_socket, m_messageBuffer, m_bufSize, + MSG_WAITALL, (struct sockaddr *)&cliaddr, &len); + if (dataRead <= 0) + { + SWSS_LOG_ERROR("recvfrom returned %d on the protobuf socket.", dataRead); + return -1; + } + + return dataRead; +} diff --git a/tam-buffers/tam_socket_interface.h b/tam-buffers/tam_socket_interface.h new file mode 100644 index 0000000..7364551 --- /dev/null +++ b/tam-buffers/tam_socket_interface.h @@ -0,0 +1,95 @@ +/* + * Copyright 2019 Broadcom Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _TAM_SOCKET_INTERFACE_H +#define _TAM_SOCKET_INTERFACE__H + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +/* Maximum size of proto buffer */ +#define TAM_MAX_MSG_LEN 50000 +#define TAM_SOCKET_TIMEOUT 3 /* 3 seconds */ + +class TamSocketInterface +{ +public: + TamSocketInterface(uint16_t port, uint32_t saddr); + ~TamSocketInterface(); + + /* Raw data arriving from socket. */ + char *m_messageBuffer; + + /* Flag to check if socket is up. */ + bool m_tam_socket_up; + + /* Read data from socket and save in buffer; */ + bool fdSelect(); + + /* Get socket descriptor. */ + int getFd(); + + /* Read from socket. */ + ssize_t readData(); + +private: + unsigned int m_bufSize; + int m_tam_socket; + int m_recv_timeout; + +}; + +class SelectableFd : + public swss::Selectable +{ + public: + SelectableFd(int fd) + { + SWSS_LOG_ENTER(); + + m_fd = fd; + } + + int getFd() override + { + SWSS_LOG_ENTER(); + + return m_fd; + } + + uint64_t readData() override + { + SWSS_LOG_ENTER(); + + // empty + return 0; + } + + private: + + int m_fd; +}; + +#endif /* _TAM_SOCKET_INTERFACE_H */ diff --git a/thresholdmgr/Makefile.am b/thresholdmgr/Makefile.am new file mode 100644 index 0000000..6d50ce8 --- /dev/null +++ b/thresholdmgr/Makefile.am @@ -0,0 +1,20 @@ +INCLUDES = -I $(top_srcdir) -I $(top_srcdir)/tam-buffers -I $(top_srcdir)/proto +CFLAGS_SWSS = -I /usr/include/swss + +bin_PROGRAMS = thresholdmgr test_proto + +if DEBUG +DBGFLAGS = -ggdb -DDEBUG +else +DBGFLAGS = -g +endif + +thresholdmgr_SOURCES = thresholdmgr_main.cpp thresholdmgr.cpp $(top_srcdir)/proto/sai_tam_buffer_stats.pb.cc $(top_srcdir)/proto/sai_tam_event_threshold_breach.pb.cc $(top_srcdir)/proto/sai_tam_main.pb.cc $(top_srcdir)/proto/sai_tam_event.pb.cc $(top_srcdir)/proto/sai_tam_event_flow_learning.pb.cc +thresholdmgr_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SWSS) $(COV_CFLAGS) $(ASAN_CFLAGS) +thresholdmgr_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SWSS) $(COV_CFLAGS) $(ASAN_CFLAGS) +thresholdmgr_LDADD = -lprotobuf -lswsscommon -lnl-3 -lnl-route-3 -lhiredis $(top_srcdir)/tam-buffers/libtambuffers.a $(COV_LDFLAGS) $(ASAN_LDFLAGS) + +test_proto_SOURCES = test_proto.cpp $(top_srcdir)/proto/sai_tam_buffer_stats.pb.cc $(top_srcdir)/proto/sai_tam_event_threshold_breach.pb.cc $(top_srcdir)/proto/sai_tam_main.pb.cc $(top_srcdir)/proto/sai_tam_event.pb.cc $(top_srcdir)/proto/sai_tam_event_flow_learning.pb.cc +test_proto_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SWSS) +test_proto_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SWSS) +test_proto_LDADD = -lprotobuf diff --git a/thresholdmgr/test_proto.cpp b/thresholdmgr/test_proto.cpp new file mode 100644 index 0000000..414d2d1 --- /dev/null +++ b/thresholdmgr/test_proto.cpp @@ -0,0 +1,248 @@ +#include +#include +#include +#include + +#include +#include +#include + +#include "sai_tam_event.pb.h" +#include "sai_tam_buffer_stats.pb.h" +#include "sai_tam_event_threshold_breach.pb.h" + +#define TEST_PROTO_ARG_TYPE 1 +#define TEST_PROTO_ARG_PRID 2 +#define TEST_PROTO_ARG_PGQRID 3 +#define TEST_PROTO_ARG_COUNT 4 +#define TEST_PROTO_ARG_STATP 5 +#define TEST_PROTO_ARG_STATB 6 +#define TEST_PROTO_ARG_STATERRTYPE 7 + +using namespace std; + +/* Command line arguments + * + * Mandatory arguments: + * argv[1] - type - posible values are "pgshared, pgheadroom, qunicast, qmulticast" + * argv[2] - port_rid - RID of port (long int) + * argv[3] - pg/q rid - RID of pg/queue (long int) + * argv[4] - count - Number of times protobuf is to be sent + * + * Optional arguments: + * argv[5] - statpercent - Stat value in percentage (int) (Bufferstats proto is sent) + * argv[6] - stat - Stat value in bytes (long int) (Bufferstats proto is also sent) + * argv[7] - staterrtype - Type of erroneous buffer stat proto to send. Possible + * values are "porterr, buffererr" + */ +int main (int argc, char* argv[]) +{ + Event msg; + ThresholdBreach *event = msg.mutable_threshold_event(); + ThresholdSource *source = event->mutable_breach_source(); + struct sockaddr_in servaddr, cliaddr; + int sockfd; + int size, i =10, statp; + uint64_t port_oid = 0, statb; + uint64_t buffer_oid = 0; + char buffer[5000]; + InterfaceBufferStatistics *intfstats; + BufferStatistics *bufstats; + string errstat; + + //Check args, should be atleast 5 + if (argc < 5) + { + cout << "Invalid argument list passed."; + } + + for (i = 0; i < argc; i++) + { + cout<set_type(THRESHOLD_BREACH_AT_IPG); + if (btype == "pgshared") + { + source->set_ipg_type(IPG_SHARED); + } + else + { + source->set_ipg_type(IPG_XOFF); + } + source->set_ipg_oid(buffer_oid); + } + else if ((btype == "qunicast") || + (btype == "qmulticast")) + { + source->set_type(THRESHOLD_BREACH_AT_QUEUE); + if (btype == "qunicast") + { + source->set_queue_type(QUEUE_UNICAST); + } + else + { + source->set_queue_type(QUEUE_MULTICAST); + } + source->set_queue_oid(buffer_oid); + } + else + { + cout << "Invalid breach type argument provided."; + cout << "Valid values are pgshared, pgheadroom, qunicast, qmulticast"; + } + + /* Get port oid and set it. */ + port_oid = stol(argv[TEST_PROTO_ARG_PRID]); + /* Fill up the event data */ + source->set_port_oid(port_oid); + if (argc > 5) + { + /* Buffer stats to be added */ + statp = stoi(argv[TEST_PROTO_ARG_STATP]); + statb = stol(argv[TEST_PROTO_ARG_STATB]); + + if (argc == 8) + { + /* errneous report to be sent */ + errstat = argv[TEST_PROTO_ARG_STATERRTYPE]; + } + + intfstats = event->mutable_buffer_stats()->add_intf_buffer_stats(); + if (errstat == "porterr") + { + /* Send erroneous port oid. */ + intfstats->set_port_oid(0x000001000); + } + else + { + intfstats->set_port_oid(port_oid); + } + + if (btype == "pgshared" || btype == "pgheadroom") + { + /* PG stats */ + IPGStatistics *ipgstats = intfstats->add_ipg_stats(); + + if (errstat == "buffererr") + { + /* Send erroneous pg oid. */ + ipgstats->set_ipg_oid(0x0000002); + } + else + { + ipgstats->set_ipg_oid(buffer_oid); + } + + if (btype == "pgshared") + { + ipgstats->set_ipg_type(IPG_SHARED); + } + else + { + ipgstats->set_ipg_type(IPG_XOFF); + } + /* Set stat value. */ + bufstats = ipgstats->mutable_stats(); + bufstats->set_peak_buffer_occupancy_bytes(statb); + bufstats->set_peak_buffer_occupancy_percent(statp); + } + else + { + /* Queue buffer stats. */ + /* queue stats */ + QueueStatistics *queuestats = intfstats->add_queue_stats(); + + if (errstat == "buffererr") + { + /* Send erroneous pg oid. */ + queuestats->set_queue_oid(0x0000002); + } + else + { + queuestats->set_queue_oid(buffer_oid); + } + + if (btype == "qunicast") + { + queuestats->set_queue_type(QUEUE_UNICAST); + } + else + { + queuestats->set_queue_type(QUEUE_MULTICAST); + } + + /* Set stat value. */ + bufstats = queuestats->mutable_stats(); + bufstats->set_peak_buffer_occupancy_bytes(statb); + bufstats->set_peak_buffer_occupancy_percent(statp); + } + } + + cout <DebugString(); + + // Creating socket file descriptor + if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) { + cout << "socket creation failed"; + return -1; + } + + memset(&servaddr, 0, sizeof(servaddr)); + memset(&cliaddr, 0, sizeof(cliaddr)); + + // Filling server information + servaddr.sin_family = AF_INET; // IPv4 + servaddr.sin_addr.s_addr = INADDR_ANY; + servaddr.sin_port = htons(9072); + + //Client info + cliaddr.sin_family = AF_INET; // IPv4 + cliaddr.sin_addr.s_addr = INADDR_ANY; + cliaddr.sin_port = htons(9171); + + + // Bind the socket with the server address + if ( bind(sockfd, (const struct sockaddr *)&servaddr, + sizeof(servaddr)) < 0 ) + { + cout <<"bind failed"; + return -1; + } + + /* Set timestamp */ + msg.set_timestamp(std::time(nullptr)); + size = msg.ByteSize(); + + /* Serialize protobuf to array */ + msg.SerializeToArray(buffer, size); + + i = stoi(argv[TEST_PROTO_ARG_COUNT]); + + /* Setup a socket and send */ + while (i-- != 0) + { + sendto(sockfd, (char *)buffer, size, MSG_CONFIRM, + (const struct sockaddr *)&cliaddr, sizeof(cliaddr)); + +// cout <<"Protobuf message sent. %d" << size; +// sleep (1); + } + + cout << i; + + /* Clean-up */ + google::protobuf::ShutdownProtobufLibrary(); + + return 0; +} diff --git a/thresholdmgr/thresholdmgr.cpp b/thresholdmgr/thresholdmgr.cpp new file mode 100644 index 0000000..5bdd8de --- /dev/null +++ b/thresholdmgr/thresholdmgr.cpp @@ -0,0 +1,806 @@ +/* + * Copyright 2019 Broadcom Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "thresholdmgr.h" + +ThresholdMgr::ThresholdMgr(DBConnector *appDb, DBConnector *stateDb, + DBConnector *counterDb, DBConnector *asicDb) : + m_counterTable(counterDb, COUNTERS_THRESHOLD_BREACH_TABLE), + m_counterStatTable(new RedisPipeline(counterDb, 16000), COUNTERS_TABLE, true), + m_redisAdapter(appDb, stateDb, counterDb, asicDb) +{ + uint16_t port = THRESHOLD_MGR_BREACH_EVENT_PORT; + uint32_t addr = INADDR_LOOPBACK; + + /* Initialize the socket interface. */ + m_protoInterface = unique_ptr + (new TamSocketInterface(port, addr)); + + /* Setup plugins to be run on COUNTERS_DB. */ + string pgLuaScript = loadLuaScript(PG_WATERMARK_PLUGIN_NAME); + string queueLuaScript = loadLuaScript(QUEUE_WATERMARK_PLUGIN_NAME); + string poolLuaScript = loadLuaScript(BUFFER_POOL_WATERMARK_PLUGIN_NAME); + + m_pgWmSha = loadRedisScript(counterDb, pgLuaScript); + m_queueWmSha = loadRedisScript(counterDb, queueLuaScript); + m_poolWmSha = loadRedisScript(counterDb, poolLuaScript); + m_runAllPlugins = false; + +} + +int ThresholdMgr::generateBreachEventIndex() +{ + int lastEventid = 0; + string key ("event-id"); + vector fvs; + + /* Read COUNTERS_DB THRESHOLD_BREACH_ENTRY + * for getting the last written event-id. + */ + m_counterTable.get(key, fvs); + if (fvs.empty() != true) + { + /* Pick last event-id and start from there. */ + /* We have only 1 entry in this table. */ + for (auto fv: fvs) + { + if (fvField(fv) == "id") + { + lastEventid = stoi(fvValue(fv)); + } + } + } + + return lastEventid; +} + + +bool ThresholdMgr::getBufferStat(SwitchBufferStats bufferStats, uint64_t if_oid, + string buffer_type, uint64_t buffer_type_oid, + uint64_t *stat, uint64_t *statPercent) +{ + TamProtoBufferStatsDecoder stats(&bufferStats); + statData data = std::make_pair(0, 0); + + /* Check if buffer stats proto was decoded successfully. */ + if (stats.getProtoDecoded() != true) + { + SWSS_LOG_ERROR("Unable to parse/decode buffer stats proto report."); + return false; + } + + /* Retrieve stats according to the buffer */ + if ((buffer_type == "shared") || (buffer_type == "headroom")) + { + /* Get priority-group stats. */ + bufferStatsPg pgStats; + IPGType type; + + if (buffer_type == "shared") + { + type = IPG_SHARED; + } + else + { + type = IPG_XOFF; + } + + pgStats = stats.getPriorityGroupBufferStats(); + pgType key = std::make_pair(buffer_type_oid, type); + bufferStatsPg::iterator it; + + /* Find the entry. If not found, return false. */ + it = pgStats.find(key); + if (it == pgStats.end()) + { + SWSS_LOG_ERROR("PG 0x%x not found in buffer stats report", buffer_type_oid); + return false; + } + data = it->second; + } + else if ((buffer_type == "unicast" || buffer_type == "multicast")) + { + /* Get queue stats. */ + bufferStatsQueue queueStats; + QueueType type; + + if (buffer_type == "unicast") + { + type = QUEUE_UNICAST; + } + else + { + type = QUEUE_MULTICAST; + } + + queueStats = stats.getQueueBufferStats(); + queueType key = std::make_pair(buffer_type_oid, type); + bufferStatsQueue::iterator it; + + /* Find the entry. If not found, return false. */ + it = queueStats.find(key); + if (it == queueStats.end()) + { + SWSS_LOG_ERROR("Queue 0x%x not found in buffer stats report", buffer_type_oid); + return false; + } + data = it->second; + } + else if ((buffer_type == "ingress") || (buffer_type == "egress")) + { + /* Get queue stats. */ + bufferStatsGlobalPool poolStats; + BufferPoolType type; + + if (buffer_type == "ingress") + { + type = BUFFERPOOL_INGRESS; + } + else + { + type = BUFFERPOOL_EGRESS; + } + + poolStats = stats.getBufferPoolBufferStats(); + poolType key = std::make_pair(buffer_type_oid, type); + bufferStatsGlobalPool::iterator it; + + /* Find the entry. If not found, return false. */ + it = poolStats.find(key); + if (it == poolStats.end()) + { + SWSS_LOG_ERROR("Pool 0x%x not found in buffer stats report", buffer_type_oid); + return false; + } + data = it->second; + } + + *stat = data.first; + *statPercent = data.second; + + return true; +} + +bool ThresholdMgr::processThresholdBreachData(char *protoBuffer, int len) +{ + ThresholdBreach thresBreachEvent; + ThresholdSource breachSource; + SwitchBufferStats bufferStats; + Event event; + string if_name, timestamp, oid_string; + uint64_t if_oid = 0, buffer_type_oid = 0; + uint64_t stat = 0, statPercent = 0; + time_t time; + string buffer, buffer_type; + int index = 0; + char tmp[64]; + + if (protoBuffer == NULL) + { + return false; + } + + /* Convert the message buffer to protobuf message. + * Serialize APIs convert the message buffer to + * protobuf format. m_messageBuffer has the array. + * len needs to be the exact size of protobuf data sent. + * Errors could be seen if len is not accurate. + */ + if (event.ParseFromArray(protoBuffer, len) != true) + { + SWSS_LOG_ERROR( "Unable to parse buffer to event proto format."); + return false; + } + + /* Get the timestamp and format it. */ + memset(tmp, 0, sizeof(tmp)); + time = static_cast (event.timestamp()); + size_t size = strftime(tmp, 32 ,"%Y-%m-%d.%T", localtime(&time)); + if (size != 0) + { + timestamp = string(tmp); + } + else + { + SWSS_LOG_DEBUG("Generating software timestamp, proto timestamp is invalid."); + /* Generate SW timestamp. */ + timestamp = getTimestamp(); + } + + thresBreachEvent = event.threshold_event(); + + SWSS_LOG_DEBUG( "Successfully decoded threshold proto of length %d", len); + SWSS_LOG_DEBUG("%s", thresBreachEvent.DebugString().c_str()); + + breachSource = thresBreachEvent.breach_source(); + + + /* Check if threshold breach proto has buffer stats. */ + if (thresBreachEvent.has_buffer_stats()) + { + /* Decode buffer statistics. */ + bufferStats = thresBreachEvent.buffer_stats(); + } + + /* Check if system map is ready. */ + if (m_redisAdapter.getSystemMapReady() != true) + { + SWSS_LOG_NOTICE("System Maps are not ready. Ignore processing protobufs."); + return false; + } + + /* Check if port, queue and pg RID maps are generated. + * Generate them if not generated yet. + */ + if (m_redisAdapter.getMapsInitialized() != true) + { + if (m_redisAdapter.generateRedisOidMaps() != true) + { + SWSS_LOG_ERROR("Aborting protobuf processing, couldn't generate RID maps"); + return false; + } + } + vector buffer_pool = m_redisAdapter.getPoolOidList(); + + /* thresBreachEvent now has the breach event data. */ + /* Get port */ + if (breachSource.has_port_oid() == true) + { + if_oid = breachSource.port_oid(); + oid_string = m_redisAdapter.serializeOid(if_oid); + if (m_redisAdapter.getPortNameFromRid(oid_string, &if_name) + != true) + { + SWSS_LOG_ERROR( "Invalid proto message port oid %lld.", if_oid); + return false; + } + } + + /* Get breach type. */ + switch (breachSource.type()) + { + case ThresholdBreachSourceType::THRESHOLD_BREACH_AT_QUEUE: + buffer = "queue"; + /* UCQ/MCQ/CPUQ trigger */ + if (breachSource.has_queue_type() == true) + { + if (breachSource.queue_type() == QueueType::QUEUE_UNICAST) + { + buffer_type = "unicast"; + } + else if (breachSource.queue_type() == QueueType::QUEUE_MULTICAST) + { + buffer_type = "multicast"; + } + } + + /* queue number - index */ + if (breachSource.has_queue_oid() == true) + { + buffer_type_oid = breachSource.queue_oid(); + oid_string = m_redisAdapter.serializeOid(buffer_type_oid); + if (m_redisAdapter.getQueueNumFromRid(oid_string, &index) + != true) + { + SWSS_LOG_ERROR( "Invalid proto message queue oid %lld.", buffer_type_oid); + return false; + } + } + + /* For multicast queues, we show relative index. + * mc index = index - numUcQueues. + */ + if (breachSource.queue_type() == QueueType::QUEUE_MULTICAST) + { + if(if_name != "CPU") + { + int numUcQueues; + if (m_redisAdapter.getNumUcQueues(if_name, &numUcQueues) != true) + { + SWSS_LOG_ERROR("Unable to get numUcQueues for port %s", if_name.c_str()); + return false; + } + + SWSS_LOG_DEBUG("numUcQueues %d for port %s", numUcQueues, if_name.c_str()); + index = index - numUcQueues; + } + } + + + break; + + case ThresholdBreachSourceType::THRESHOLD_BREACH_AT_IPG: + /* priority-group trigger */ + buffer = "priority-group"; + + switch (breachSource.ipg_type()) + { + case IPGType::IPG_SHARED: + buffer_type = "shared"; + break; + + case IPGType::IPG_XOFF: + buffer_type = "headroom"; + break; + + default: + break; + } + + /* PG index */ + if (breachSource.has_ipg_oid() == true) + { + buffer_type_oid = breachSource.ipg_oid(); + oid_string = m_redisAdapter.serializeOid(buffer_type_oid); + if (m_redisAdapter.getPgNumFromRid(oid_string, &index) + != true) + { + SWSS_LOG_ERROR( "Invalid proto message pg oid %lld.", buffer_type_oid); + return false; + } + } + break; + + case ThresholdBreachSourceType::THRESHOLD_BREACH_AT_GLOBAL_POOLS: + switch (breachSource.pool_type()) + { + case BufferPoolType::BUFFERPOOL_INGRESS: + buffer_type = "ingress"; + break; + + case BufferPoolType::BUFFERPOOL_EGRESS: + buffer_type = "egress"; + break; + + default: + break; + } + + /* pool oid */ + if (breachSource.has_pool_oid() == true) + { + buffer_type_oid = breachSource.pool_oid(); + oid_string = m_redisAdapter.serializeOid(buffer_type_oid); + string vid; + + if (m_redisAdapter.getVidFromRid(oid_string, &vid) + != true) + { + if(!(buffer_pool.empty())) + { + SWSS_LOG_ERROR( "Invalid buffer pool OID %lld in report, no corresponding VID found", buffer_type_oid); + return false; + } + } + /*Get buffer pool name for corresponding VID*/ + if(m_redisAdapter.getBufferPoolName(vid, &buffer) != true) + { + SWSS_LOG_ERROR( "Invalid buffer pool VID %lld in report, no corresponding Buffer pool name found", buffer_type_oid); + return false; + } + } + + default: + break; + } + + if (thresBreachEvent.has_buffer_stats()) + { + /* Get the buffer statistics for breach source. */ + if (getBufferStat(bufferStats, if_oid, buffer_type, buffer_type_oid, + &stat, &statPercent) != true) + { + SWSS_LOG_ERROR("Invalid buffer stats report received for breach event %s oid:%lld", + thresSaiCounter.at(buffer_type).c_str(), buffer_type_oid); + return false; + } + } + + + SWSS_LOG_DEBUG("Successfully processed breach event protobuf."); + SWSS_LOG_DEBUG("Breach event buffer: %s, type: %s, port %s.", buffer.c_str(), buffer_type.c_str(), if_name.c_str()); + SWSS_LOG_DEBUG("index: %d, breach_value: %d, %s: %lld, time-stamp %s.", index, statPercent, + thresSaiCounter.at(buffer_type).c_str(), stat, timestamp.c_str()); + + /* Create the threshold breach table entry + * with the data from protobuf + * processing and write it to COUNTER_DB. + */ + /* Create the THRESHOLD_BREACH table entry. */ + int eventid = generateBreachEventIndex(); + string key = string("breach-report") + COUNTER_DB_TABLE_DELIMITER + + to_string(eventid); + vector fvs; + + /* Populate the THRESHOLD_BREACH entry. */ + fvs.emplace_back("buffer", buffer); + fvs.emplace_back("type", buffer_type); + fvs.emplace_back("port", if_name); + fvs.emplace_back("index", to_string(index)); + fvs.emplace_back("breach_value", to_string(statPercent)); + fvs.emplace_back(thresSaiCounter.at(buffer_type), to_string(stat)); + fvs.emplace_back("time-stamp", timestamp); + + /* Write THRESHOLD_BREACH entry to COUNTERS_DB. */ + m_counterTable.set(key, fvs); + + /* Populate the next event-id to be used into COUNTERS_DB. */ + string eventkey ("event-id"); + vector eventfvs; + + /* Write next breach event id to use in case of warmboot, + * docker restart etc. into COUNTERS_DB. + */ + eventfvs.emplace_back("id", to_string(++eventid)); + m_counterTable.set(eventkey, eventfvs); + + return true; +} + +void ThresholdMgr::runAllPlugins() +{ + vector pgList = m_redisAdapter.getPgOidList(); + vector queueList = m_redisAdapter.getQueueOidList(); + vector poolList = m_redisAdapter.getPoolOidList(); + runPlugins(poolList, pgList, queueList); +} + +void ThresholdMgr::runPlugins(vector poolList, vector pgList, vector queueList) +{ + swss::DBConnector db("COUNTERS_DB", 0); + + /* Lua script expects argv to be set. + * Most of this data is not used. + * Only argv[0] is used by the lua script + */ + const vector argv = + { + to_string(COUNTERS_DB), + COUNTERS_TABLE, + to_string(1 * 1000) + }; + + /* Run the Lua plugins for pool, pg and queue counters. */ + /* Alternatively, run the plugin for all PGs, queues and pools present in COUNTERS_DB MAPS. */ + + /* Pool plugin */ + runRedisScript(db, m_poolWmSha, poolList, argv); + + /* PG plugin */ + runRedisScript(db, m_pgWmSha, pgList, argv); + + /* Queue plugin */ + runRedisScript(db, m_queueWmSha, queueList, argv); +} + +bool ThresholdMgr::processThresholdStatsData(char *protoBuffer, int len) +{ + ThresholdBreach thresBreachEvent; + SwitchBufferStats bufferStats; + Event event; + string timestamp; + char tmp[64]; + statData data = std::make_pair(0, 0); + vector pgList; + vector queueList; + vector poolList; + time_t time; + + if (protoBuffer == NULL) + { + return false; + } + + /* Convert the message buffer to protobuf message. + * Serialize APIs convert the message buffer to + * protobuf format. m_messageBuffer has the array. + * len needs to be the exact size of protobuf data sent. + * Errors could be seen if len is not accurate. + */ + if (event.ParseFromArray(protoBuffer, len) != true) + { + SWSS_LOG_ERROR( "Unable to parse buffer to event proto format."); + return false; + } + + thresBreachEvent = event.threshold_event(); + + /* Check if threshold breach proto has buffer stats. */ + if (!(thresBreachEvent.has_buffer_stats())) + { + SWSS_LOG_INFO( "No Stats Present."); + return false; + } + /* Decode buffer statistics. */ + bufferStats = thresBreachEvent.buffer_stats(); + + /* Populate timestamp of report. */ + time = static_cast (event.timestamp()); + + if (time != 0) + { + size_t size = strftime(tmp, 32 ,"%Y-%m-%d.%T", localtime(&time)); + + if (size != 0) + { + timestamp = string(tmp); + } + } + else + { + SWSS_LOG_DEBUG("Generating software timestamp, proto timestamp is invalid."); + /* Generate SW timestamp. */ + timestamp = getTimestamp(); + } + + + /* Decode the buffer stats protobuf. */ + TamProtoBufferStatsDecoder protoDecoder(&bufferStats); + + if (protoDecoder.getProtoDecoded() != true) + { + SWSS_LOG_ERROR("Unable to decode buffer stats proto."); + return false; + } + + /* Check if system map is ready. */ + if (m_redisAdapter.getSystemMapReady() != true) + { + SWSS_LOG_NOTICE("System Maps are not ready. Ignore processing protobufs."); + return false; + } + + /* Check if port, queue and pg RID maps are generated. + * Generate them if not generated yet. + */ + if (m_redisAdapter.getMapsInitialized() != true) + { + if (m_redisAdapter.generateRedisOidMaps() != true) + { + SWSS_LOG_ERROR("Aborting protobuf processing, couldn't generate RID maps"); + return false; + } + } + + /* Buffer stats event proto was parsed and decoded successfully. Get the stats and write to COUNTERS_DB. */ + SWSS_LOG_INFO("Proto received %s", event.DebugString().c_str()); + + vector buffer_pool = m_redisAdapter.getPoolOidList(); + + /* Get and process all global buffer pool stats. */ + bufferStatsGlobalPool poolStats = protoDecoder.getBufferPoolBufferStats(); + bufferStatsGlobalPool::iterator it; + + for (it = poolStats.begin(); it != poolStats.end(); it++) + { + /* Get the entry. Convert the buffer pool + * RID to VID and populate COUNTERS_DB. + */ + poolType pool = it->first; + string rid, vid, counterName; + vector values; + + data = it->second; + /* Serialize RID (uint64_t) */ + rid = m_redisAdapter.serializeOid(pool.first); + /* Convert RID to VID */ + if (m_redisAdapter.getVidFromRid(rid, &vid) != true) + { + /*When VID is received in stats and it is not present in non empty buffer pool table then throw error*/ + if(!(buffer_pool.empty())) + { + SWSS_LOG_ERROR( "Invalid buffer pool OID %lld in report, no corresponding VID found", pool.first); + } + + continue; + } + + /* Update counters. Check if buffer pool type matters here. + * We are only interested in stat in bytes right now. + */ + counterName = "SAI_BUFFER_POOL_STAT_WATERMARK_BYTES"; + values.emplace_back(counterName, to_string(data.first)); + m_counterStatTable.set(vid, values); + + /* Update counter percent. */ + counterName = "SAI_BUFFER_POOL_PERCENT_STAT_WATERMARK"; + values.emplace_back(counterName, to_string(data.second)); + m_counterStatTable.set(vid, values); + + /* Update timestamp in DB. */ + counterName = "SAI_BUFFER_POOL_STAT_TIMESTAMP"; + values.emplace_back(counterName, timestamp); + m_counterStatTable.set(vid, values); + + /* Add key to pool list to run plugin on. */ + poolList.push_back(vid); + } + + /* Decode PG stats and write to COUNTERS_DB */ + bufferStatsPg pgStats = protoDecoder.getPriorityGroupBufferStats(); + bufferStatsPg::iterator pgit; + + /* Loop through all PGs and populate the counters. */ + for (pgit = pgStats.begin(); pgit != pgStats.end(); pgit++) + { + /* Get the entry. Convert PG + * RID to VID and populate COUNTERS_DB. + */ + pgType priorityGroup = pgit->first; + data = pgit->second; + + string rid, vid, counterName; + vector values; + + /* Serialize RID (uint64_t) */ + rid = m_redisAdapter.serializeOid(priorityGroup.first); + + /* Convert RID to VID */ + if (m_redisAdapter.getVidFromRid(rid, &vid) != true) + { + SWSS_LOG_ERROR("Invalid priority-group OID %lld in report, no corresponding VID found", priorityGroup.first); + continue; + } + + /* Update counters. */ + if (priorityGroup.second == IPG_SHARED) + { + counterName = "SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES"; + } + else + { + counterName = "SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES"; + } + + values.emplace_back(counterName, to_string(data.first)); + + /* Update counter percent. */ + if (priorityGroup.second == IPG_SHARED) + { + counterName = "SAI_INGRESS_PRIORITY_GROUP_PERCENT_STAT_SHARED_WATERMARK"; + } + else + { + counterName = "SAI_INGRESS_PRIORITY_GROUP_PERCENT_STAT_XOFF_ROOM_WATERMARK"; + } + + values.emplace_back(counterName, to_string(data.second)); + + /* Update timestamp in DB. */ + counterName = "SAI_PG_STAT_TIMESTAMP"; + + values.emplace_back(counterName, timestamp); + m_counterStatTable.set(vid, values); + + + /* Add key to pg list to run plugin on. */ + pgList.push_back(vid); + } + + /* Decode queue stats and populate in COUNTERS_DB. */ + bufferStatsQueue queueStats = protoDecoder.getQueueBufferStats(); + bufferStatsQueue::iterator qit; + + /* Loop through all queues and populate the counters. */ + for (qit = queueStats.begin(); qit != queueStats.end(); qit++) + { + /* Get the entry. Convert Queue + * RID to VID and populate COUNTERS_DB. + */ + queueType queue = qit->first; + data = qit->second; + + string rid, vid, counterName; + vector values; + + /* Serialize RID (uint64_t) */ + rid = m_redisAdapter.serializeOid(queue.first); + + /* Convert RID to VID */ + if (m_redisAdapter.getVidFromRid(rid, &vid) != true) + { + SWSS_LOG_ERROR("Invalid queue OID %lld in report, no corresponding VID found", queue.first); + continue; + } + + /* Update counters. */ + counterName = "SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES"; + + values.emplace_back(counterName, to_string(data.first)); + + /* Update counter percent. */ + counterName = "SAI_QUEUE_PERCENT_STAT_SHARED_WATERMARK"; + + values.emplace_back(counterName, to_string(data.second)); + + /* Update timestamp in DB. */ + counterName = "SAI_QUEUE_STAT_TIMESTAMP"; + + values.emplace_back(counterName, timestamp); + m_counterStatTable.set(vid, values); + + /* Add key to queue list to run plugin on. */ + queueList.push_back(vid); + } + + + m_counterStatTable.flush(); + + /* For the first report, run through all lists for initialising WM tables. */ + if (m_runAllPlugins == false) + { + runAllPlugins(); + m_runAllPlugins = true; + } + else + { + /* Run plugins to generate watermark table data. */ + runPlugins(poolList, pgList, queueList); + } + return true; +} + + +bool ThresholdMgr::readProcessProtobuf() +{ + int protoDataRead = 0; + + /* Read the protobuf data from the protobuf + * socket interface and process the protobuf. + */ + protoDataRead = (int)m_protoInterface->readData(); + if (protoDataRead <= 0) + { + SWSS_LOG_ERROR("Error while receiving threshold protobuf data."); + return false; + } + + SWSS_LOG_DEBUG("Protobuf message received of length %d", protoDataRead); + + /* Read data is saved in protoInterface->m_messageBuffer. + * Extract data into protobuf and process it. + */ + if (processThresholdStatsData(m_protoInterface->m_messageBuffer, protoDataRead) + != true ) + { + SWSS_LOG_ERROR("Error while processing threshold protobuf stats data."); + return false; + } + + + if (processThresholdBreachData(m_protoInterface->m_messageBuffer, protoDataRead) + != true ) + { + SWSS_LOG_ERROR("Error while processing threshold protobuf data."); + return false; + } + + return true; +} + +int ThresholdMgr::getFd() +{ + return m_protoInterface->getFd(); +} + +void ThresholdMgr::resetPlugins() +{ + m_runAllPlugins = false; +} diff --git a/thresholdmgr/thresholdmgr.h b/thresholdmgr/thresholdmgr.h new file mode 100644 index 0000000..70468ec --- /dev/null +++ b/thresholdmgr/thresholdmgr.h @@ -0,0 +1,109 @@ +/* + * Copyright 2019 Broadcom Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _THRESHOLDMGR_H_ +#define _THRESHOLDMGR_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "redisapi.h" +#include "tam_redis_adapter.h" +#include "tam_socket_interface.h" +#include "tam_proto_buffer_stats_decoder.h" +#include "sai_tam_event.pb.h" +#include "sai_tam_buffer_stats.pb.h" +#include "sai_tam_event_threshold_breach.pb.h" + +#define COUNTER_DB_TABLE_DELIMITER ":" +#define THRESHOLD_MGR_BREACH_EVENT_PORT 9171 +/* Address to loopback in software to local host. */ +#define ADDR_LOOPBACK 0x7f000001 /* 127.0.0.1 */ + +#define PG_WATERMARK_PLUGIN_NAME "watermark_pg.lua" +#define QUEUE_WATERMARK_PLUGIN_NAME "watermark_queue.lua" +#define BUFFER_POOL_WATERMARK_PLUGIN_NAME "watermark_pool.lua" + + + +static const std::map thresSaiCounter = +{ + { "shared", "SAI_INGRESS_PRIORITY_GROUP_STAT_SHARED_WATERMARK_BYTES" }, + { "headroom", "SAI_INGRESS_PRIORITY_GROUP_STAT_XOFF_ROOM_WATERMARK_BYTES" }, + { "unicast", "SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES" }, + { "multicast", "SAI_QUEUE_STAT_SHARED_WATERMARK_BYTES" }, + { "ingress", "SAI_BUFFER_POOL_STAT_WATERMARK_BYTES" }, + { "egress", "SAI_BUFFER_POOL_STAT_WATERMARK_BYTES" } + +}; + +using namespace swss; +using namespace std; + +class ThresholdMgr +{ +public: + ThresholdMgr(DBConnector *appDb, DBConnector *stateDb, DBConnector *counterDb, DBConnector *asicDb); + ~ThresholdMgr() + { + } + + /* Routine to read and process the threshold protobuf data. */ + bool readProcessProtobuf(); + TamRedisAdapter m_redisAdapter; + int getFd(); + void resetPlugins(); + +private: + unique_ptr m_protoInterface; + Table m_counterTable; + Table m_counterStatTable; + string m_pgWmSha; + string m_queueWmSha; + string m_poolWmSha; + bool m_runAllPlugins; + + /* Get the buffer statistics from the buffer stats proto class */ + bool getBufferStat(SwitchBufferStats bufferStats, uint64_t if_oid, + string buffer_type, uint64_t buffer_type_oid, + uint64_t *stat, uint64_t *statPercent); + + /* Generate event-id from DB. */ + int generateBreachEventIndex(); + + /* Process the threshold breach protobuf data. */ + bool processThresholdBreachData(char *protoBuffer, int len); + + /* Process the threshold breach protobuf data and write stats to Db. */ + bool processThresholdStatsData(char *protoBuffer, int len); + + bool initializeCounters(string timestamp); + + void runPlugins(vector poolList, vector pgList, vector queueList); + + /* Run lua plugins for all PG's, queue's and buffer pools. */ + void runAllPlugins(); + +}; + +#endif // _THRESHOLDMGR_H_ diff --git a/thresholdmgr/thresholdmgr_main.cpp b/thresholdmgr/thresholdmgr_main.cpp new file mode 100644 index 0000000..4c47d4d --- /dev/null +++ b/thresholdmgr/thresholdmgr_main.cpp @@ -0,0 +1,90 @@ +/* + * Copyright 2019 Broadcom Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include "thresholdmgr.h" + +bool sonic_tam_threshold_port_queue_map_init(ThresholdMgr *thresholdmgr); +int main(int argc, char **argv) +{ + swss::Logger::linkToDbNative("thresholdmgr"); + map queue_map; + + try + { + SWSS_LOG_NOTICE("-----Starting ThresholdMgr-----"); + + swss::DBConnector appDb("APPL_DB", 0); + swss::DBConnector stateDb("STATE_DB", 0); + swss::DBConnector counterDb("COUNTERS_DB", 0); + swss::DBConnector asicDb("ASIC_DB", 0); + + /* Initialize the threshold manager. */ + ThresholdMgr *thresholdMgr = new ThresholdMgr(&appDb, &stateDb, &counterDb, &asicDb); + Select s; + SubscriberStateTable mapInit(&counterDb, COUNTERS_QUEUE_MAP); + s.addSelectable(&mapInit); + + int sock_fd = thresholdMgr->getFd(); + SelectableFd fd(sock_fd); + s.addSelectable(&fd); + + while (true) + { + Selectable *sel = NULL; + s.select(&sel); + + if (sel == (Selectable *)&mapInit) + { + std::deque entries; + mapInit.pops(entries); + + for (auto entry: entries) + { + string key = kfvKey(entry); + queue_map[key] = entry; + } + /* When changes occur in qMapInitDone + * get port and queue maps and update + * system maps with recent mappings + * */ + SWSS_LOG_DEBUG("DPB Event occured calling Qmap reinit in Thresholdmgr."); + thresholdMgr->m_redisAdapter.generateRedisOidMaps(); + thresholdMgr->resetPlugins(); + } + + if (sel == (Selectable *)&fd) + { + /* ThresholdMgr listens to protobuf data, + * processes the protobuf data and writes + * it to COUNTER_DB. + */ + if (thresholdMgr->readProcessProtobuf() != true) + { + SWSS_LOG_DEBUG("Unable to read and process threshold protobuf data."); + continue; + } + } + } + } + catch (const exception &e) + { + SWSS_LOG_ERROR("Runtime error: %s", e.what()); + } + + return -1; +} +