Skip to content

Commit 2e1890b

Browse files
committed
rx,tx: use one node per port queue
Instead of dealing with a list of queues to poll packets from, instantiate one port_rx node clone for each configured port RX queue. Do the same thing for port_tx and each configured port TX queue. Depending on worker RX queue mapping, include the associated port_rx-p*q* clones and initialize their context data accordingly. Always include *all* port_tx-p*q* clones in all graphs, regardless of worker TX queue mappings. Instead, configure the port_output node context to steer packets to the proper port_tx-p*q* clone. All of this requires delicate juggling with multiple node names lists to ensure that nodes are cloned when needed before they are affected to a graph. Secondly, we need to call rte_node_free() on those which are not used in any graph after all of them have been reloaded. Signed-off-by: Robin Jarry <[email protected]>
1 parent f9095a9 commit 2e1890b

13 files changed

Lines changed: 750 additions & 615 deletions

File tree

docs/graph.svg

Lines changed: 378 additions & 366 deletions
Loading

modules/infra/api/stats.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ static gr_vec struct gr_infra_stat *graph_stats(uint16_t cpu_id) {
5757
memccpy(stat.name, name, 0, sizeof(stat.name));
5858
gr_vec_add(stats, stat);
5959
}
60-
if (strcmp(name, "port_rx") == 0 || strcmp(name, "control_input") == 0)
60+
if (strncmp(name, "port_rx-", strlen("port_rx-")) == 0
61+
|| strcmp(name, "control_input") == 0)
6162
pkts += n->packets;
6263
node_cycles += n->cycles;
6364
}

modules/infra/control/graph.c

Lines changed: 195 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <gr_port.h>
1212
#include <gr_queue.h>
1313
#include <gr_rxtx.h>
14+
#include <gr_string.h>
1415
#include <gr_vec.h>
1516
#include <gr_worker.h>
1617

@@ -25,7 +26,12 @@
2526
#include <unistd.h>
2627

2728
struct node_infos node_infos = STAILQ_HEAD_INITIALIZER(node_infos);
28-
static gr_vec const char **node_names;
29+
static gr_vec const char **base_node_names;
30+
static gr_vec char **rx_node_names;
31+
static gr_vec char **tx_node_names;
32+
static rte_node_t port_rx_node;
33+
static rte_node_t port_tx_node;
34+
static rte_node_t port_output_node;
2935

3036
rte_edge_t gr_node_attach_parent(const char *parent, const char *node) {
3137
rte_node_t parent_id;
@@ -68,15 +74,17 @@ void worker_graph_free(struct worker *worker) {
6874
}
6975
}
7076

