Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions lib/mgmt_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,8 @@ void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf,
ms->ins = stream_new(max_msg_sz);
stream_fifo_init(&ms->inq);
stream_fifo_init(&ms->outq);
ms->max_read_buf = max_write_buf;
ms->max_write_buf = max_read_buf;
ms->max_read_buf = max_read_buf;
ms->max_write_buf = max_write_buf;
ms->max_msg_sz = max_msg_sz;
ms->idtag = strdup(idtag);
}
Expand All @@ -473,8 +473,8 @@ void mgmt_msg_destroy(struct mgmt_msg_state *ms)
*/

#define MSG_CONN_DEFAULT_CONN_RETRY_MSEC 250
#define MSG_CONN_SEND_BUF_SIZE (1u << 16)
#define MSG_CONN_RECV_BUF_SIZE (1u << 16)
#define MSG_CONN_SEND_BUF_SIZE (1u << 17)
#define MSG_CONN_RECV_BUF_SIZE (1u << 17)

static void msg_client_sched_connect(struct msg_client *client,
unsigned long msec);
Expand Down Expand Up @@ -706,8 +706,8 @@ static int msg_client_connect_short_circuit(struct msg_client *client)
/* client side */
client->conn.fd = sockets[0];
set_nonblocking(sockets[0]);
setsockopt_so_sendbuf(sockets[0], client->conn.mstate.max_write_buf);
setsockopt_so_recvbuf(sockets[0], client->conn.mstate.max_read_buf);
setsockopt_so_sendbuf(sockets[0], MSG_CONN_SEND_BUF_SIZE);
setsockopt_so_recvbuf(sockets[0], MSG_CONN_RECV_BUF_SIZE);

/* server side */
memset(&su, 0, sizeof(union sockunion));
Expand Down
44 changes: 43 additions & 1 deletion tests/topotests/lib/fe_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# noqa: E501
#
"""A MGMTD front-end client."""

import argparse
import logging
import os
Expand Down Expand Up @@ -150,6 +151,30 @@
MSG_FORMAT_LYB = 3


def resstr(result_type):
if result_type == MSG_FORMAT_XML:
return "XML"
if result_type == MSG_FORMAT_JSON:
return "JSON"
if result_type == MSG_FORMAT_LYB:
return "LYB"
return f"unknown({result_type})"


def opstr(op):
if op == NOTIFY_OP_NOTIFICATION:
return "YANG-NOTIFICAITON"
if op == NOTIFY_OP_PATCH:
return "PATCH"
if op == NOTIFY_OP_REPLACE:
return "REPLACE"
if op == NOTIFY_OP_DELETE:
return "DELETE"
if op == NOTIFY_OP_GET_SYNC:
return "SYNC"
return f"unknown({op})"


def cstr(mdata):
"""Convert a null-term byte array into a string, excluding the null terminator."""
assert mdata[-1] == 0
Expand Down Expand Up @@ -442,6 +467,10 @@ def __parse_args():
parser.add_argument(
"--datastore", action="store_true", help="listen for datastore notifications"
)
parser.add_argument(
"--log",
help="file to log to instead of stderr",
)
parser.add_argument(
"-q", "--query", nargs="+", metavar="XPATH", help="xpath[s] to query"
)
Expand All @@ -450,7 +479,12 @@ def __parse_args():
args = parser.parse_args()

level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(level=level, format="%(asctime)s %(levelname)s: %(message)s")
logging.basicConfig(
level=level,
format="%(asctime)s %(levelname)s: %(message)s",
filename=args.log,
filemode="w" if args.log is not None else None,
)

return args

Expand Down Expand Up @@ -497,6 +531,13 @@ def __main():
sess.add_notify_select(True, args.listen)
while i > 0 or args.notify_count == 0:
result_type, op, xpath, notif = sess.recv_notify()
logging.debug(
"Notified: op: %s xpath: %s data(%s): %s",
opstr(op),
xpath,
resstr(result_type),
notif,
)
if op == NOTIFY_OP_NOTIFICATION:
if args.datastore:
logging.warning("ignoring non-datastore notification: %s", notif)
Expand Down Expand Up @@ -524,6 +565,7 @@ def __main():
else:
logging.error("Unknown notification OP: %s", op)
sys.exit(1)
sys.stdout.flush()
i -= 1


