
Commit fec5e65

Tom Herbert authored and davem330 committed
rfs: Receive Flow Steering
This patch implements receive flow steering (RFS). RFS steers received packets for layer 3 and 4 processing to the CPU where the application for the corresponding flow is running. RFS is an extension of Receive Packet Steering (RPS).

The basic idea of RFS is that when an application calls recvmsg (or sendmsg), the application's running CPU is stored in a hash table that is indexed by the connection's rxhash, which is stored in the socket structure. The rxhash is passed in skbs received on the connection from netif_receive_skb. For each received packet, the associated rxhash is used to look up the CPU in the hash table; if a valid CPU is set, the packet is steered to that CPU using the RPS mechanisms.

The complication with this simple approach is that it would potentially allow out-of-order (OOO) packets. If threads are thrashing around CPUs or multiple threads are trying to read from the same sockets, a quickly changing CPU value in the hash table could cause rampant OOO packets -- we consider this a non-starter.

To avoid OOO packets, this solution implements two types of hash tables: rps_sock_flow_table and rps_dev_flow_table.

rps_sock_flow_table is a global hash table. Each entry is just a CPU number and it is populated in recvmsg and sendmsg as described above. This table contains the "desired" CPUs for flows.

rps_dev_flow_table is specific to each device queue. Each entry contains a CPU and a tail queue counter. The CPU is the "current" CPU for a matching flow. The tail queue counter holds the value of a tail queue counter for the associated CPU's backlog queue at the time of last enqueue for a flow matching the entry. Each backlog queue has a queue head counter which is incremented on dequeue, and so a queue tail counter is computed as queue head count + queue length. When a packet is enqueued on a backlog queue, the current value of the queue tail counter is saved in the hash entry of the rps_dev_flow_table.

And now the trick: when selecting the CPU for RPS (get_rps_cpu), the rps_sock_flow table and the rps_dev_flow table for the RX queue are consulted. When the desired CPU for the flow (found in the rps_sock_flow table) does not match the current CPU (found in the rps_dev_flow table), the current CPU is changed to the desired CPU if one of the following is true:

- The current CPU is unset (equal to RPS_NO_CPU).
- The current CPU is offline.
- The current CPU's queue head counter >= queue tail counter in the rps_dev_flow table. This checks that the queue tail has advanced beyond the last packet that was enqueued using this table entry, which guarantees that all packets queued using this entry have been dequeued, thus preserving in-order delivery.

Making each queue have its own rps_dev_flow table has two advantages: 1) the tail queue counters will be written on each receive, so keeping the table local to the interrupting CPU is good for locality. 2) this allows lockless access to the table -- the CPU number and queue tail counter need to be accessed together under mutual exclusion from netif_receive_skb; we assume that this is only called from device napi_poll, which is non-reentrant.

This patch implements RFS for TCP and connected UDP sockets. It should be usable for other flow-oriented protocols.

There are two configuration parameters for RFS. The "rps_flow_entries" kernel init parameter sets the number of entries in the rps_sock_flow_table; the per-rxqueue sysfs entry "rps_flow_cnt" contains the number of entries in the rps_dev_flow table for the rxqueue. Both are rounded up to a power of two.
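To make the enqueue/dequeue accounting above concrete, a minimal user-space C sketch of the steering decision follows. It is illustrative only, not code from this patch: pick_rfs_cpu(), model_dev_flow, model_backlog and MODEL_NO_CPU are invented stand-ins for get_rps_cpu(), struct rps_dev_flow, the per-CPU backlog counters and RPS_NO_CPU, and the cpu_online() test is omitted.

/*
 * User-space model of the RFS steering decision (illustration only).
 */
#include <stdio.h>

#define MODEL_NO_CPU 0xffff          /* mirrors RPS_NO_CPU */

struct model_dev_flow {              /* stand-in for struct rps_dev_flow */
	unsigned short cpu;          /* "current" CPU for the flow */
	unsigned int last_qtail;     /* backlog tail counter at last enqueue */
};

struct model_backlog {               /* per-CPU backlog counters */
	unsigned int head;           /* incremented on dequeue */
	unsigned int qlen;           /* current queue length */
};

/* Decide which CPU should process a packet of a given flow. */
static unsigned short pick_rfs_cpu(unsigned short desired_cpu,
				   struct model_dev_flow *rflow,
				   struct model_backlog *backlogs)
{
	unsigned short cur = rflow->cpu;

	/*
	 * Switch to the desired CPU only when it cannot cause reordering:
	 * the current CPU is unset, or its queue head has advanced past
	 * the tail recorded at the last enqueue for this flow.
	 */
	if (cur != desired_cpu &&
	    (cur == MODEL_NO_CPU ||
	     (int)(backlogs[cur].head - rflow->last_qtail) >= 0))
		cur = rflow->cpu = desired_cpu;

	/* Record the new tail so later packets wait for this one. */
	if (cur != MODEL_NO_CPU)
		rflow->last_qtail = backlogs[cur].head + ++backlogs[cur].qlen;

	return cur;
}

int main(void)
{
	struct model_backlog backlogs[4] = { {0} };
	struct model_dev_flow rflow = { .cpu = MODEL_NO_CPU, .last_qtail = 0 };

	/* Application runs on CPU 2: the first packet is steered there. */
	printf("packet 1 -> CPU %u\n", pick_rfs_cpu(2, &rflow, backlogs));

	/* Application migrates to CPU 3 while CPU 2 still holds a packet:
	 * the flow stays on CPU 2 to preserve ordering. */
	printf("packet 2 -> CPU %u\n", pick_rfs_cpu(3, &rflow, backlogs));

	/* CPU 2 drains its backlog (head catches up with the tail)... */
	backlogs[2].head += backlogs[2].qlen;
	backlogs[2].qlen = 0;

	/* ...so the next packet may move to the desired CPU 3. */
	printf("packet 3 -> CPU %u\n", pick_rfs_cpu(3, &rflow, backlogs));
	return 0;
}

Packet 2 stays on CPU 2 even though the application has moved; the flow only migrates once CPU 2's backlog has drained past the tail recorded at the last enqueue.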
The obvious benefit of RFS (over just RPS) is that it achieves CPU locality between the receive processing for a flow and the application's processing; this can result in increased performance (higher pps, lower latency).

The benefits of RFS are dependent on cache hierarchy, application load, and other factors. On simple benchmarks, we don't necessarily see improvement and sometimes see degradation. However, for more complex benchmarks and for applications where cache pressure is much higher, this technique seems to perform very well.

Below are some benchmark results which show the potential benefit of this patch. The netperf test has 500 instances of the netperf TCP_RR test with 1 byte req. and resp. The RPC test is a request/response test similar in structure to the netperf RR test, with 100 threads on each host, but it does more work in userspace than netperf.

e1000e on 8 core Intel
  No RFS or RPS             104K tps at 30% CPU
  No RFS (best RPS config)  290K tps at 63% CPU
  RFS                       303K tps at 61% CPU

RPC test        tps     CPU%    50/90/99% usec latency  Latency StdDev
  No RFS/RPS    103K    48%     757/900/3185            4472.35
  RPS only      174K    73%     415/993/2468            491.66
  RFS           223K    73%     379/651/1382            315.61

Signed-off-by: Tom Herbert <[email protected]>
Signed-off-by: Eric Dumazet <[email protected]>
Signed-off-by: David S. Miller <[email protected]>
1 parent b5d4399 commit fec5e65

8 files changed: +389 −29 lines changed


include/linux/netdevice.h

Lines changed: 68 additions & 1 deletion
@@ -530,14 +530,73 @@ struct rps_map {
 };
 #define RPS_MAP_SIZE(_num) (sizeof(struct rps_map) + (_num * sizeof(u16)))

+/*
+ * The rps_dev_flow structure contains the mapping of a flow to a CPU and the
+ * tail pointer for that CPU's input queue at the time of last enqueue.
+ */
+struct rps_dev_flow {
+	u16 cpu;
+	u16 fill;
+	unsigned int last_qtail;
+};
+
+/*
+ * The rps_dev_flow_table structure contains a table of flow mappings.
+ */
+struct rps_dev_flow_table {
+	unsigned int mask;
+	struct rcu_head rcu;
+	struct work_struct free_work;
+	struct rps_dev_flow flows[0];
+};
+#define RPS_DEV_FLOW_TABLE_SIZE(_num) (sizeof(struct rps_dev_flow_table) + \
+		(_num * sizeof(struct rps_dev_flow)))
+
+/*
+ * The rps_sock_flow_table contains mappings of flows to the last CPU
+ * on which they were processed by the application (set in recvmsg).
+ */
+struct rps_sock_flow_table {
+	unsigned int mask;
+	u16 ents[0];
+};
+#define RPS_SOCK_FLOW_TABLE_SIZE(_num) (sizeof(struct rps_sock_flow_table) + \
+		(_num * sizeof(u16)))
+
+#define RPS_NO_CPU 0xffff
+
+static inline void rps_record_sock_flow(struct rps_sock_flow_table *table,
+					u32 hash)
+{
+	if (table && hash) {
+		unsigned int cpu, index = hash & table->mask;
+
+		/* We only give a hint, preemption can change cpu under us */
+		cpu = raw_smp_processor_id();
+
+		if (table->ents[index] != cpu)
+			table->ents[index] = cpu;
+	}
+}
+
+static inline void rps_reset_sock_flow(struct rps_sock_flow_table *table,
+				       u32 hash)
+{
+	if (table && hash)
+		table->ents[hash & table->mask] = RPS_NO_CPU;
+}
+
+extern struct rps_sock_flow_table *rps_sock_flow_table;
+
 /* This structure contains an instance of an RX queue. */
 struct netdev_rx_queue {
 	struct rps_map *rps_map;
+	struct rps_dev_flow_table *rps_flow_table;
 	struct kobject kobj;
 	struct netdev_rx_queue *first;
 	atomic_t count;
 } ____cacheline_aligned_in_smp;
-#endif
+#endif /* CONFIG_RPS */

 /*
  * This structure defines the management hooks for network devices.
@@ -1333,11 +1392,19 @@ struct softnet_data {
 	/* Elements below can be accessed between CPUs for RPS */
 #ifdef CONFIG_RPS
 	struct call_single_data csd ____cacheline_aligned_in_smp;
+	unsigned int input_queue_head;
 #endif
 	struct sk_buff_head input_pkt_queue;
 	struct napi_struct backlog;
 };

+static inline void incr_input_queue_head(struct softnet_data *queue)
+{
+#ifdef CONFIG_RPS
+	queue->input_queue_head++;
+#endif
+}
+
 DECLARE_PER_CPU_ALIGNED(struct softnet_data, softnet_data);

 #define HAVE_NETIF_QUEUE
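The two SIZE macros above rely on flexible-array-style members (flows[0], ents[0]) so a table and its entries live in a single allocation, and both tables are indexed with hash & mask, which assumes a power-of-two entry count. Below is a small user-space sketch of that layout; it is not part of the patch — malloc stands in for the kernel allocators, uint16_t for u16, and sock_flow_table/alloc_table/NO_CPU are invented names.

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* User-space stand-in for struct rps_sock_flow_table and its SIZE macro. */
struct sock_flow_table {
	unsigned int mask;       /* entries - 1; entries is a power of two */
	uint16_t ents[];         /* flexible array, one CPU hint per bucket */
};

#define SOCK_FLOW_TABLE_SIZE(n) \
	(sizeof(struct sock_flow_table) + (n) * sizeof(uint16_t))

#define NO_CPU 0xffff            /* mirrors RPS_NO_CPU */

static struct sock_flow_table *alloc_table(unsigned int entries)
{
	/* Round up to a power of two so "hash & mask" covers every bucket. */
	unsigned int n = 1;

	while (n < entries)
		n <<= 1;

	struct sock_flow_table *t = malloc(SOCK_FLOW_TABLE_SIZE(n));
	if (!t)
		return NULL;
	t->mask = n - 1;
	for (unsigned int i = 0; i <= t->mask; i++)
		t->ents[i] = NO_CPU;   /* no hint recorded yet */
	return t;
}

int main(void)
{
	struct sock_flow_table *t = alloc_table(1000);   /* rounds up to 1024 */
	uint32_t rxhash = 0x9e3779b9;                    /* any flow hash */

	if (!t)
		return 1;
	t->ents[rxhash & t->mask] = 2;   /* recvmsg ran on CPU 2 */
	printf("entries=%u, desired cpu for hash %#x = %u\n",
	       t->mask + 1, rxhash, t->ents[rxhash & t->mask]);
	free(t);
	return 0;
}

In the kernel the real tables are allocated by hunks not shown on this page, sized by the rps_flow_entries and rps_flow_cnt parameters described in the commit message; the sketch only shows why both counts are rounded to a power of two.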

include/net/inet_sock.h

Lines changed: 38 additions & 0 deletions
@@ -21,6 +21,7 @@
 #include <linux/string.h>
 #include <linux/types.h>
 #include <linux/jhash.h>
+#include <linux/netdevice.h>

 #include <net/flow.h>
 #include <net/sock.h>
@@ -101,6 +102,7 @@ struct rtable;
  * @uc_ttl - Unicast TTL
  * @inet_sport - Source port
  * @inet_id - ID counter for DF pkts
+ * @rxhash - flow hash received from netif layer
  * @tos - TOS
  * @mc_ttl - Multicasting TTL
  * @is_icsk - is this an inet_connection_sock?
@@ -124,6 +126,9 @@ struct inet_sock {
 	__u16 cmsg_flags;
 	__be16 inet_sport;
 	__u16 inet_id;
+#ifdef CONFIG_RPS
+	__u32 rxhash;
+#endif

 	struct ip_options *opt;
 	__u8 tos;
@@ -219,4 +224,37 @@ static inline __u8 inet_sk_flowi_flags(const struct sock *sk)
 	return inet_sk(sk)->transparent ? FLOWI_FLAG_ANYSRC : 0;
 }

+static inline void inet_rps_record_flow(const struct sock *sk)
+{
+#ifdef CONFIG_RPS
+	struct rps_sock_flow_table *sock_flow_table;
+
+	rcu_read_lock();
+	sock_flow_table = rcu_dereference(rps_sock_flow_table);
+	rps_record_sock_flow(sock_flow_table, inet_sk(sk)->rxhash);
+	rcu_read_unlock();
+#endif
+}
+
+static inline void inet_rps_reset_flow(const struct sock *sk)
+{
+#ifdef CONFIG_RPS
+	struct rps_sock_flow_table *sock_flow_table;
+
+	rcu_read_lock();
+	sock_flow_table = rcu_dereference(rps_sock_flow_table);
+	rps_reset_sock_flow(sock_flow_table, inet_sk(sk)->rxhash);
+	rcu_read_unlock();
+#endif
+}
+
+static inline void inet_rps_save_rxhash(const struct sock *sk, u32 rxhash)
+{
+#ifdef CONFIG_RPS
+	if (unlikely(inet_sk(sk)->rxhash != rxhash)) {
+		inet_rps_reset_flow(sk);
+		inet_sk(sk)->rxhash = rxhash;
+	}
+#endif
+}
 #endif /* _INET_SOCK_H */
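The three inlines above are the hooks that the rest of the commit (hunks not shown on this page) wires into the socket paths, as the commit message describes: the receive path saves the flow hash, while recvmsg/sendmsg record the application's CPU. The point of the unlikely(rxhash != ...) guard is that the hint for a stale hash is cleared before the new hash is adopted. A user-space model of just that save/reset behaviour follows; the table, hash values and names (record_flow, reset_flow, save_rxhash, model_sock, NO_CPU) are invented for illustration.

#include <stdint.h>
#include <stdio.h>

#define NO_CPU 0xffff                  /* mirrors RPS_NO_CPU */
#define TABLE_ENTRIES 256              /* power of two; mask = entries - 1 */

static uint16_t sock_flow_ents[TABLE_ENTRIES]; /* stand-in for the global table */

struct model_sock {                    /* stand-in for the rxhash field above */
	uint32_t rxhash;
};

static void record_flow(const struct model_sock *sk, unsigned int cpu)
{
	/* recvmsg/sendmsg path: remember where the application runs. */
	if (sk->rxhash)
		sock_flow_ents[sk->rxhash & (TABLE_ENTRIES - 1)] = cpu;
}

static void reset_flow(const struct model_sock *sk)
{
	/* Forget the hint for the old hash so it cannot mis-steer packets. */
	if (sk->rxhash)
		sock_flow_ents[sk->rxhash & (TABLE_ENTRIES - 1)] = NO_CPU;
}

static void save_rxhash(struct model_sock *sk, uint32_t rxhash)
{
	/* Receive path: adopt a new hash only after clearing the old entry. */
	if (sk->rxhash != rxhash) {
		reset_flow(sk);
		sk->rxhash = rxhash;
	}
}

int main(void)
{
	struct model_sock sk = { .rxhash = 0 };

	save_rxhash(&sk, 0x1234);       /* first packet establishes the hash */
	record_flow(&sk, 5);            /* application reads on CPU 5 */
	printf("hint for 0x1234: %u\n",
	       sock_flow_ents[0x1234 & (TABLE_ENTRIES - 1)]);

	save_rxhash(&sk, 0x5678);       /* hash changes for the connection */
	printf("old hint after change: %u (NO_CPU=%u)\n",
	       sock_flow_ents[0x1234 & (TABLE_ENTRIES - 1)], NO_CPU);
	return 0;
}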

net/core/dev.c

Lines changed: 91 additions & 20 deletions
@@ -2203,19 +2203,28 @@ int weight_p __read_mostly = 64; /* old backlog weight */
 DEFINE_PER_CPU(struct netif_rx_stats, netdev_rx_stat) = { 0, };

 #ifdef CONFIG_RPS
+
+/* One global table that all flow-based protocols share. */
+struct rps_sock_flow_table *rps_sock_flow_table;
+EXPORT_SYMBOL(rps_sock_flow_table);
+
 /*
  * get_rps_cpu is called from netif_receive_skb and returns the target
  * CPU from the RPS map of the receiving queue for a given skb.
  * rcu_read_lock must be held on entry.
  */
-static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb)
+static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
+		       struct rps_dev_flow **rflowp)
 {
 	struct ipv6hdr *ip6;
 	struct iphdr *ip;
 	struct netdev_rx_queue *rxqueue;
 	struct rps_map *map;
+	struct rps_dev_flow_table *flow_table;
+	struct rps_sock_flow_table *sock_flow_table;
 	int cpu = -1;
 	u8 ip_proto;
+	u16 tcpu;
 	u32 addr1, addr2, ports, ihl;

 	if (skb_rx_queue_recorded(skb)) {
@@ -2232,7 +2241,7 @@ static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb)
 	} else
 		rxqueue = dev->_rx;

-	if (!rxqueue->rps_map)
+	if (!rxqueue->rps_map && !rxqueue->rps_flow_table)
 		goto done;

 	if (skb->rxhash)
@@ -2284,9 +2293,48 @@ static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb)
 		skb->rxhash = 1;

 got_hash:
+	flow_table = rcu_dereference(rxqueue->rps_flow_table);
+	sock_flow_table = rcu_dereference(rps_sock_flow_table);
+	if (flow_table && sock_flow_table) {
+		u16 next_cpu;
+		struct rps_dev_flow *rflow;
+
+		rflow = &flow_table->flows[skb->rxhash & flow_table->mask];
+		tcpu = rflow->cpu;
+
+		next_cpu = sock_flow_table->ents[skb->rxhash &
+		    sock_flow_table->mask];
+
+		/*
+		 * If the desired CPU (where last recvmsg was done) is
+		 * different from current CPU (one in the rx-queue flow
+		 * table entry), switch if one of the following holds:
+		 *   - Current CPU is unset (equal to RPS_NO_CPU).
+		 *   - Current CPU is offline.
+		 *   - The current CPU's queue tail has advanced beyond the
+		 *     last packet that was enqueued using this table entry.
+		 *     This guarantees that all previous packets for the flow
+		 *     have been dequeued, thus preserving in order delivery.
+		 */
+		if (unlikely(tcpu != next_cpu) &&
+		    (tcpu == RPS_NO_CPU || !cpu_online(tcpu) ||
+		     ((int)(per_cpu(softnet_data, tcpu).input_queue_head -
+		      rflow->last_qtail)) >= 0)) {
+			tcpu = rflow->cpu = next_cpu;
+			if (tcpu != RPS_NO_CPU)
+				rflow->last_qtail = per_cpu(softnet_data,
+				    tcpu).input_queue_head;
+		}
+		if (tcpu != RPS_NO_CPU && cpu_online(tcpu)) {
+			*rflowp = rflow;
+			cpu = tcpu;
+			goto done;
+		}
+	}
+
 	map = rcu_dereference(rxqueue->rps_map);
 	if (map) {
-		u16 tcpu = map->cpus[((u64) skb->rxhash * map->len) >> 32];
+		tcpu = map->cpus[((u64) skb->rxhash * map->len) >> 32];

 		if (cpu_online(tcpu)) {
 			cpu = tcpu;
@@ -2320,13 +2368,14 @@ static void trigger_softirq(void *data)
 	__napi_schedule(&queue->backlog);
 	__get_cpu_var(netdev_rx_stat).received_rps++;
 }
-#endif /* CONFIG_SMP */
+#endif /* CONFIG_RPS */

 /*
  * enqueue_to_backlog is called to queue an skb to a per CPU backlog
  * queue (may be a remote CPU queue).
  */
-static int enqueue_to_backlog(struct sk_buff *skb, int cpu)
+static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
+			      unsigned int *qtail)
 {
 	struct softnet_data *queue;
 	unsigned long flags;
@@ -2341,6 +2390,10 @@ static int enqueue_to_backlog(struct sk_buff *skb, int cpu)
 		if (queue->input_pkt_queue.qlen) {
 enqueue:
 			__skb_queue_tail(&queue->input_pkt_queue, skb);
+#ifdef CONFIG_RPS
+			*qtail = queue->input_queue_head +
+			    queue->input_pkt_queue.qlen;
+#endif
 			rps_unlock(queue);
 			local_irq_restore(flags);
 			return NET_RX_SUCCESS;
@@ -2355,11 +2408,10 @@ static int enqueue_to_backlog(struct sk_buff *skb, int cpu)

 			cpu_set(cpu, rcpus->mask[rcpus->select]);
 			__raise_softirq_irqoff(NET_RX_SOFTIRQ);
-		} else
-			__napi_schedule(&queue->backlog);
-#else
-		__napi_schedule(&queue->backlog);
+			goto enqueue;
+		}
 #endif
+		__napi_schedule(&queue->backlog);
 		}
 		goto enqueue;
 	}
@@ -2401,18 +2453,25 @@ int netif_rx(struct sk_buff *skb)

 #ifdef CONFIG_RPS
 	{
+		struct rps_dev_flow voidflow, *rflow = &voidflow;
 		int cpu;

 		rcu_read_lock();
-		cpu = get_rps_cpu(skb->dev, skb);
+
+		cpu = get_rps_cpu(skb->dev, skb, &rflow);
 		if (cpu < 0)
 			cpu = smp_processor_id();
-		ret = enqueue_to_backlog(skb, cpu);
+
+		ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
+
 		rcu_read_unlock();
 	}
 #else
-	ret = enqueue_to_backlog(skb, get_cpu());
-	put_cpu();
+	{
+		unsigned int qtail;
+		ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
+		put_cpu();
+	}
 #endif
 	return ret;
 }
@@ -2830,14 +2889,22 @@ static int __netif_receive_skb(struct sk_buff *skb)
 int netif_receive_skb(struct sk_buff *skb)
 {
 #ifdef CONFIG_RPS
-	int cpu;
+	struct rps_dev_flow voidflow, *rflow = &voidflow;
+	int cpu, ret;
+
+	rcu_read_lock();

-	cpu = get_rps_cpu(skb->dev, skb);
+	cpu = get_rps_cpu(skb->dev, skb, &rflow);

-	if (cpu < 0)
-		return __netif_receive_skb(skb);
-	else
-		return enqueue_to_backlog(skb, cpu);
+	if (cpu >= 0) {
+		ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
+		rcu_read_unlock();
+	} else {
+		rcu_read_unlock();
+		ret = __netif_receive_skb(skb);
+	}
+
+	return ret;
 #else
 	return __netif_receive_skb(skb);
 #endif
@@ -2856,6 +2923,7 @@ static void flush_backlog(void *arg)
 		if (skb->dev == dev) {
 			__skb_unlink(skb, &queue->input_pkt_queue);
 			kfree_skb(skb);
+			incr_input_queue_head(queue);
 		}
 	rps_unlock(queue);
 }
@@ -3179,6 +3247,7 @@ static int process_backlog(struct napi_struct *napi, int quota)
 			local_irq_enable();
 			break;
 		}
+		incr_input_queue_head(queue);
 		rps_unlock(queue);
 		local_irq_enable();

@@ -5542,8 +5611,10 @@ static int dev_cpu_callback(struct notifier_block *nfb,
 	local_irq_enable();

 	/* Process offline CPU's input_pkt_queue */
-	while ((skb = __skb_dequeue(&oldsd->input_pkt_queue)))
+	while ((skb = __skb_dequeue(&oldsd->input_pkt_queue))) {
 		netif_rx(skb);
+		incr_input_queue_head(oldsd);
+	}

 	return NOTIFY_OK;
 }
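One detail worth noting in get_rps_cpu() above: the in-order check is written as (int)(input_queue_head - rflow->last_qtail) >= 0 rather than as a direct head >= tail comparison, so it stays correct when the unsigned counters wrap around. A small stand-alone demonstration (the variable names are ours, not the kernel's):

#include <stdio.h>

int main(void)
{
	/* Counters near the 32-bit wrap point, as after long uptime. */
	unsigned int last_qtail = 0xfffffff0u;  /* tail recorded at enqueue */
	unsigned int queue_head = 0x00000010u;  /* head has since wrapped */

	/* A direct comparison is fooled by the wrap... */
	printf("head >= tail         : %d\n", queue_head >= last_qtail);

	/* ...while the signed difference still says "head has passed tail". */
	printf("(int)(head - tail)>=0: %d\n",
	       (int)(queue_head - last_qtail) >= 0);

	return 0;
}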