71-
static int worker_graph_new(struct worker *worker, uint8_t index) {
72-
struct rx_node_queues *rx = NULL;
73-
struct tx_node_queues *tx = NULL;
74-
char name[RTE_GRAPH_NAMESIZE];
77+
static int
78+
worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_port **ports) {
79+
gr_vec const char **graph_nodes = NULL;
80+
char graph_name[RTE_GRAPH_NAMESIZE];
81+
char node_name[RTE_NODE_NAMESIZE];
82+
gr_vec char **rx_nodes = NULL;
7583
struct queue_map *qmap;
84+
struct rte_node *node;
7685
uint16_t graph_uid;
7786
unsigned n_rxqs;
78-
size_t len;
79-
int ret;
87+
int ret = 0;
8088

8189
n_rxqs = 0;
8290
gr_vec_foreach_ref (qmap, worker->rxqs) {
@@ -90,16 +98,9 @@ static int worker_graph_new(struct worker *worker, uint8_t index) {
9098

9199
// unique suffix for this graph
92100
graph_uid = (worker->cpu_id << 1) | (0x1 & index);
93-
snprintf(name, sizeof(name), "gr-%04x", graph_uid);
101+
snprintf(graph_name, sizeof(graph_name), "gr-%04x", graph_uid);
94102

95-
// build rx & tx nodes data
96-
len = sizeof(*rx) + n_rxqs * sizeof(struct rx_port_queue);
97-
rx = rte_malloc(__func__, len, RTE_CACHE_LINE_SIZE);
98-
if (rx == NULL) {
99-
ret = -ENOMEM;
100-
goto err;
101-
}
102-
rx->n_queues = 0;
103+
// generate graph nodes list
103104
gr_vec_foreach_ref (qmap, worker->rxqs) {
104105
if (!qmap->enabled)
105106
continue;
@@ -108,19 +109,61 @@ static int worker_graph_new(struct worker *worker, uint8_t index) {
108109
worker->cpu_id,
109110
qmap->port_id,
110111
qmap->queue_id);
111-
rx->queues[rx->n_queues].port_id = qmap->port_id;
112-
rx->queues[rx->n_queues].rxq_id = qmap->queue_id;
113-
rx->n_queues++;
112+
113+
char *name = astrcat(NULL, RX_NODE_FMT, qmap->port_id, qmap->queue_id);
114+
gr_vec_add(rx_nodes, name);
115+
gr_vec_add(graph_nodes, name);
114116
}
115-
rx->burst_size = RTE_GRAPH_BURST_SIZE / rx->n_queues;
116117

117-
tx = rte_malloc(__func__, sizeof(*tx), RTE_CACHE_LINE_SIZE);
118-
if (tx == NULL) {
119-
ret = -ENOMEM;
120-
goto err;
118+
gr_vec_extend(graph_nodes, base_node_names);
119+
gr_vec_extend(graph_nodes, tx_node_names);
120+
121+
// graph init
122+
struct rte_graph_param params = {
123+
.socket_id = rte_lcore_to_socket_id(worker->lcore_id),
124+
.nb_node_patterns = gr_vec_len(graph_nodes),
125+
.node_patterns = graph_nodes,
126+
};
127+
if (rte_graph_create(graph_name, &params) == RTE_GRAPH_ID_INVALID) {
128+
if (rte_errno == 0)
129+
rte_errno = EINVAL;
130+
ret = -rte_errno;
131+
goto out;
121132
}
122-
// initialize all to invalid queue_ids
123-
memset(tx, 0xff, sizeof(*tx));
133+
worker->graph[index] = rte_graph_lookup(graph_name);
134+
135+
// set rx nodes context data
136+
gr_vec_foreach_ref (qmap, worker->rxqs) {
137+
if (!qmap->enabled)
138+
continue;
139+
snprintf(node_name, sizeof(node_name), RX_NODE_FMT, qmap->port_id, qmap->queue_id);
140+
node = rte_graph_node_get_by_name(graph_name, node_name);
141+
struct rx_node_ctx *ctx = (struct rx_node_ctx *)node->ctx;
142+
gr_vec_foreach (struct iface_info_port *p, ports) {
143+
if (p->port_id == qmap->port_id) {
144+
ctx->iface = RTE_PTR_SUB(p, offsetof(struct iface, info));
145+
break;
146+
}
147+
}
148+
assert(ctx->iface != NULL);
149+
ctx->rxq.port_id = qmap->port_id;
150+
ctx->rxq.queue_id = qmap->queue_id;
151+
ctx->burst_size = RTE_GRAPH_BURST_SIZE / gr_vec_len(worker->rxqs);
152+
}
153+
154+
// initialize all tx nodes context to invalid ports and queues
155+
gr_vec_foreach (const char *name, tx_node_names) {
156+
node = rte_graph_node_get_by_name(graph_name, name);
157+
struct port_queue *ctx = (struct port_queue *)node->ctx;
158+
ctx->port_id = UINT16_MAX;
159+
ctx->queue_id = UINT16_MAX;
160+
}
161+
162+
// initialize the port_output node context to point to invalid edges
163+
struct port_output_edges *out = rte_malloc(__func__, sizeof(*out), RTE_CACHE_LINE_SIZE);
164+
for (unsigned i = 0; i < ARRAY_DIM(out->edges); i++)
165+
out->edges[i] = RTE_EDGE_ID_INVALID;
166+
124167
gr_vec_foreach_ref (qmap, worker->txqs) {
125168
if (!qmap->enabled)
126169
continue;
@@ -129,38 +172,39 @@ static int worker_graph_new(struct worker *worker, uint8_t index) {
129172
worker->cpu_id,
130173
qmap->port_id,
131174
qmap->queue_id);
132-
tx->txq_ids[qmap->port_id] = qmap->queue_id;
175+
// find the corresponding port_tx clone for this port-txq pair
176+
snprintf(node_name, sizeof(node_name), TX_NODE_FMT, qmap->port_id, qmap->queue_id);
177+
node = rte_graph_node_get_by_name(graph_name, node_name);
178+
// and update its context data to correct values
179+
struct port_queue *ctx = (struct port_queue *)node->ctx;
180+
ctx->port_id = qmap->port_id;
181+
ctx->queue_id = qmap->queue_id;
182+
183+
for (rte_edge_t edge = 0; edge < gr_vec_len(tx_node_names); edge++) {
184+
if (strcmp(tx_node_names[edge], node_name) == 0) {
185+
// update the port_output context data to map this port to the
186+
// correct edge
187+
out->edges[ctx->port_id] = edge;
188+
break;
189+
}
190+
}
133191
}
134192

135-
// graph init
136-
struct rte_graph_param params = {
137-
.socket_id = rte_lcore_to_socket_id(worker->lcore_id),
138-
.nb_node_patterns = gr_vec_len(node_names),
139-
.node_patterns = (const char **)node_names,
140-
};
141-
if (rte_graph_create(name, &params) == RTE_GRAPH_ID_INVALID) {
142-
if (rte_errno == 0)
143-
rte_errno = EINVAL;
144-
ret = -rte_errno;
145-
goto err;
146-
}
147-
worker->graph[index] = rte_graph_lookup(name);
193+
// finally, set the port_output context data
194+
rte_graph_node_get_by_name(graph_name, "port_output")->ctx_ptr = out;
148195

149-
rte_graph_node_get_by_name(name, "port_rx")->ctx_ptr = rx;
150-
rte_graph_node_get_by_name(name, "port_tx")->ctx_ptr = tx;
196+
out:
197+
gr_vec_free(graph_nodes);
198+
gr_strvec_free(rx_nodes);
151199

152-
return 0;
153-
err:
154-
rte_free(rx);
155-
rte_free(tx);
156200
return errno_set(-ret);
157201
}
158202

159-
int worker_graph_reload(struct worker *worker) {
203+
int worker_graph_reload(struct worker *worker, gr_vec struct iface_info_port **ports) {
160204
unsigned next = !atomic_load(&worker->cur_config);
161205
int ret;
162206

163-
if ((ret = worker_graph_new(worker, next)) < 0)
207+
if ((ret = worker_graph_new(worker, next, ports)) < 0)
164208
return errno_log(-ret, "worker_graph_new");
165209

166210
// wait for datapath worker to pickup the config update
@@ -181,15 +225,95 @@ int worker_graph_reload(struct worker *worker) {
181225
return 0;
182226
}
183227

184-
int worker_graph_reload_all(void) {
228+
static char *ensure_queue_node(
229+
rte_node_t base_node,
230+
const char *name_fmt,
231+
uint16_t port_id,
232+
uint16_t queue_id,
233+
gr_vec char **old_list
234+
) {
235+
char *name = astrcat(NULL, name_fmt, port_id, queue_id);
236+
assert(name != NULL);
237+
238+
// remove node from the old list so that only unused nodes remain
239+
for (unsigned i = 0; i < gr_vec_len(old_list); i++) {
240+
if (strcmp(old_list[i], name) == 0) {
241+
free(old_list[i]);
242+
gr_vec_del(old_list, i);
243+
break;
244+
}
245+
}
246+
if (rte_node_from_name(name) == RTE_NODE_ID_INVALID) {
247+
// node does not exist yet, clone it from the base
248+
rte_node_t node = rte_node_clone(base_node, strstr(name, "-") + 1);
249+
assert(node != RTE_NODE_ID_INVALID);
250+
}
251+
252+
return name;
253+
}
254+
255+
static gr_vec rte_node_t *worker_graph_nodes_add_missing(gr_vec struct iface_info_port **ports) {
256+
gr_vec char **old_rx = gr_vec_clone(rx_node_names);
257+
gr_vec char **old_tx = gr_vec_clone(tx_node_names);
258+
rte_node_t *unused_nodes = NULL;
259+
rte_edge_t edge;
260+
char *name;
261+
262+
gr_vec_free(rx_node_names);
263+
gr_vec_free(tx_node_names);
264+
265+
// clone port_rx and port_tx to match all possible port-queue pairs
266+
gr_vec_foreach (struct iface_info_port *port, ports) {
267+
for (uint16_t rxq = 0; rxq < port->n_rxq; rxq++) {
268+
name = ensure_queue_node(
269+
port_rx_node, RX_NODE_FMT, port->port_id, rxq, old_rx
270+
);
271+
gr_vec_add(rx_node_names, name);
272+
}
273+
for (uint16_t txq = 0; txq < port->n_txq; txq++) {
274+
name = ensure_queue_node(
275+
port_tx_node, TX_NODE_FMT, port->port_id, txq, old_tx
276+
);
277+
gr_vec_add(tx_node_names, name);
278+
}
279+
}
280+
281+
// update the port_output edges to allow reaching all possible port_tx clones
282+
edge = rte_node_edge_update(
283+
port_output_node, 0, (const char **)tx_node_names, gr_vec_len(tx_node_names)
284+
);
285+
assert(edge != RTE_EDGE_ID_INVALID);
286+
edge = rte_node_edge_shrink(port_output_node, gr_vec_len(tx_node_names));
287+
assert(edge != RTE_EDGE_ID_INVALID);
288+
289+
// store all unused node_ids in a list to be returned to the caller
290+
gr_vec_foreach (name, old_rx)
291+
gr_vec_add(unused_nodes, rte_node_from_name(name));
292+
gr_vec_foreach (name, old_tx)
293+
gr_vec_add(unused_nodes, rte_node_from_name(name));
294+
gr_strvec_free(old_rx);
295+
gr_strvec_free(old_tx);
296+
297+
return unused_nodes;
298+
}
299+
300+
int worker_graph_reload_all(gr_vec struct iface_info_port **ports) {
185301
struct worker *worker;
186302
int ret;
187303

304+
gr_vec rte_node_t *unused_nodes = worker_graph_nodes_add_missing(ports);
305+
188306
STAILQ_FOREACH (worker, &workers, next) {
189-
if ((ret = worker_graph_reload(worker)) < 0)
307+
if ((ret = worker_graph_reload(worker, ports)) < 0)
190308
return ret;
191309
}
192310

311+
// these port_rx and port_tx clones are now not referenced in any graph
312+
// we can safely delete them
313+
gr_vec_foreach (rte_node_t node, unused_nodes)
314+
rte_node_free(node);
315+
gr_vec_free(unused_nodes);
316+
193317
return 0;
194318
}
195319

@@ -227,6 +351,9 @@ static struct api_out graph_dump(const void *request, struct api_ctx *) {
227351
const char *name = info->node->name;
228352
const char *attrs = "";
229353

354+
if (node_id == port_output_node)
355+
nb_edges = info->node->nb_edges;
356+
230357
if (!errors) {
231358
if (nb_edges == 0)
232359
continue;
@@ -252,7 +379,10 @@ static struct api_out graph_dump(const void *request, struct api_ctx *) {
252379
continue;
253380
if ((edges = calloc(nb_edges, sizeof(char *))) == NULL)
254381
goto err;
255-
if (rte_node_edge_get(node_id, edges) == RTE_EDGE_ID_INVALID)
382+
if (node_id == port_output_node) {
383+
for (unsigned i = 0; i < nb_edges; i++)
384+
edges[i] = (char *)info->node->next_nodes[i];
385+
} else if (rte_node_edge_get(node_id, edges) == RTE_EDGE_ID_INVALID)
256386
goto err;
257387

258388
for (unsigned i = 0; i < nb_edges; i++) {
@@ -270,7 +400,7 @@ static struct api_out graph_dump(const void *request, struct api_ctx *) {
270400
continue;
271401

272402
rte_node_t id = rte_node_from_name(n->node->name);
273-
if (rte_node_edge_count(id) == 0) {
403+
if (id != port_output_node && rte_node_edge_count(id) == 0) {
274404
if (!errors)
275405
goto skip;
276406
node_attrs = " [color=darkorange]";
@@ -326,7 +456,16 @@ static void graph_init(struct event_base *) {
326456
reg->id = __rte_node_register(reg);
327457
if (reg->id == RTE_NODE_ID_INVALID)
328458
ABORT("__rte_node_register(%s): %s", reg->name, rte_strerror(rte_errno));
329-
gr_vec_add(node_names, reg->name);
459+
460+
if (strcmp(reg->name, RX_NODE_BASE) == 0)
461+
port_rx_node = reg->id;
462+
else if (strcmp(reg->name, TX_NODE_BASE) == 0)
463+
port_tx_node = reg->id;
464+
else
465+
gr_vec_add(base_node_names, reg->name);
466+
467+
if (strcmp(reg->name, "port_output") == 0)
468+
port_output_node = reg->id;
330469
}
331470

332471
// then, invoke all registration callbacks where applicable
@@ -346,8 +485,9 @@ static void graph_fini(struct event_base *) {
346485
}
347486
}
348487

349-
gr_vec_free(node_names);
350-
node_names = NULL;
488+
gr_vec_free(base_node_names);
489+
gr_strvec_free(rx_node_names);
490+
gr_strvec_free(tx_node_names);
351491
}
352492

353493
static struct gr_module graph_module = {

modules/infra/control/graph_priv.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,13 @@
33

44
#pragma once
55

6+
#include <gr_port.h>
7+
#include <gr_vec.h>
68
#include <gr_worker.h>
79

8-
int worker_graph_reload(struct worker *);
9-
int worker_graph_reload_all(void);
10+
#include <rte_graph.h>
11+
12+
int worker_graph_reload(struct worker *, gr_vec struct iface_info_port **);
13+
int worker_graph_reload_all(gr_vec struct iface_info_port **);
14+
void worker_graph_nodes_del_unused(gr_vec rte_node_t *);
1015
void worker_graph_free(struct worker *);

0 commit comments

Comments
 (0)