Skip to content

Commit 6544225

Browse files
committed
Actually starting to use the deallocator to clean up services
**- What I did** This patch actually stars to use the deallocator to clean up services, as well as service-level resources (as of now, only networks). **- How I did it** A previous patch (moby#2759) introduced a new component, the deallocator, responsible for cleaning up services and service-level resources. This patch is actually starting to make use of that component. Since the deallocator used to rely on the reaper deleting the tasks belonging to services that had been marked for removal, a previous version of this patch was modifying the task reaper quite heavily to also keep track of such services (needed since otherwise the reaper would fail to clean up all of them, instead keeping some for history tracking purposes). However, it soon appeared that this was not the best approach: * this creates a hidden coupling between the reaper and the deallocator * it's also not the best user experience to suddenly remove all of a service's task history while shutting down, for not apparent reason to the user For these reasons, this patch instead amends the deallocator to also look at tasks status when keeping track of how many alive tasks remain to a service. **- How to test it** Updated tests. **- Description for the changelog** Services & networks are no longer deleted immediately when a user requests their deletion; instead, they are deleted when all their tasks are actually shut down. Signed-off-by: Jean Rouge <jer329@cornell.edu>
1 parent bc032e2 commit 6544225

14 files changed

Lines changed: 357 additions & 122 deletions

File tree

cmd/swarmctl/network/inspect.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ func printNetworkSummary(network *api.Network) {
5151
common.FprintfIfNotEmpty(w, "ID\t: %s\n", network.ID)
5252
common.FprintfIfNotEmpty(w, "Name\t: %s\n", spec.Annotations.Name)
5353

54+
if network.PendingDelete {
55+
common.FprintfIfNotEmpty(w, "[Network %s marked for removal]\n", spec.Annotations.Name)
56+
}
57+
5458
fmt.Fprintln(w, "Spec:\t")
5559
if len(spec.Annotations.Labels) > 0 {
5660
fmt.Fprintln(w, " Labels:\t")

cmd/swarmctl/service/inspect.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@ func printServiceSummary(service *api.Service, running int) {
2222
w := tabwriter.NewWriter(os.Stdout, 8, 8, 8, ' ', 0)
2323
defer w.Flush()
2424

25-
task := service.Spec.Task
25+
spec := service.Spec
2626
common.FprintfIfNotEmpty(w, "ID\t: %s\n", service.ID)
27-
common.FprintfIfNotEmpty(w, "Name\t: %s\n", service.Spec.Annotations.Name)
28-
if len(service.Spec.Annotations.Labels) > 0 {
27+
common.FprintfIfNotEmpty(w, "Name\t: %s\n", spec.Annotations.Name)
28+
29+
if service.PendingDelete {
30+
common.FprintfIfNotEmpty(w, "[Service %s marked for removal]\n", spec.Annotations.Name)
31+
}
32+
33+
if len(spec.Annotations.Labels) > 0 {
2934
fmt.Fprintln(w, "Labels\t")
30-
for k, v := range service.Spec.Annotations.Labels {
35+
for k, v := range spec.Annotations.Labels {
3136
fmt.Fprintf(w, " %s\t: %s\n", k, v)
3237
}
3338
}
@@ -51,7 +56,8 @@ func printServiceSummary(service *api.Service, running int) {
5156

5257
fmt.Fprintln(w, "Template\t")
5358
fmt.Fprintln(w, " Container\t")
54-
ctr := service.Spec.Task.GetContainer()
59+
task := spec.Task
60+
ctr := task.GetContainer()
5561
common.FprintfIfNotEmpty(w, " Image\t: %s\n", ctr.Image)
5662
common.FprintfIfNotEmpty(w, " Command\t: %q\n", strings.Join(ctr.Command, " "))
5763
common.FprintfIfNotEmpty(w, " Args\t: [%s]\n", strings.Join(ctr.Args, ", "))
@@ -90,7 +96,7 @@ func printServiceSummary(service *api.Service, running int) {
9096
printResources(w, res.Limits)
9197
}
9298
}
93-
if len(service.Spec.Task.Networks) > 0 {
99+
if len(spec.Task.Networks) > 0 {
94100
fmt.Fprint(w, " Networks:")
95101
for _, n := range service.Spec.Task.Networks {
96102
fmt.Fprintf(w, " %s", n.Target)

manager/controlapi/network.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,10 @@ func (s *Server) removeNetwork(id string) error {
199199
return status.Errorf(codes.Internal, "could not find services using network %s: %v", id, err)
200200
}
201201

202-
if len(services) != 0 {
203-
return status.Errorf(codes.FailedPrecondition, "network %s is in use by service %s", id, services[0].ID)
202+
for _, service := range services {
203+
if !service.PendingDelete {
204+
return status.Errorf(codes.FailedPrecondition, "network %s is in use by service %s", id, service.ID)
205+
}
204206
}
205207

206208
tasks, err := store.FindTasks(tx, store.ByReferencedNetworkID(id))
@@ -214,7 +216,12 @@ func (s *Server) removeNetwork(id string) error {
214216
}
215217
}
216218

217-
return store.DeleteNetwork(tx, id)
219+
network := store.GetNetwork(tx, id)
220+
if network == nil {
221+
return status.Errorf(codes.NotFound, "network %s not found", id)
222+
}
223+
network.PendingDelete = true
224+
return store.UpdateNetwork(tx, network)
218225
})
219226
}
220227

manager/controlapi/network_test.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,30 @@ func TestRemoveNetworkWithAttachedService(t *testing.T) {
191191
assert.NoError(t, err)
192192
assert.NotEqual(t, nr.Network, nil)
193193
assert.NotEqual(t, nr.Network.ID, "")
194-
createServiceInNetwork(t, ts, "name", "image", nr.Network.ID, 1)
194+
createServiceInNetwork(t, ts, "service1", "image", nr.Network.ID, 1)
195195
_, err = ts.Client.RemoveNetwork(context.Background(), &api.RemoveNetworkRequest{NetworkID: nr.Network.ID})
196196
assert.Error(t, err)
197197
}
198198

199+
func TestRemoveNetworkWithAttachedServiceMarkedForRemoval(t *testing.T) {
200+
ts := newTestServer(t)
201+
defer ts.Stop()
202+
nr, err := ts.Client.CreateNetwork(context.Background(), &api.CreateNetworkRequest{
203+
Spec: createNetworkSpec("testnet5"),
204+
})
205+
assert.NoError(t, err)
206+
assert.NotEqual(t, nr.Network, nil)
207+
assert.NotEqual(t, nr.Network.ID, "")
208+
service := createServiceInNetwork(t, ts, "service2", "image", nr.Network.ID, 1)
209+
// then let's delete the service
210+
r, err := ts.Client.RemoveService(context.Background(), &api.RemoveServiceRequest{ServiceID: service.ID})
211+
assert.NoError(t, err)
212+
assert.NotNil(t, r)
213+
// now we should be able to delete the network
214+
_, err = ts.Client.RemoveNetwork(context.Background(), &api.RemoveNetworkRequest{NetworkID: nr.Network.ID})
215+
assert.NoError(t, err)
216+
}
217+
199218
func TestCreateNetworkInvalidDriver(t *testing.T) {
200219
ts := newTestServer(t)
201220
defer ts.Stop()

manager/controlapi/service.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,7 @@ func (s *Server) GetService(ctx context.Context, request *api.GetServiceRequest)
718718
// - Returns `NotFound` if the Service is not found.
719719
// - Returns `InvalidArgument` if the ServiceSpec is malformed.
720720
// - Returns `Unimplemented` if the ServiceSpec references unimplemented features.
721+
// - Returns `FailedPrecondition` if the Service is marked for removal
721722
// - Returns an error if the update fails.
722723
func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRequest) (*api.UpdateServiceResponse, error) {
723724
if request.ServiceID == "" || request.ServiceVersion == nil {
@@ -751,6 +752,10 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
751752
return status.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
752753
}
753754

755+
if service.PendingDelete {
756+
return status.Errorf(codes.FailedPrecondition, "service %s is marked for removal", request.ServiceID)
757+
}
758+
754759
// It's not okay to update Service.Spec.Networks on its own.
755760
// However, if Service.Spec.Task.Networks is also being
756761
// updated, that's okay (for example when migrating from the
@@ -844,12 +849,15 @@ func (s *Server) RemoveService(ctx context.Context, request *api.RemoveServiceRe
844849
}
845850

846851
err := s.store.Update(func(tx store.Tx) error {
847-
return store.DeleteService(tx, request.ServiceID)
852+
service := store.GetService(tx, request.ServiceID)
853+
if service == nil {
854+
return status.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
855+
}
856+
// mark service for removal
857+
service.PendingDelete = true
858+
return store.UpdateService(tx, service)
848859
})
849860
if err != nil {
850-
if err == store.ErrNotExist {
851-
return nil, status.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
852-
}
853861
return nil, err
854862
}
855863
return &api.RemoveServiceResponse{}, nil

manager/controlapi/service_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -916,6 +916,19 @@ func TestUpdateService(t *testing.T) {
916916
assert.Equal(t, codes.InvalidArgument, testutils.ErrorCode(err))
917917
}
918918

919+
func TestUpdateServiceMarkedForRemoval(t *testing.T) {
920+
ts := newTestServer(t)
921+
defer ts.Stop()
922+
923+
service := createService(t, ts, "name", "image", 1)
924+
r, err := ts.Client.RemoveService(context.Background(), &api.RemoveServiceRequest{ServiceID: service.ID})
925+
assert.NoError(t, err)
926+
assert.NotNil(t, r)
927+
928+
_, err = ts.Client.UpdateService(context.Background(), &api.UpdateServiceRequest{ServiceID: service.ID, Spec: &service.Spec, ServiceVersion: &service.Meta.Version})
929+
assert.Equal(t, codes.FailedPrecondition, testutils.ErrorCode(err))
930+
}
931+
919932
func TestServiceUpdateRejectNetworkChange(t *testing.T) {
920933
ts := newTestServer(t)
921934
defer ts.Stop()

manager/deallocator/deallocator.go

Lines changed: 61 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package deallocator
22

33
import (
44
"context"
5+
"sync"
56

67
"github.com/docker/go-events"
78
"github.com/docker/swarmkit/api"
@@ -32,6 +33,9 @@ import (
3233
type Deallocator struct {
3334
store *store.MemoryStore
3435

36+
// closeOnce ensures that stopChan is closed only once
37+
closeOnce sync.Once
38+
3539
// for services that are shutting down, we keep track of how many
3640
// tasks still exist for them
3741
services map[string]*serviceWithTaskCounts
@@ -94,6 +98,7 @@ func (deallocator *Deallocator) Run(ctx context.Context) error {
9498

9599
return
96100
},
101+
api.EventUpdateTask{},
97102
api.EventDeleteTask{},
98103
api.EventUpdateService{},
99104
api.EventUpdateNetwork{})
@@ -129,7 +134,7 @@ func (deallocator *Deallocator) Run(ctx context.Context) error {
129134
for {
130135
select {
131136
case event := <-eventsChan:
132-
if updated, err := deallocator.processNewEvent(ctx, event); err == nil {
137+
if updated, err := deallocator.handleEvent(ctx, event); err == nil {
133138
deallocator.notifyEventChan(updated)
134139
} else {
135140
log.G(ctx).WithError(err).Errorf("error processing deallocator event %#v", event)
@@ -142,11 +147,16 @@ func (deallocator *Deallocator) Run(ctx context.Context) error {
142147
}
143148
}
144149

145-
// Stop stops the deallocator's routine
146-
// FIXME (jrouge): see the comment on TaskReaper.Stop() and see when to properly stop this
147-
// plus unit test on this!
150+
// Stop stops the deallocator's routine and wait for the main loop to exit
151+
// Stop can be called in two cases. One when the manager is
152+
// shutting down, and the other when the manager (the leader) is
153+
// becoming a follower. Since these two instances could race with
154+
// each other, we use closeOnce here to ensure that TaskReaper.Stop()
155+
// is called only once to avoid a panic.
148156
func (deallocator *Deallocator) Stop() {
149-
close(deallocator.stopChan)
157+
deallocator.closeOnce.Do(func() {
158+
close(deallocator.stopChan)
159+
})
150160
<-deallocator.doneChan
151161
}
152162

@@ -180,12 +190,21 @@ func (deallocator *Deallocator) processService(ctx context.Context, service *api
180190
// better to clean up resources that shouldn't be cleaned up yet
181191
// than ending up with a service and some resources lost in limbo forever
182192
return true, deallocator.deallocateService(ctx, service)
183-
} else if len(tasks) == 0 {
184-
// no tasks remaining for this service, we can clean it up
185-
return true, deallocator.deallocateService(ctx, service)
193+
} else {
194+
remainingTasks := 0
195+
for _, task := range tasks {
196+
if isTaskStillAlive(task) {
197+
remainingTasks++
198+
}
199+
}
200+
201+
if remainingTasks == 0 {
202+
// no tasks remaining for this service, we can clean it up
203+
return true, deallocator.deallocateService(ctx, service)
204+
}
205+
deallocator.services[service.ID] = &serviceWithTaskCounts{service: service, taskCount: remainingTasks}
206+
return false, nil
186207
}
187-
deallocator.services[service.ID] = &serviceWithTaskCounts{service: service, taskCount: len(tasks)}
188-
return false, nil
189208
}
190209

191210
func (deallocator *Deallocator) deallocateService(ctx context.Context, service *api.Service) (err error) {
@@ -263,24 +282,16 @@ func (deallocator *Deallocator) processNetwork(ctx context.Context, tx store.Tx,
263282
return
264283
}
265284

266-
// Processes new events, and dispatches to the right method depending on what
285+
// Handles new events, and dispatches to the right method depending on what
267286
// type of event it is.
268287
// The boolean part of the return tuple indicates whether anything was actually
269288
// removed from the store
270-
func (deallocator *Deallocator) processNewEvent(ctx context.Context, event events.Event) (bool, error) {
289+
func (deallocator *Deallocator) handleEvent(ctx context.Context, event events.Event) (bool, error) {
271290
switch typedEvent := event.(type) {
291+
case api.EventUpdateTask:
292+
return deallocator.processTaskEvent(ctx, typedEvent.Task, typedEvent.OldTask)
272293
case api.EventDeleteTask:
273-
serviceID := typedEvent.Task.ServiceID
274-
275-
if serviceWithCount, present := deallocator.services[serviceID]; present {
276-
if serviceWithCount.taskCount <= 1 {
277-
delete(deallocator.services, serviceID)
278-
return deallocator.processService(ctx, serviceWithCount.service)
279-
}
280-
serviceWithCount.taskCount--
281-
}
282-
283-
return false, nil
294+
return deallocator.processTaskEvent(ctx, nil, typedEvent.Task)
284295
case api.EventUpdateService:
285296
return deallocator.processService(ctx, typedEvent.Service)
286297
case api.EventUpdateNetwork:
@@ -289,3 +300,30 @@ func (deallocator *Deallocator) processNewEvent(ctx context.Context, event event
289300
return false, nil
290301
}
291302
}
303+
304+
// Common logic for handling task update/delete events
305+
// newTask is nil for delete events
306+
func (deallocator *Deallocator) processTaskEvent(ctx context.Context, newTask, oldTask *api.Task) (bool, error) {
307+
serviceID := oldTask.ServiceID
308+
serviceWithCount, present := deallocator.services[serviceID]
309+
310+
if present && isTaskStillAlive(oldTask) && (newTask == nil || !isTaskStillAlive(newTask)) {
311+
// this task belongs to a service that's shutting down, and was amongst
312+
// the ones that we counted as alive when adding this service to our
313+
// internal state
314+
315+
if serviceWithCount.taskCount <= 1 {
316+
delete(deallocator.services, serviceID)
317+
return deallocator.processService(ctx, serviceWithCount.service)
318+
}
319+
serviceWithCount.taskCount--
320+
}
321+
322+
return false, nil
323+
}
324+
325+
// simple helper function to distinguish tasks that are still running
326+
// from ones that are done
327+
func isTaskStillAlive(task *api.Task) bool {
328+
return task.Status.State <= api.TaskStateRunning
329+
}

0 commit comments

Comments
 (0)