Expand Down
101 changes: 44 additions & 57 deletions tests/topotests/mgmt_notif/test_ds_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import logging
import os
import re
import time

import pytest
from lib.common_config import step
Expand All @@ -31,6 +30,14 @@
BE_CLIENT = "/usr/lib/frr/mgmtd_testc"


def have_beclient(r1):
if hasattr(have_beclient, "has"):
return have_beclient.has
rc, _, _ = r1.cmd_status(BE_CLIENT + " --help")
have_beclient.has = True if not rc else False
return have_beclient.has


@pytest.fixture(scope="module")
def tgen(request):
"Setup/Teardown the environment and provide tgen argument to tests"
Expand Down Expand Up @@ -103,6 +110,14 @@ def wait_op_json(f, op, path, json_match=None, exact=False, timeout=30):
logging.debug("no json match: %s: continue", jo)


@retry(retry_timeout=30, initial_wait=0.5)
def check_backend_xpath_registry(r1, repath):
output = r1.cmd_raises('vtysh -c "show mgmt backend-yang-xpath-registry notify"')
if re.match("notify: ?" + repath, output):
return "missing notify registration for " + repath
return None


def test_frontend_datastore_notification(tgen):
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
Expand All @@ -115,32 +130,25 @@ def test_frontend_datastore_notification(tgen):
p = r1.popen(
[
FE_CLIENT,
"--verbose",
"--log=fe_client.log",
"--datastore",
"--notify-count=2",
"--notify-count=0",
"--listen=/frr-interface:lib/interface/state",
]
)
assert waitline(p.stderr, "Connected", timeout=30)

r1.cmd_raises("ip link set r1-eth0 mtu 1200")

# {"frr-interface:lib":{"interface":[{"name":"r1-eth0","state":{"if-index":2,"mtu":1200,"mtu6":1200,"speed":10000,"metric":0,"phy-address":"ba:fd:de:b5:8b:90"}}]}}

try:
# Wait for FE client to exit
output, error = p.communicate(timeout=30)
notifs = get_op_and_json(output)
op, path, data = notifs[1]

assert op == "REPLACE"
assert path.startswith("/frr-interface:lib/interface[name='r1-eth0']/state")
wait_op_json(p.stdout, "SYNC", "/frr-interface:lib/interface/state", None)

