1- package util
1+ package distributed
22
33import (
44 "context"
@@ -13,9 +13,9 @@ import (
1313 timestamppb "google.golang.org/protobuf/types/known/timestamppb"
1414)
1515
16- const weakOwnerPartition = "weak -ownership"
16+ const mostlyCorrectOwnerPartition = "mc -ownership"
1717
18- // A WeakOwner uses a Store to allow roughly at most a single goroutine to
18+ // MostlyCorrectOwner uses a Store to allow roughly at most a single goroutine to
1919// handle an operation for a key, across all processes sharing that store.
2020// It can block but never deadlock. "Rough" ownership means that when the
2121// owner is too slow another owner might mistakenly be added.
@@ -25,10 +25,10 @@ const weakOwnerPartition = "weak-ownership"
2525// - single ownership is not required for correctness, AND
2626// - only one concurrent goroutine can succeed
2727//
28- // then using a WeakOwner can help improve performance by usually allowing
28+ // then using a MostlyCorrectOwner can help improve performance by usually allowing
2929// only one goroutine into a critical section. This reduces retries.
3030//
31- // WeakOwner works by setting an ownership key with timed expiration along
31+ // MostlyCorrectOwner works by setting an ownership key with timed expiration along
3232// with a goroutine that refreshes expiration of that key. This can fail:
3333//
3434// - if clocks are not synchronized
@@ -37,14 +37,14 @@ const weakOwnerPartition = "weak-ownership"
3737// So it *cannot* guarantee correctness. However it usually works, and if
3838// it does work, the owning goroutine wins all races by default.
3939//
40- // WeakOwner creates some additional load on its KV partition:
40+ // MostlyCorrectOwner creates some additional load on its KV partition:
4141//
4242// - Acquiring ownership reads at least once and writes (SetIf) once. If
4343// the key is already held, each acquisition reads once every
4444// acquireInterval and once every time ownership expires.
4545//
4646// - Holding a lock reads and writes (SetIf) once every refreshInterval.
47- type WeakOwner struct {
47+ type MostlyCorrectOwner struct {
4848 // acquireInterval is the polling interval for acquiring ownership.
4949 // Reducing it reduces some additional time to recover if an
5050 // instance crashes while holding ownership. Reducing it too much
@@ -69,12 +69,12 @@ type WeakOwner struct {
6969 // goroutiness on multiple cooperating processes.
7070 Store kv.Store
7171 // Prefix is used to separate "locking" keys between different
72- // instances of WeakOwner .
72+ // instances of MostlyCorrectOwner .
7373 Prefix string
7474}
7575
76- func NewWeakOwner (log logging.Logger , store kv.Store , prefix string , acquireInterval , refreshInterval time.Duration ) * WeakOwner {
77- return & WeakOwner {
76+ func NewMostlyCorrectOwner (log logging.Logger , store kv.Store , prefix string , acquireInterval , refreshInterval time.Duration ) * MostlyCorrectOwner {
77+ return & MostlyCorrectOwner {
7878 acquireInterval : acquireInterval ,
7979 refreshInterval : refreshInterval ,
8080 Log : log ,
@@ -96,7 +96,7 @@ func getJitter(interval time.Duration) time.Duration {
9696}
9797
9898// refreshKey refreshes key for owner at interval until ctx is cancelled.
99- func (w * WeakOwner ) refreshKey (ctx context.Context , owner string , prefixedKey []byte ) {
99+ func (w * MostlyCorrectOwner ) refreshKey (ctx context.Context , owner string , prefixedKey []byte ) {
100100 // Always refresh before ownership expires.
101101 //nolint:mnd
102102 interval := w .refreshInterval / 2
@@ -123,8 +123,8 @@ func (w *WeakOwner) refreshKey(ctx context.Context, owner string, prefixedKey []
123123 log .Trace ("Cancelled; stop refreshing ownership" )
124124 return
125125 case <- ticker .C :
126- ownership := WeakOwnership {}
127- predicate , err := kv .GetMsg (ctx , w .Store , weakOwnerPartition , prefixedKey , & ownership )
126+ ownership := MostlyCorrectOwnership {}
127+ predicate , err := kv .GetMsg (ctx , w .Store , mostlyCorrectOwnerPartition , prefixedKey , & ownership )
128128 if err != nil {
129129 log .WithError (err ).Warn ("Failed to get ownership message to refresh" )
130130 // Do NOT attempt to delete, to avoid
@@ -140,7 +140,7 @@ func (w *WeakOwner) refreshKey(ctx context.Context, owner string, prefixedKey []
140140 }
141141 expires := time .Now ().Add (w .refreshInterval )
142142 ownership .Expires = timestamppb .New (expires )
143- err = kv .SetMsgIf (ctx , w .Store , weakOwnerPartition , prefixedKey , & ownership , predicate )
143+ err = kv .SetMsgIf (ctx , w .Store , mostlyCorrectOwnerPartition , prefixedKey , & ownership , predicate )
144144 if err != nil {
145145 log .WithError (err ).Warn ("Failed to set ownership message to refresh (keep going, may lose)" )
146146 continue
@@ -186,25 +186,25 @@ func checkOwnership(expires, now time.Time, getErr error) keyOwnership {
186186 }
187187}
188188
189- // startOwningKey blocks until it gets weak ownership of key in store.
190- // This is a spin-wait (with sleeps) because of the KV interface.
189+ // startOwningKey blocks until it gets mostly-correct ownership of key in
190+ // store. This is a spin-wait (with sleeps) because of the KV interface.
191191//
192192// TODO(ariels): Spin only once (maybe use WaitFor, or chain requests and
193193// spin only on the first) when multiple goroutines all wait for the same
194194// key.
195195//
196196// TODO(ariels): Be fair, at least in the same process. Chaining requests
197197// and spinning only on the first would do this as well.
198- func (w * WeakOwner ) startOwningKey (ctx context.Context , owner string , prefixedKey []byte ) error {
198+ func (w * MostlyCorrectOwner ) startOwningKey (ctx context.Context , owner string , prefixedKey []byte ) error {
199199 log := w .Log .WithContext (ctx ).WithFields (logging.Fields {
200200 "owner" : owner ,
201201 "prefixed_key" : string (prefixedKey ),
202202 "acquire_interval" : w .acquireInterval ,
203203 "refresh_interval" : w .refreshInterval ,
204204 })
205205 for {
206- ownership := WeakOwnership {}
207- predicate , err := kv .GetMsg (ctx , w .Store , weakOwnerPartition , prefixedKey , & ownership )
206+ ownership := MostlyCorrectOwnership {}
207+ predicate , err := kv .GetMsg (ctx , w .Store , mostlyCorrectOwnerPartition , prefixedKey , & ownership )
208208 if err != nil && ! errors .Is (err , kv .ErrNotFound ) {
209209 return fmt .Errorf ("start owning %s for %s: %w" , prefixedKey , owner , err )
210210 }
@@ -213,7 +213,7 @@ func (w *WeakOwner) startOwningKey(ctx context.Context, owner string, prefixedKe
213213 free := checkOwnership (ownership .Expires .AsTime (), now , err )
214214 if free != keyOwned {
215215 expiryTime := now .Add (w .refreshInterval )
216- ownership = WeakOwnership {
216+ ownership = MostlyCorrectOwnership {
217217 Owner : owner ,
218218 Expires : timestamppb .New (expiryTime ),
219219 Comment : fmt .Sprintf ("%s@%v" , owner , now ),
@@ -223,7 +223,7 @@ func (w *WeakOwner) startOwningKey(ctx context.Context, owner string, prefixedKe
223223 "expires" : expiryTime ,
224224 "now" : now ,
225225 }).Trace ("Try to take ownership" )
226- err = kv .SetMsgIf (ctx , w .Store , weakOwnerPartition , prefixedKey , & ownership , predicate )
226+ err = kv .SetMsgIf (ctx , w .Store , mostlyCorrectOwnerPartition , prefixedKey , & ownership , predicate )
227227 if err == nil {
228228 log .Trace ("Got ownership" )
229229 return nil
@@ -246,13 +246,13 @@ func (w *WeakOwner) startOwningKey(ctx context.Context, owner string, prefixedKe
246246}
247247
248248// releaseIf releases prefixedKey if it has the owner.
249- func (w * WeakOwner ) releaseIf (ctx context.Context , owner string , prefixedKey []byte ) error {
249+ func (w * MostlyCorrectOwner ) releaseIf (ctx context.Context , owner string , prefixedKey []byte ) error {
250250 log := w .Log .WithContext (ctx ).WithFields (logging.Fields {
251251 "prefixed_key" : string (prefixedKey ),
252252 "owner" : owner ,
253253 })
254- ownership := WeakOwnership {}
255- predicate , err := kv .GetMsg (ctx , w .Store , weakOwnerPartition , prefixedKey , & ownership )
254+ ownership := MostlyCorrectOwnership {}
255+ predicate , err := kv .GetMsg (ctx , w .Store , mostlyCorrectOwnerPartition , prefixedKey , & ownership )
256256 if err != nil {
257257 return fmt .Errorf ("get ownership message %s to release it from %s: %w" , string (prefixedKey ), owner , err )
258258 }
@@ -264,7 +264,7 @@ func (w *WeakOwner) releaseIf(ctx context.Context, owner string, prefixedKey []b
264264 }
265265 // Set expiration to the beginning of time - definitely expired.
266266 ownership .Expires .Reset ()
267- err = kv .SetMsgIf (ctx , w .Store , weakOwnerPartition , prefixedKey , & ownership , predicate )
267+ err = kv .SetMsgIf (ctx , w .Store , mostlyCorrectOwnerPartition , prefixedKey , & ownership , predicate )
268268 if errors .Is (err , kv .ErrPredicateFailed ) {
269269 log .WithFields (logging.Fields {
270270 "prefixed_key" : string (prefixedKey ),
@@ -277,10 +277,11 @@ func (w *WeakOwner) releaseIf(ctx context.Context, owner string, prefixedKey []b
277277 return err
278278}
279279
280- // Own blocks until it gets weak ownership of key for owner. Ownership will be refreshed at
281- // resolution interval. It returns a function to stop owning key. Own appends its random slug to
282- // owner, to identify the owner uniquely.
283- func (w * WeakOwner ) Own (ctx context.Context , owner , key string ) (func (), error ) {
280+ // Own blocks until it gets mostly-correct ownership of key for owner.
281+ // Ownership will be refreshed at resolution interval. It returns a
282+ // function to stop owning key. Own appends its random slug to owner, to
283+ // identify the owner uniquely.
284+ func (w * MostlyCorrectOwner ) Own (ctx context.Context , owner , key string ) (func (), error ) {
284285 owner = fmt .Sprintf ("%s#%s" , owner , nanoid .Must ())
285286 prefixedKey := []byte (fmt .Sprintf ("%s/%s" , w .Prefix , key ))
286287 err := w .startOwningKey (ctx , owner , prefixedKey )
0 commit comments