From e98a875fd3edec9493cd10455d9ebbb04982c933 Mon Sep 17 00:00:00 2001 From: Mihai Pancu Date: Thu, 3 Apr 2025 17:43:58 +0300 Subject: [PATCH 1/3] Add the server back as soon as possible and not wait for the informer to update the cache --- pkg/gameserverallocations/allocator.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/gameserverallocations/allocator.go b/pkg/gameserverallocations/allocator.go index d0699514f0..a841271fae 100644 --- a/pkg/gameserverallocations/allocator.go +++ b/pkg/gameserverallocations/allocator.go @@ -602,6 +602,8 @@ func (c *Allocator) allocationUpdateWorkers(ctx context.Context, workerCount int } res.err = errors.Wrap(err, "error updating allocated gameserver") } else { + // Ubi change: add the server back as soon as possible and not wait for the informer to update the cache + c.allocationCache.AddGameServer(gs) res.gs = gs } From 7b0cdb58b28db48fcf21b77b8dc3db9069c30cb8 Mon Sep 17 00:00:00 2001 From: Mihai Pancu Date: Thu, 3 Apr 2025 17:47:03 +0300 Subject: [PATCH 2/3] No quick 400s (still do retries for ErrNoGameServer) --- pkg/gameserverallocations/allocator.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/gameserverallocations/allocator.go b/pkg/gameserverallocations/allocator.go index a841271fae..f3534e0e7c 100644 --- a/pkg/gameserverallocations/allocator.go +++ b/pkg/gameserverallocations/allocator.go @@ -696,8 +696,9 @@ func Retry(backoff wait.Backoff, fn func() error) error { switch { case err == nil: return true, nil - case err == ErrNoGameServer: - return true, err + // Ubi change: No quick 400s (still do retries for ErrNoGameServer) + //case err == ErrNoGameServer: + // return true, err case err == ErrTotalTimeoutExceeded: return true, err default: From b6b535445dd8d8784fc5237fc4f44c6225b2bb9d Mon Sep 17 00:00:00 2001 From: Mihai Pancu Date: Mon, 14 Apr 2025 18:35:35 +0300 Subject: [PATCH 3/3] * Batch size metric * Use unsafe patch instead of update * Allocator batches allocations * Update duration metric --- install/helm/agones/defaultfeaturegates.yaml | 2 + pkg/apis/agones/v1/gameserver.go | 28 +++ pkg/gameserverallocations/allocator.go | 31 ++- pkg/gameserverallocations/batch_allocator.go | 235 +++++++++++++++++++ pkg/gameserverallocations/metrics.go | 35 ++- pkg/util/runtime/features.go | 8 + 6 files changed, 335 insertions(+), 4 deletions(-) create mode 100644 pkg/gameserverallocations/batch_allocator.go diff --git a/install/helm/agones/defaultfeaturegates.yaml b/install/helm/agones/defaultfeaturegates.yaml index 1528804e7d..430234679a 100644 --- a/install/helm/agones/defaultfeaturegates.yaml +++ b/install/helm/agones/defaultfeaturegates.yaml @@ -30,6 +30,8 @@ PortPolicyNone: false ScheduledAutoscaler: false # Dev features +AllocatorPatchesGameservers: false +AllocatorBatchesChanges: false # Example feature Example: false diff --git a/pkg/apis/agones/v1/gameserver.go b/pkg/apis/agones/v1/gameserver.go index 0f34a814d6..fdc67c8402 100644 --- a/pkg/apis/agones/v1/gameserver.go +++ b/pkg/apis/agones/v1/gameserver.go @@ -949,6 +949,34 @@ func (gs *GameServer) Patch(delta *GameServer) ([]byte, error) { return result, errors.Wrapf(err, "error creating json for patch for GameServer %s", gs.ObjectMeta.Name) } +func (gs *GameServer) PatchUnsafe(delta *GameServer) ([]byte, error) { + var result []byte + + oldJSON, err := json.Marshal(gs) + if err != nil { + return result, errors.Wrapf(err, "error marshalling to json current GameServer %s", gs.ObjectMeta.Name) + } + + newJSON, err := json.Marshal(delta) + if err != nil { + return result, errors.Wrapf(err, "error marshalling to json delta GameServer %s", delta.ObjectMeta.Name) + } + + patch, err := jsonpatch.CreatePatch(oldJSON, newJSON) + if err != nil { + return result, errors.Wrapf(err, "error creating patch for GameServer %s", gs.ObjectMeta.Name) + } + + // Per https://jsonpatch.com/ "Tests that the specified value is set in the document. If the test + // fails, then the patch as a whole should not apply." + // Used here to check the object has not been updated (has not changed ResourceVersion). + //patches := []jsonpatch.JsonPatchOperation{{Operation: "test", Path: "/metadata/resourceVersion", Value: gs.ObjectMeta.ResourceVersion}} + //patches = append(patches, patch...) + + result, err = json.Marshal(patch) + return result, errors.Wrapf(err, "error creating json for patch for GameServer %s", gs.ObjectMeta.Name) +} + // UpdateCount increments or decrements a CounterStatus on a Game Server by the given amount. func (gs *GameServer) UpdateCount(name string, action string, amount int64) error { if !(action == GameServerPriorityIncrement || action == GameServerPriorityDecrement) { diff --git a/pkg/gameserverallocations/allocator.go b/pkg/gameserverallocations/allocator.go index f3534e0e7c..455678e2f3 100644 --- a/pkg/gameserverallocations/allocator.go +++ b/pkg/gameserverallocations/allocator.go @@ -49,6 +49,7 @@ import ( k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" runtimeschema "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" informercorev1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -174,7 +175,11 @@ func (c *Allocator) Run(ctx context.Context) error { } // workers and logic for batching allocations - go c.ListenAndAllocate(ctx, maxBatchQueue) + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && runtime.FeatureEnabled(runtime.FeatureAllocatorBatchesChanges) { + go c.ListenAndBatchAllocate(ctx, maxBatchQueue) + } else { + go c.ListenAndAllocate(ctx, maxBatchQueue) + } return nil } @@ -511,6 +516,7 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int var list []*agonesv1.GameServer var sortKey uint64 requestCount := 0 + batchSize := c.newMetrics(ctx) for { select { @@ -557,6 +563,7 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int req.response <- response{request: req, gs: nil, err: err} continue } + // remove the game server that has been allocated list = append(list[:index], list[index+1:]...) @@ -571,6 +578,9 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int case <-ctx.Done(): return default: + if requestCount > 0 { + batchSize.recordAllocationsBatchSize(ctx, requestCount) + } list = nil requestCount = 0 // slow down cpu churn, and allow items to batch @@ -621,6 +631,11 @@ func (c *Allocator) allocationUpdateWorkers(ctx context.Context, workerCount int // applyAllocationToGameServer patches the inputted GameServer with the allocation metadata changes, and updates it to the Allocated State. // Returns the updated GameServer. func (c *Allocator) applyAllocationToGameServer(ctx context.Context, mp allocationv1.MetaPatch, gs *agonesv1.GameServer, gsa *allocationv1.GameServerAllocation) (*agonesv1.GameServer, error) { + var gsOriginal *agonesv1.GameServer + if runtime.FeatureEnabled(runtime.FeatureAllocatorPatchesGameservers) { + gsOriginal = gs.DeepCopy() + } + // patch ObjectMeta labels if mp.Labels != nil { if gs.ObjectMeta.Labels == nil { @@ -663,7 +678,19 @@ func (c *Allocator) applyAllocationToGameServer(ctx context.Context, mp allocati } } - gsUpdate, updateErr := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(ctx, gs, metav1.UpdateOptions{}) + var updateErr error + var gsUpdate *agonesv1.GameServer + if runtime.FeatureEnabled(runtime.FeatureAllocatorPatchesGameservers) { + patch, err := gsOriginal.PatchUnsafe(gs) + if err != nil { + return gsOriginal, errors.Wrapf(err, "error computing the gs patch") + } + gsUpdate, err = c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Patch(ctx, gs.GetObjectMeta().GetName(), types.JSONPatchType, patch, metav1.PatchOptions{}) + updateErr = errors.Wrapf(err, "error returned by the patch request using %s", string(patch)) + } else { + gsUpdate, updateErr = c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(ctx, gs, metav1.UpdateOptions{}) + } + if updateErr != nil { return gsUpdate, updateErr } diff --git a/pkg/gameserverallocations/batch_allocator.go b/pkg/gameserverallocations/batch_allocator.go new file mode 100644 index 0000000000..0f94f8e176 --- /dev/null +++ b/pkg/gameserverallocations/batch_allocator.go @@ -0,0 +1,235 @@ +package gameserverallocations + +import ( + "context" + goErrors "errors" + "time" + + "agones.dev/agones/pkg/apis" + agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" + "agones.dev/agones/pkg/util/runtime" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// batchResponse is an async list of responses for matching requests +type batchResponses struct { + responses []response + counterErrors error + listErrors error +} + +func (c *Allocator) batchAllocationUpdateWorkers(ctx context.Context, workerCount int) chan<- batchResponses { + metrics := c.newMetrics(ctx) + batchUpdateQueue := make(chan batchResponses) + + for i := 0; i < workerCount; i++ { + go func() { + for { + select { + case batchRes := <-batchUpdateQueue: + if len(batchRes.responses) > 0 { + // The last response contains the latest gs state + lastGsState := batchRes.responses[len(batchRes.responses)-1].gs + + requestStartTime := time.Now() + + // Try to update with the latest gs state + updatedGs, updateErr := c.gameServerGetter.GameServers(lastGsState.ObjectMeta.Namespace).Update(ctx, lastGsState, metav1.UpdateOptions{}) + if updateErr != nil { + metrics.recordAllocationUpdateFailure(ctx, time.Since(requestStartTime)) + + if !k8serrors.IsConflict(errors.Cause(updateErr)) { + // since we could not allocate, we should put it back + // but not if it's a conflict, as the cache is no longer up to date, and + // we should wait for it to get updated with fresh info. + c.allocationCache.AddGameServer(updatedGs) + } + updateErr = errors.Wrap(updateErr, "error updating allocated gameserver") + } else { + metrics.recordAllocationUpdateSuccess(ctx, time.Since(requestStartTime)) + + // Add the server back as soon as possible and not wait for the informer to update the cache + c.allocationCache.AddGameServer(updatedGs) + + // If successful Update record any Counter or List action errors as a warning + if batchRes.counterErrors != nil { + c.recorder.Event(updatedGs, corev1.EventTypeWarning, "CounterActionError", batchRes.counterErrors.Error()) + } + if batchRes.listErrors != nil { + c.recorder.Event(updatedGs, corev1.EventTypeWarning, "ListActionError", batchRes.listErrors.Error()) + } + c.recorder.Event(updatedGs, corev1.EventTypeNormal, string(updatedGs.Status.State), "Allocated") + } + + // Forward all responses with their appropriate gs state and update error + for _, res := range batchRes.responses { + res.err = updateErr + res.request.response <- res + } + } + case <-ctx.Done(): + return + } + } + }() + } + + return batchUpdateQueue +} + +// ListenAndBatchAllocate is a blocking function that runs in a loop +func (c *Allocator) ListenAndBatchAllocate(ctx context.Context, updateWorkerCount int) { + // setup workers for batch allocation updates. Push response values into + // this queue for concurrent updating of GameServers to Allocated + batchUpdateQueue := c.batchAllocationUpdateWorkers(ctx, updateWorkerCount) + + var list []*agonesv1.GameServer + var sortKey uint64 + requestCount := 0 + + batchSize := c.newMetrics(ctx) + batchResponsesPerGs := make(map[string]batchResponses) + + flush := func() { + if requestCount > 0 { + batchSize.recordAllocationsBatchSize(ctx, requestCount) + } + + for _, batchResponses := range batchResponsesPerGs { + batchUpdateQueue <- batchResponses + } + batchResponsesPerGs = make(map[string]batchResponses) + + list = nil + requestCount = 0 + } + + for { + select { + case req := <-c.pendingRequests: + // refresh the list after every 100 allocations made in a single batch + if requestCount >= maxBatchBeforeRefresh { + flush() + } + requestCount++ + + // SortKey returns the sorting values (list of Priorities) as a determinstic key. + // In case gsa.Spec.Priorities is nil this will still return a sortKey. + // In case of error this will return 0 for the sortKey. + newSortKey, err := req.gsa.SortKey() + if err != nil { + c.baseLogger.WithError(err).Warn("error getting sortKey for GameServerAllocationSpec", err) + } + // Set sortKey if this is the first request, or the previous request errored on creating a sortKey. + if sortKey == uint64(0) { + sortKey = newSortKey + } + + if newSortKey != sortKey { + sortKey = newSortKey + flush() // MP: not sure ??? + } + + // Sort list if necessary + if list == nil { + if req.gsa.Spec.Scheduling == apis.Packed { + list = c.allocationCache.ListSortedGameServers(req.gsa) + } else { + // Scheduling == Distributed, sort game servers by Priorities + list = c.allocationCache.ListSortedGameServersPriorities(req.gsa) + } + } + + gs, _, err := findGameServerForAllocation(req.gsa, list) + if err != nil { + req.response <- response{request: req, gs: nil, err: err} + continue + } + + // Apply the allocation to the local copy of gs + applyError, counterErrors, listErrors := c.applyAllocationToLocalGameServer(req.gsa.Spec.MetaPatch, gs, req.gsa) + if applyError == nil { + if existingBatch, exists := batchResponsesPerGs[string(gs.UID)]; exists { + existingBatch.responses = append(existingBatch.responses, response{request: req, gs: gs.DeepCopy(), err: nil}) + existingBatch.counterErrors = goErrors.Join(existingBatch.counterErrors, counterErrors) + existingBatch.listErrors = goErrors.Join(existingBatch.listErrors, listErrors) + batchResponsesPerGs[string(gs.UID)] = existingBatch + } else { + if removeErr := c.allocationCache.RemoveGameServer(gs); removeErr != nil { + // this seems unlikely, but lets handle it just in case + removeErr = errors.Wrap(removeErr, "error removing gameserver from cache") + req.response <- response{request: req, gs: nil, err: removeErr} + } else { + batchResponsesPerGs[string(gs.UID)] = batchResponses{ + responses: []response{response{request: req, gs: gs.DeepCopy(), err: nil}}, + counterErrors: counterErrors, + listErrors: listErrors, + } + } + } + } else { + req.response <- response{request: req, gs: nil, err: applyError} + } + case <-ctx.Done(): + flush() + return + default: + flush() + + // slow down cpu churn, and allow items to batch + time.Sleep(c.batchWaitTime) + } + } +} + +// applyAllocationToLocalGameServer patches the inputted GameServer with the allocation metadata changes, and updates it to the Allocated State. +// Returns the encountered errors. +func (c *Allocator) applyAllocationToLocalGameServer(mp allocationv1.MetaPatch, gs *agonesv1.GameServer, gsa *allocationv1.GameServerAllocation) (error, error, error) { + // add last allocated, so it always gets updated, even if it is already Allocated + ts, err := time.Now().MarshalText() + if err != nil { + return err, nil, nil + } + gs.ObjectMeta.Annotations[LastAllocatedAnnotationKey] = string(ts) + gs.Status.State = agonesv1.GameServerStateAllocated + + // patch ObjectMeta labels + if mp.Labels != nil { + if gs.ObjectMeta.Labels == nil { + gs.ObjectMeta.Labels = make(map[string]string, len(mp.Labels)) + } + for key, value := range mp.Labels { + gs.ObjectMeta.Labels[key] = value + } + } + + if gs.ObjectMeta.Annotations == nil { + gs.ObjectMeta.Annotations = make(map[string]string, len(mp.Annotations)) + } + // apply annotations patch + for key, value := range mp.Annotations { + gs.ObjectMeta.Annotations[key] = value + } + + // perfom any Counter or List actions + var counterErrors error + var listErrors error + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { + if gsa.Spec.Counters != nil { + for counter, ca := range gsa.Spec.Counters { + counterErrors = goErrors.Join(counterErrors, ca.CounterActions(counter, gs)) + } + } + if gsa.Spec.Lists != nil { + for list, la := range gsa.Spec.Lists { + listErrors = goErrors.Join(listErrors, la.ListActions(list, gs)) + } + } + } + + return nil, counterErrors, listErrors +} diff --git a/pkg/gameserverallocations/metrics.go b/pkg/gameserverallocations/metrics.go index 64fb4d50ec..667329939d 100644 --- a/pkg/gameserverallocations/metrics.go +++ b/pkg/gameserverallocations/metrics.go @@ -40,8 +40,10 @@ var ( keyStatus = mt.MustTagKey("status") keySchedulingStrategy = mt.MustTagKey("scheduling_strategy") - gameServerAllocationsLatency = stats.Float64("gameserver_allocations/latency", "The duration of gameserver allocations", "s") - gameServerAllocationsRetryTotal = stats.Int64("gameserver_allocations/errors", "The errors of gameserver allocations", "1") + gameServerAllocationsLatency = stats.Float64("gameserver_allocations/latency", "The duration of gameserver allocations", "s") + gameServerAllocationsUpdatesLatency = stats.Float64("gameserver_allocations/update_latency", "The duration of gameserver updates", "s") + gameServerAllocationsRetryTotal = stats.Int64("gameserver_allocations/errors", "The errors of gameserver allocations", "1") + gameServerAllocationsBatchSize = stats.Int64("gameserver_allocations/batch", "The gameserver allocations batch size", "1") ) func init() { @@ -54,6 +56,13 @@ func init() { Aggregation: view.Distribution(0, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2, 3), TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, }, + { + Name: "gameserver_allocations_updates_duration_seconds", + Measure: gameServerAllocationsUpdatesLatency, + Description: "The distribution of gameserver allocation update requests latencies.", + Aggregation: view.Distribution(0, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2, 3), + TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, + }, { Name: "gameserver_allocations_retry_total", Measure: gameServerAllocationsRetryTotal, @@ -61,6 +70,13 @@ func init() { Aggregation: view.Distribution(1, 2, 3, 4, 5), TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, }, + { + Name: "gameserver_allocations_batch_size", + Measure: gameServerAllocationsBatchSize, + Description: "The count of gameserver allocations in a batch", + Aggregation: view.Distribution(1, 2, 3, 4, 5, 10, 20, 50, 100), + TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, + }, } for _, v := range stateViews { @@ -148,3 +164,18 @@ func (r *metrics) recordAllocationRetrySuccess(ctx context.Context, retryCount i mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Success")}, gameServerAllocationsRetryTotal.M(int64(retryCount))) } + +// record the current allocation batch size rate. +func (r *metrics) recordAllocationsBatchSize(ctx context.Context, count int) { + stats.Record(ctx, gameServerAllocationsBatchSize.M(int64(count))) +} + +func (r *metrics) recordAllocationUpdateSuccess(ctx context.Context, duration time.Duration) { + mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Success")}, + gameServerAllocationsUpdatesLatency.M(duration.Seconds())) +} + +func (r *metrics) recordAllocationUpdateFailure(ctx context.Context, duration time.Duration) { + mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Failure")}, + gameServerAllocationsUpdatesLatency.M(duration.Seconds())) +} diff --git a/pkg/util/runtime/features.go b/pkg/util/runtime/features.go index e3a3ade6e2..c3d545a7d3 100644 --- a/pkg/util/runtime/features.go +++ b/pkg/util/runtime/features.go @@ -73,6 +73,12 @@ const ( //////////////// // Dev features + // FeatureAllocatorPatchesGameservers is a feature flag to enable/disable the allocator using patch instead of update to update the gameserver CRD. + FeatureAllocatorPatchesGameservers Feature = "AllocatorPatchesGameservers" + + // FeatureAllocatorBatchesChanges is a feature flag to enable/disable the allocator using batches to update the gameserver CRD. + FeatureAllocatorBatchesChanges Feature = "AllocatorBatchesChanges" + //////////////// // Example feature @@ -146,6 +152,8 @@ var ( FeatureScheduledAutoscaler: false, // Dev features + FeatureAllocatorPatchesGameservers: false, + FeatureAllocatorBatchesChanges: true, // Example feature FeatureExample: false,