jsout = json.loads(data)
expected = json.loads(
'{"frr-interface:lib":{"interface":[{"name":"r1-eth0","state":{"mtu":1200}}]}}'
r1.cmd_raises("ip link set r1-eth0 mtu 1200")
wait_op_json(
p.stdout,
"REPLACE",
"/frr-interface:lib/interface[name='r1-eth0']/state/mtu",
'{"frr-interface:lib":{"interface":[{"name":"r1-eth0","state":{"mtu":1200}}]}}',
)
result = json_cmp(jsout, expected)
assert result is None

finally:
p.kill()
r1.cmd_raises("ip link set r1-eth0 mtu 1500")
Expand All @@ -154,13 +162,9 @@ def test_backend_datastore_update(tgen):

check_kernel_32(r1, "11.11.11.11", 1, "")

rc, _, _ = r1.cmd_status(BE_CLIENT + " --help")
if rc:
if not have_beclient(r1):
pytest.skip("No mgmtd_testc")

# Watch the mgmtd log for the BE subscribing
mlogp = r1.popen(["/usr/bin/tail", "-n0", "-f", f"{r1.rundir}/mgmtd.log"])

# Start our BE client in the background
p = r1.popen(
[
Expand All @@ -172,8 +176,6 @@ def test_backend_datastore_update(tgen):
"/frr-interface:lib/interface",
]
)
assert waitline(mlogp.stdout, 'now known as "mgmtd-testc"', timeout=30)
mlogp.kill()

try:
wait_op_json(p.stdout, "SYNC", "/frr-interface:lib/interface", None)
Expand All @@ -198,13 +200,9 @@ def test_backend_datastore_add_delete(tgen):

check_kernel_32(r1, "11.11.11.11", 1, "")

rc, _, _ = r1.cmd_status(BE_CLIENT + " --help")
if rc:
if not have_beclient(r1):
pytest.skip("No mgmtd_testc")

# Watch the mgmtd log for the BE subscribing
mlogp = r1.popen(["/usr/bin/tail", "-n0", "-f", f"{r1.rundir}/mgmtd.log"])

# Start our BE client in the background
p = r1.popen(
[
Expand All @@ -217,24 +215,22 @@ def test_backend_datastore_add_delete(tgen):
"/frr-vrf:lib/vrf",
]
)
assert waitline(mlogp.stdout, 'now known as "mgmtd-testc"', timeout=30)
mlogp.kill()

def _check_backend_xpath_registry():
output = r1.cmd_raises(
'vtysh -c "show mgmt backend-yang-xpath-registry notify"'
)
if re.match("notify: /frr-vrf:lib/vrf:.*mgmtd-testc.*", output):
return "missing mgmtd-testc notify registration for /frr-vrf:lib/vrf"
return None

_, result = topotest.run_and_expect(
_check_backend_xpath_registry, None, count=20, wait=1
)
assert result is None, result
check_backend_xpath_registry(r1, r"/frr-vrf:lib/vrf:.*mgmtd-testc.*")

r1.cmd_raises('vtysh -c "conf t" -c "int foobar"')
try:
#
# If have a failure here b/c we are now notifying at the list element
# level (instead of `.../state`) we have either 1) solved the multiple
# owner of sub-tree data problem, or 2) someone has made this change not
# understanding the problem (i.e., zebra deletes the list element but
# some other backend daemon owns state under the interface and hasn't
# deleted it yet).
#
# When we do have a comprehensive solution (1) for the multiple owner
# issue we should update this test to be more stringent and look for the
# list element to be deleted.
#
assert waitline(
p.stdout,
re.escape('#OP=REPLACE: /frr-interface:lib/interface[name="foobar"]/state'),
Expand Down Expand Up @@ -272,13 +268,9 @@ def test_backend_datastore_router_id(tgen):

check_kernel_32(r1, "11.11.11.11", 1, "")

rc, _, _ = r1.cmd_status(BE_CLIENT + " --help")
if rc:
if not have_beclient(r1):
pytest.skip("No mgmtd_testc")

# Watch the mgmtd log for the BE subscribing
mlogp = r1.popen(["/usr/bin/tail", "-n0", "-f", f"{r1.rundir}/mgmtd.log"])

# Start our BE client in the background
p = r1.popen(
[
Expand All @@ -292,8 +284,6 @@ def test_backend_datastore_router_id(tgen):
"/frr-vrf:lib/vrf/frr-zebra:zebra/ipv6-router-id",
]
)
assert waitline(mlogp.stdout, 'now known as "mgmtd-testc"', timeout=30)
mlogp.kill()

js4_init = '{"frr-vrf:lib":{"vrf":[{"name":"default","frr-zebra:zebra":{"router-id":"1.1.1.1"}}]}}'
js4_chg = '{"frr-vrf:lib":{"vrf":[{"name":"default","frr-zebra:zebra":{"router-id":"1.2.3.4"}}]}}'
Expand Down Expand Up @@ -376,8 +366,7 @@ def test_datastore_backend_filters(tgen):

r1 = tgen.gears["r1"].net

rc, _, _ = r1.cmd_status(BE_CLIENT + " --help")
if rc:
if not have_beclient(r1):
pytest.skip("No mgmtd_testc")

check_kernel_32(r1, "11.11.11.11", 1, "")
Expand All @@ -400,7 +389,6 @@ def test_datastore_backend_filters(tgen):
check_filters(r1, ["/frr-interface:lib/interface/state"])

step("Start BE client and verify both notify selectors present in zebra")
mlogp = r1.popen(["/usr/bin/tail", "-n0", "-f", f"{r1.rundir}/mgmtd.log"])
p2 = r1.popen(
[
BE_CLIENT,
Expand All @@ -411,7 +399,6 @@ def test_datastore_backend_filters(tgen):
"/frr-interface:lib/interface",
]
)
assert waitline(mlogp.stdout, 'now known as "mgmtd-testc"', timeout=30)
selectors = [
"/frr-interface:lib/interface",
"/frr-interface:lib/interface/state",
Expand Down
Loading