Skip to content

Commit 5342d01

Browse files
committed
datapath/tables: Add Table[NodeAddress]
To provide modules access to the evolving set of the local node's addresses, add Table[NodeAddress], which derives from the low-level Table[*Device] and applies the Cilium-specific heuristics to pick which addresses are considered host IPs and which are used for NodePort and BPF masquerading. To allow users to expand the set of addresses used for NodePort, add the configuration flag "--nodeport-addresses" for specifying the CIDRs from which NodePort addresses are allowed. This mirrors the kube-proxy flag of the same name. If the user does not specify it, the default behavior remains, which is to pick the first IPv4 and IPv6 address of each native device. Signed-off-by: Jussi Maki <[email protected]>
1 parent 0164d11 commit 5342d01

File tree

7 files changed

+853
-1
lines changed

7 files changed

+853
-1
lines changed

Documentation/cmdref/cilium-agent.md

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Documentation/cmdref/cilium-agent_hive.md

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Documentation/cmdref/cilium-agent_hive_dot-graph.md

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/datapath/tables/cells.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ var Cell = cell.Module(
1212
"Datapath state tables",
1313

1414
L2AnnounceTableCell,
15+
NodeAddressCell,
1516
)
Lines changed: 369 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,369 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// Copyright Authors of Cilium
3+
4+
package tables
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"net"
10+
"net/netip"
11+
"slices"
12+
"sort"
13+
"strings"
14+
15+
"github.com/sirupsen/logrus"
16+
"github.com/spf13/pflag"
17+
"k8s.io/apimachinery/pkg/util/sets"
18+
19+
"github.com/cilium/cilium/pkg/cidr"
20+
"github.com/cilium/cilium/pkg/defaults"
21+
"github.com/cilium/cilium/pkg/hive"
22+
"github.com/cilium/cilium/pkg/hive/cell"
23+
"github.com/cilium/cilium/pkg/hive/job"
24+
"github.com/cilium/cilium/pkg/ip"
25+
"github.com/cilium/cilium/pkg/option"
26+
"github.com/cilium/cilium/pkg/rate"
27+
"github.com/cilium/cilium/pkg/statedb"
28+
"github.com/cilium/cilium/pkg/statedb/index"
29+
"github.com/cilium/cilium/pkg/time"
30+
)
31+
32+
// NodeAddress is an IP address assigned to a network interface on a Cilium node
// that is considered a "host" IP address.
//
// NodeAddress is a small comparable value type: it is stored by value in
// Table[NodeAddress] and used as a set element, so its methods use value
// receivers. This also makes plain NodeAddress values satisfy fmt.Stringer.
type NodeAddress struct {
	// Addr is the IP address itself.
	Addr netip.Addr

	// NodePort is true if this address is to be used for NodePort.
	// If --nodeport-addresses is set, then all addresses on native
	// devices that are contained within the specified CIDRs are chosen.
	// If it is not set, then only the primary IPv4 and/or IPv6 address
	// of each native device is used.
	NodePort bool

	// Primary is true if this is the primary IPv4 or IPv6 address of this device.
	// This is mainly used to pick the address for BPF masquerading.
	Primary bool

	// DeviceName is the name of the network device from which this address
	// is derived.
	DeviceName string
}

// IP returns the address as a net.IP. The result is nil when Addr is the
// zero netip.Addr.
func (n NodeAddress) IP() net.IP {
	return n.Addr.AsSlice()
}

