From 216b61a74e0e7533c46ae00b5d7cb46039d25029 Mon Sep 17 00:00:00 2001 From: David Naylor Date: Tue, 30 Jan 2018 17:40:31 -0800 Subject: [PATCH 1/2] dbus: Add unit path unescape function --- dbus/dbus.go | 22 ++++++++++++++++++++++ dbus/dbus_test.go | 19 +++++++++++++++++++ dbus/methods.go | 5 +++++ dbus/methods_test.go | 17 +++++++++++++++++ 4 files changed, 63 insertions(+) diff --git a/dbus/dbus.go b/dbus/dbus.go index c1694fb5..49d2a1b2 100644 --- a/dbus/dbus.go +++ b/dbus/dbus.go @@ -16,6 +16,7 @@ package dbus import ( + "encoding/hex" "fmt" "os" "strconv" @@ -60,6 +61,27 @@ func PathBusEscape(path string) string { return string(n) } +// pathBusUnescape is the inverse of PathBusEscape. +func pathBusUnescape(path string) string { + if path == "_" { + return "" + } + n := []byte{} + for i := 0; i < len(path); i++ { + c := path[i] + if c == '_' && i+2 < len(path) { + res, err := hex.DecodeString(path[i+1 : i+3]) + if err == nil { + n = append(n, res...) + } + i += 2 + } else { + n = append(n, c) + } + } + return string(n) +} + // Conn is a connection to systemd's dbus endpoint. type Conn struct { // sysconn/sysobj are only used to call dbus methods diff --git a/dbus/dbus_test.go b/dbus/dbus_test.go index 3ea131e2..81c832a8 100644 --- a/dbus/dbus_test.go +++ b/dbus/dbus_test.go @@ -67,6 +67,25 @@ func TestPathBusEscape(t *testing.T) { } +func TestPathBusUnescape(t *testing.T) { + for in, want := range map[string]string{ + "_": "", + "foo_2eservice": "foo.service", + "foobar": "foobar", + "woof_40woof_2eservice": "woof@woof.service", + "_30123456": "0123456", + "account_5fdb_2eservice": "account_db.service", + "got_2ddashes": "got-dashes", + "foobar_": "foobar_", + "foobar_2": "foobar_2", + } { + got := pathBusUnescape(in) + if got != want { + t.Errorf("bad result for pathBusUnescape(%s): got %q, want %q", in, got, want) + } + } +} + // TestNew ensures that New() works without errors. func TestNew(t *testing.T) { _, err := New() diff --git a/dbus/methods.go b/dbus/methods.go index 7271cd8e..4511d0a9 100644 --- a/dbus/methods.go +++ b/dbus/methods.go @@ -584,3 +584,8 @@ func (c *Conn) Reload() error { func unitPath(name string) dbus.ObjectPath { return dbus.ObjectPath("/org/freedesktop/systemd1/unit/" + PathBusEscape(name)) } + +// unitName returns the unescaped base element of the supplied escaped path +func unitName(dpath dbus.ObjectPath) string { + return pathBusUnescape(path.Base(string(dpath))) +} diff --git a/dbus/methods_test.go b/dbus/methods_test.go index 6ee56b50..2e5e5dd9 100644 --- a/dbus/methods_test.go +++ b/dbus/methods_test.go @@ -1524,3 +1524,20 @@ func TestReload(t *testing.T) { t.Fatal(err) } } + +func TestUnitName(t *testing.T) { + for _, unit := range []string{ + "", + "foo.service", + "foobar", + "woof@woof.service", + "0123456", + "account_db.service", + "got-dashes", + } { + got := unitName(unitPath(unit)) + if got != unit { + t.Errorf("bad result for unitName(%s): got %q, want %q", unit, got, unit) + } + } +} From 65792598ea824aa726a913d05a32b669fdb21fa2 Mon Sep 17 00:00:00 2001 From: David Naylor Date: Tue, 30 Jan 2018 17:42:08 -0800 Subject: [PATCH 2/2] dbus: Add SetPropertiesSubscriber method This commit adds a SetPropertiesSubscriber method which is similar to SetSubStateSubscriber but with two important differences: * Each update includes all changed properties, not just SubState * Each set of property values from systemd is reported; no update can be missed. With SubStateSubscriber, transient states can be missed because sendSubStateUpdate calls GetUnitPathProperties after receiving the original signal from systemd; by this point, the unit's state may have changed again. (This behavior is clearly documented, but for some use cases it might not be acceptable to miss a state transition.) --- dbus/dbus.go | 9 ++++- dbus/subscription.go | 85 +++++++++++++++++++++++++++++++-------- dbus/subscription_test.go | 56 ++++++++++++++++++++++++++ 3 files changed, 132 insertions(+), 18 deletions(-) diff --git a/dbus/dbus.go b/dbus/dbus.go index 49d2a1b2..1d54810a 100644 --- a/dbus/dbus.go +++ b/dbus/dbus.go @@ -96,13 +96,18 @@ type Conn struct { jobs map[dbus.ObjectPath]chan<- string sync.Mutex } - subscriber struct { + subStateSubscriber struct { updateCh chan<- *SubStateUpdate errCh chan<- error sync.Mutex ignore map[dbus.ObjectPath]int64 cleanIgnore int64 } + propertiesSubscriber struct { + updateCh chan<- *PropertiesUpdate + errCh chan<- error + sync.Mutex + } } // New establishes a connection to any available bus and authenticates. @@ -174,7 +179,7 @@ func NewConnection(dialBus func() (*dbus.Conn, error)) (*Conn, error) { sigobj: systemdObject(sigconn), } - c.subscriber.ignore = make(map[dbus.ObjectPath]int64) + c.subStateSubscriber.ignore = make(map[dbus.ObjectPath]int64) c.jobListener.jobs = make(map[dbus.ObjectPath]chan<- string) // Setup the listeners on jobs so that we can get completions diff --git a/dbus/subscription.go b/dbus/subscription.go index c647d008..077d475d 100644 --- a/dbus/subscription.go +++ b/dbus/subscription.go @@ -70,7 +70,8 @@ func (c *Conn) dispatch() { c.jobComplete(signal) } - if c.subscriber.updateCh == nil { + if c.subStateSubscriber.updateCh == nil && + c.propertiesSubscriber.updateCh == nil { continue } @@ -84,6 +85,12 @@ func (c *Conn) dispatch() { case "org.freedesktop.DBus.Properties.PropertiesChanged": if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" { unitPath = signal.Path + + if len(signal.Body) >= 2 { + if changed, ok := signal.Body[1].(map[string]dbus.Variant); ok { + c.sendPropertiesUpdate(unitPath, changed) + } + } } } @@ -169,15 +176,19 @@ type SubStateUpdate struct { // is full, it attempts to write an error to errCh; if errCh is full, the error // passes silently. func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) { - c.subscriber.Lock() - defer c.subscriber.Unlock() - c.subscriber.updateCh = updateCh - c.subscriber.errCh = errCh + c.subStateSubscriber.Lock() + defer c.subStateSubscriber.Unlock() + c.subStateSubscriber.updateCh = updateCh + c.subStateSubscriber.errCh = errCh } func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) { - c.subscriber.Lock() - defer c.subscriber.Unlock() + c.subStateSubscriber.Lock() + defer c.subStateSubscriber.Unlock() + + if c.subStateSubscriber.updateCh == nil { + return + } if c.shouldIgnore(unitPath) { return @@ -186,7 +197,7 @@ func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) { info, err := c.GetUnitPathProperties(unitPath) if err != nil { select { - case c.subscriber.errCh <- err: + case c.subStateSubscriber.errCh <- err: default: } } @@ -196,10 +207,10 @@ func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) { update := &SubStateUpdate{name, substate} select { - case c.subscriber.updateCh <- update: + case c.subStateSubscriber.updateCh <- update: default: select { - case c.subscriber.errCh <- errors.New("update channel full!"): + case c.subStateSubscriber.errCh <- errors.New("update channel full!"): default: } } @@ -222,7 +233,7 @@ func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) { // the properties). func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool { - t, ok := c.subscriber.ignore[path] + t, ok := c.subStateSubscriber.ignore[path] return ok && t >= time.Now().UnixNano() } @@ -231,20 +242,62 @@ func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) { // unit is unloaded - it will trigger bad systemd dbus behavior if info["LoadState"].(string) == "not-found" { - c.subscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval + c.subStateSubscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval } } // without this, ignore would grow unboundedly over time func (c *Conn) cleanIgnore() { now := time.Now().UnixNano() - if c.subscriber.cleanIgnore < now { - c.subscriber.cleanIgnore = now + cleanIgnoreInterval + if c.subStateSubscriber.cleanIgnore < now { + c.subStateSubscriber.cleanIgnore = now + cleanIgnoreInterval - for p, t := range c.subscriber.ignore { + for p, t := range c.subStateSubscriber.ignore { if t < now { - delete(c.subscriber.ignore, p) + delete(c.subStateSubscriber.ignore, p) } } } } + +// PropertiesUpdate holds a map of a unit's changed properties +type PropertiesUpdate struct { + UnitName string + Changed map[string]dbus.Variant +} + +// SetPropertiesSubscriber writes to updateCh when any unit's properties +// change. Every property change reported by systemd will be sent; that is, no +// transitions will be "missed" (as they might be with SetSubStateSubscriber). +// However, state changes will only be written to the channel with non-blocking +// writes. If updateCh is full, it attempts to write an error to errCh; if +// errCh is full, the error passes silently. +func (c *Conn) SetPropertiesSubscriber(updateCh chan<- *PropertiesUpdate, errCh chan<- error) { + c.propertiesSubscriber.Lock() + defer c.propertiesSubscriber.Unlock() + c.propertiesSubscriber.updateCh = updateCh + c.propertiesSubscriber.errCh = errCh +} + +// we don't need to worry about shouldIgnore() here because +// sendPropertiesUpdate doesn't call GetProperties() +func (c *Conn) sendPropertiesUpdate(unitPath dbus.ObjectPath, changedProps map[string]dbus.Variant) { + c.propertiesSubscriber.Lock() + defer c.propertiesSubscriber.Unlock() + + if c.propertiesSubscriber.updateCh == nil { + return + } + + update := &PropertiesUpdate{unitName(unitPath), changedProps} + + select { + case c.propertiesSubscriber.updateCh <- update: + default: + select { + case c.propertiesSubscriber.errCh <- errors.New("update channel is full"): + default: + } + return + } +} diff --git a/dbus/subscription_test.go b/dbus/subscription_test.go index afb9f53d..c16bf70b 100644 --- a/dbus/subscription_test.go +++ b/dbus/subscription_test.go @@ -151,3 +151,59 @@ func TestSubStateSubscription(t *testing.T) { } } } + +// TestPropertiesSubscription exercises the basics of property change event subscriptions +func TestPropertiesSubscription(t *testing.T) { + target := "subscribe-events.service" + + conn, err := New() + defer conn.Close() + if err != nil { + t.Fatal(err) + } + + err = conn.Subscribe() + if err != nil { + t.Fatal(err) + } + + updateCh := make(chan *PropertiesUpdate) + errCh := make(chan error) + conn.SetPropertiesSubscriber(updateCh, errCh) + + setupUnit(target, conn, t) + linkUnit(target, conn, t) + + reschan := make(chan string) + _, err = conn.StartUnit(target, "replace", reschan) + if err != nil { + t.Fatal(err) + } + + job := <-reschan + if job != "done" { + t.Fatal("Couldn't start", target) + } + + timeout := make(chan bool, 1) + go func() { + time.Sleep(3 * time.Second) + close(timeout) + }() + + for { + select { + case update := <-updateCh: + if update.UnitName == target { + subState, ok := update.Changed["SubState"].Value().(string) + if ok && subState == "running" { + return // success + } + } + case err := <-errCh: + t.Fatal(err) + case <-timeout: + t.Fatal("Reached timeout") + } + } +}