// String formats the address as "<addr> (<device>)", e.g. "1.2.3.4 (eth0)".
func (n NodeAddress) String() string {
	return fmt.Sprintf("%s (%s)", n.Addr, n.DeviceName)
}
60+
61+
type NodeAddressConfig struct {
62+
NodePortAddresses []*cidr.CIDR `mapstructure:"nodeport-addresses"`
63+
}
64+
65+
var (
66+
// NodeAddressIndex is the primary index for node addresses:
67+
//
68+
// var nodeAddresses Table[NodeAddress]
69+
// nodeAddresses.First(txn, NodeAddressIndex.Query(netip.MustParseAddr("1.2.3.4")))
70+
NodeAddressIndex = statedb.Index[NodeAddress, netip.Addr]{
71+
Name: "id",
72+
FromObject: func(a NodeAddress) index.KeySet {
73+
return index.NewKeySet(index.NetIPAddr(a.Addr))
74+
},
75+
FromKey: func(addr netip.Addr) []byte {
76+
return index.NetIPAddr(addr)
77+
},
78+
Unique: true,
79+
}
80+
81+
NodeAddressDeviceNameIndex = statedb.Index[NodeAddress, string]{
82+
Name: "name",
83+
FromObject: func(a NodeAddress) index.KeySet {
84+
return index.NewKeySet(index.String(a.DeviceName))
85+
},
86+
FromKey: index.String,
87+
Unique: false,
88+
}
89+
90+
NodeAddressTableName statedb.TableName = "node-addresses"
91+
92+
// NodeAddressCell provides Table[NodeAddress] and a background controller
93+
// that derives the node addresses from the low-level Table[*Device].
94+
//
95+
// The Table[NodeAddress] contains the actual assigned addresses on the node,
96+
// but not for example external Kubernetes node addresses that may be merely
97+
// NATd to a private address. Those can be queried through [node.LocalNodeStore].
98+
NodeAddressCell = cell.Module(
99+
"node-address",
100+
"Table of node addresses derived from system network devices",
101+
102+
statedb.NewPrivateRWTableCell[NodeAddress](NodeAddressTableName, NodeAddressIndex, NodeAddressDeviceNameIndex),
103+
cell.Provide(
104+
newNodeAddressTable,
105+
newAddressScopeMax,
106+
),
107+
cell.Config(NodeAddressConfig{}),
108+
)
109+
110+
// NodeAddressTestTableCell provides Table[NodeAddress] and RWTable[NodeAddress]
111+
// for use in tests of modules that depend on node addresses.
112+
NodeAddressTestTableCell = statedb.NewTableCell[NodeAddress](
113+
NodeAddressTableName,
114+
NodeAddressIndex,
115+
)
116+
)
117+
118+
// nodeAddressControllerMinInterval is the interval handed to the rate
// limiter that paces successive refreshes of the node address table.
const nodeAddressControllerMinInterval = 100 * time.Millisecond
121+
122+
// AddressScopeMax sets the maximum scope an IP address can have. A scope
123+
// is defined in rtnetlink(7) as the distance to the destination where a
124+
// lower number signifies a wider scope with RT_SCOPE_UNIVERSE (0) being
125+
// the widest. Definitions in Go are in unix package, e.g.
126+
// unix.RT_SCOPE_UNIVERSE and so on.
127+
//
128+
// This defaults to RT_SCOPE_LINK-1 (defaults.AddressScopeMax) and can be
129+
// set by the user with --local-max-addr-scope.
130+
type AddressScopeMax uint8
131+
132+
func newAddressScopeMax(cfg NodeAddressConfig, daemonCfg *option.DaemonConfig) (AddressScopeMax, error) {
133+
return AddressScopeMax(daemonCfg.AddressScopeMax), nil
134+
}
135+
136+
func (cfg NodeAddressConfig) getNets() []*net.IPNet {
137+
nets := make([]*net.IPNet, len(cfg.NodePortAddresses))
138+
for i, cidr := range cfg.NodePortAddresses {
139+
nets[i] = cidr.IPNet
140+
}
141+
return nets
142+
}
143+
144+
func (NodeAddressConfig) Flags(flags *pflag.FlagSet) {
145+
flags.StringSlice(
146+
"nodeport-addresses",
147+
nil,
148+
"A whitelist of CIDRs to limit which IPs are used for NodePort. If not set, primary IPv4 and/or IPv6 address of each native device is used.")
149+
}
150+
151+
type nodeAddressControllerParams struct {
152+
cell.In
153+
154+
HealthScope cell.Scope
155+
Log logrus.FieldLogger
156+
Config NodeAddressConfig
157+
Lifecycle hive.Lifecycle
158+
Jobs job.Registry
159+
DB *statedb.DB
160+
Devices statedb.Table[*Device]
161+
NodeAddresses statedb.RWTable[NodeAddress]
162+
AddressScopeMax AddressScopeMax
163+
}
164+
165+
type nodeAddressController struct {
166+
nodeAddressControllerParams
167+
168+
tracker *statedb.DeleteTracker[*Device]
169+
}
170+
171+
// newNodeAddressTable constructs the node address controller & registers its
172+
// lifecycle hooks and then provides Table[NodeAddress] to the application.
173+
// This enforces proper ordering, e.g. controller is started before anything
174+
// that depends on Table[NodeAddress] and allows it to populate it before
175+
// it is accessed.
176+
func newNodeAddressTable(p nodeAddressControllerParams) (tbl statedb.Table[NodeAddress], err error) {
177+
n := nodeAddressController{nodeAddressControllerParams: p}
178+
n.register()
179+
return n.NodeAddresses, nil
180+
}
181+
182+
func (n *nodeAddressController) register() {
183+
g := n.Jobs.NewGroup(n.HealthScope)
184+
g.Add(job.OneShot("node-address-update", n.run))
185+
186+
n.Lifecycle.Append(
187+
hive.Hook{
188+
OnStart: func(ctx hive.HookContext) error {
189+
txn := n.DB.WriteTxn(n.NodeAddresses, n.Devices /* for delete tracker */)
190+
defer txn.Abort()
191+
192+
// Start tracking deletions of devices.
193+
var err error
194+
n.tracker, err = n.Devices.DeleteTracker(txn, "node-addresses")
195+
if err != nil {
196+
return fmt.Errorf("DeleteTracker: %w", err)
197+
}
198+
199+
// Do an immediate update to populate the table before it is read from.
200+
devices, _ := n.Devices.All(txn)
201+
for dev, _, ok := devices.Next(); ok; dev, _, ok = devices.Next() {
202+
n.update(txn, nil, n.getAddressesFromDevice(dev), nil)
203+
}
204+
txn.Commit()
205+
206+
// Start the job in the background to incremental refresh
207+
// the node addresses.
208+
return g.Start(ctx)
209+
},
210+
OnStop: g.Stop,
211+
})
212+
213+
}
214+
215+
func (n *nodeAddressController) run(ctx context.Context, reporter cell.HealthReporter) error {
216+
defer n.tracker.Close()
217+
218+
limiter := rate.NewLimiter(nodeAddressControllerMinInterval, 1)
219+
revision := statedb.Revision(0)
220+
for {
221+
txn := n.DB.WriteTxn(n.NodeAddresses)
222+
process := func(dev *Device, deleted bool, rev statedb.Revision) error {
223+
addrIter, _ := n.NodeAddresses.Get(txn, NodeAddressDeviceNameIndex.Query(dev.Name))
224+
existing := statedb.CollectSet[NodeAddress](addrIter)
225+
var new sets.Set[NodeAddress]
226+
if !deleted {
227+
new = n.getAddressesFromDevice(dev)
228+
}
229+
n.update(txn, existing, new, reporter)
230+
return nil
231+
}
232+
var watch <-chan struct{}
233+
revision, watch, _ = n.tracker.Process(txn, revision, process)
234+
txn.Commit()
235+
236+
select {
237+
case <-ctx.Done():
238+
return nil
239+
case <-watch:
240+
}
241+
if err := limiter.Wait(ctx); err != nil {
242+
return err
243+
}
244+
}
245+
}
246+
247+
func (n *nodeAddressController) update(txn statedb.WriteTxn, existing, new sets.Set[NodeAddress], reporter cell.HealthReporter) {
248+
updated := false
249+
250+
// Insert new addresses that did not exist.
251+
for addr := range new {
252+
if !existing.Has(addr) {
253+
updated = true
254+
n.NodeAddresses.Insert(txn, addr)
255+
}
256+
}
257+
258+
// Remove addresses that were not part of the new set.
259+
for addr := range existing {
260+
if !new.Has(addr) {
261+
updated = true
262+
n.NodeAddresses.Delete(txn, addr)
263+
}
264+
}
265+
266+
if updated {
267+
addrs := showAddresses(new)
268+
n.Log.WithField("node-addresses", addrs).Info("Node addresses updated")
269+
if reporter != nil {
270+
reporter.OK(addrs)
271+
}
272+
}
273+
}
274+
275+
func (n *nodeAddressController) getAddressesFromDevice(dev *Device) (addrs sets.Set[NodeAddress]) {
276+
addrs = sets.New[NodeAddress]()
277+
278+
if dev.Flags&net.FlagUp == 0 {
279+
return
280+
}
281+
282+
// Skip obviously uninteresting devices.
283+
// We include the HostDevice as its IP addresses are consider node addresses
284+
// and added to e.g. ipcache as HOST_IDs.
285+
if dev.Name != defaults.HostDevice {
286+
for _, prefix := range defaults.ExcludedDevicePrefixes {
287+
if strings.HasPrefix(dev.Name, prefix) {
288+
return
289+
}
290+
}
291+
}
292+
293+
// ipv4Found and ipv6Found are set to true when the primary address is picked.
294+
// Used to implement 'NodePort' and 'Primary' flags.
295+
ipv4Found, ipv6Found := false, false
296+
297+
for _, addr := range sortedAddresses(dev.Addrs) {
298+
// We keep the scope-based address filtering as was introduced
299+
// in 080857bdedca67d58ec39f8f96c5f38b22f6dc0b.
300+
if addr.Scope > uint8(n.AddressScopeMax) || addr.Addr.IsLoopback() {
301+
continue
302+
}
303+
304+
// Figure out if the address is usable for NodePort.
305+
nodePort := false
306+
primary := false
307+
if dev.Selected && len(n.Config.NodePortAddresses) == 0 {
308+
// The user has not specified IP ranges to filter on IPs on which to serve NodePort.
309+
// Thus the default behavior is to use the primary IPv4 and IPv6 addresses of each
310+
// device.
311+
if addr.Addr.Is4() && !ipv4Found {
312+
ipv4Found = true
313+
nodePort = true
314+
primary = true
315+
}
316+
if addr.Addr.Is6() && !ipv6Found {
317+
ipv6Found = true
318+
nodePort = true
319+
primary = true
320+
}
321+
} else if ip.NetsContainsAny(n.Config.getNets(), []*net.IPNet{ip.IPToPrefix(addr.AsIP())}) {
322+
// User specified --nodeport-addresses and this address was within the range.
323+
nodePort = true
324+
if addr.Addr.Is4() && !ipv4Found {
325+
primary = true
326+
ipv4Found = true
327+
} else if addr.Addr.Is6() && !ipv6Found {
328+
primary = true
329+
ipv6Found = true
330+
}
331+
}
332+
333+
addrs.Insert(NodeAddress{
334+
Addr: addr.Addr,
335+
Primary: primary,
336+
NodePort: nodePort,
337+
DeviceName: dev.Name,
338+
})
339+
}
340+
return
341+
}
342+
343+
// showAddresses formats a Set[NodeAddress] as "1.2.3.4 (eth0), fe80::1 (eth1)"
344+
func showAddresses(addrs sets.Set[NodeAddress]) string {
345+
ss := make([]string, 0, len(addrs))
346+
for addr := range addrs {
347+
ss = append(ss, addr.String())
348+
}
349+
sort.Strings(ss)
350+
return strings.Join(ss, ", ")
351+
}
352+
353+
// sortedAddresses returns a copy of the addresses, sorted by primary (e.g. !iIFA_F_SECONDARY) and then by
354+
// address scope.
355+
func sortedAddresses(addrs []DeviceAddress) []DeviceAddress {
356+
addrs = slices.Clone(addrs)
357+
358+
sort.SliceStable(addrs, func(i, j int) bool {
359+
switch {
360+
case !addrs[i].Secondary && addrs[j].Secondary:
361+
return true
362+
case addrs[i].Secondary && !addrs[j].Secondary:
363+
return false
364+
default:
365+
return addrs[i].Scope < addrs[j].Scope
366+
}
367+
})
368+
return addrs
369+
}

0 commit comments

Comments
 (0)