Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions install/helm/agones/defaultfeaturegates.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ PortPolicyNone: false
ScheduledAutoscaler: false

# Dev features
AllocatorPatchesGameservers: false
AllocatorBatchesChanges: false

# Example feature
Example: false
28 changes: 28 additions & 0 deletions pkg/apis/agones/v1/gameserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,34 @@ func (gs *GameServer) Patch(delta *GameServer) ([]byte, error) {
return result, errors.Wrapf(err, "error creating json for patch for GameServer %s", gs.ObjectMeta.Name)
}

func (gs *GameServer) PatchUnsafe(delta *GameServer) ([]byte, error) {
var result []byte

oldJSON, err := json.Marshal(gs)
if err != nil {
return result, errors.Wrapf(err, "error marshalling to json current GameServer %s", gs.ObjectMeta.Name)
}

newJSON, err := json.Marshal(delta)
if err != nil {
return result, errors.Wrapf(err, "error marshalling to json delta GameServer %s", delta.ObjectMeta.Name)
}

patch, err := jsonpatch.CreatePatch(oldJSON, newJSON)
if err != nil {
return result, errors.Wrapf(err, "error creating patch for GameServer %s", gs.ObjectMeta.Name)
}

// Per https://jsonpatch.com/ "Tests that the specified value is set in the document. If the test
// fails, then the patch as a whole should not apply."
// Used here to check the object has not been updated (has not changed ResourceVersion).
//patches := []jsonpatch.JsonPatchOperation{{Operation: "test", Path: "/metadata/resourceVersion", Value: gs.ObjectMeta.ResourceVersion}}
//patches = append(patches, patch...)

result, err = json.Marshal(patch)
return result, errors.Wrapf(err, "error creating json for patch for GameServer %s", gs.ObjectMeta.Name)
}

// UpdateCount increments or decrements a CounterStatus on a Game Server by the given amount.
func (gs *GameServer) UpdateCount(name string, action string, amount int64) error {
if !(action == GameServerPriorityIncrement || action == GameServerPriorityDecrement) {
Expand Down
38 changes: 34 additions & 4 deletions pkg/gameserverallocations/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
runtimeschema "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
informercorev1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -174,7 +175,11 @@ func (c *Allocator) Run(ctx context.Context) error {
}

// workers and logic for batching allocations
go c.ListenAndAllocate(ctx, maxBatchQueue)
if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && runtime.FeatureEnabled(runtime.FeatureAllocatorBatchesChanges) {
go c.ListenAndBatchAllocate(ctx, maxBatchQueue)
} else {
go c.ListenAndAllocate(ctx, maxBatchQueue)
}

return nil
}
Expand Down Expand Up @@ -511,6 +516,7 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int
var list []*agonesv1.GameServer
var sortKey uint64
requestCount := 0
batchSize := c.newMetrics(ctx)

for {
select {
Expand Down Expand Up @@ -557,6 +563,7 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int
req.response <- response{request: req, gs: nil, err: err}
continue
}

// remove the game server that has been allocated
list = append(list[:index], list[index+1:]...)

Expand All @@ -571,6 +578,9 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int
case <-ctx.Done():
return
default:
if requestCount > 0 {
batchSize.recordAllocationsBatchSize(ctx, requestCount)
}
list = nil
requestCount = 0
// slow down cpu churn, and allow items to batch
Expand Down Expand Up @@ -602,6 +612,8 @@ func (c *Allocator) allocationUpdateWorkers(ctx context.Context, workerCount int
}
res.err = errors.Wrap(err, "error updating allocated gameserver")
} else {
// Ubi change: add the server back as soon as possible and not wait for the informer to update the cache
c.allocationCache.AddGameServer(gs)
res.gs = gs
}

Expand All @@ -619,6 +631,11 @@ func (c *Allocator) allocationUpdateWorkers(ctx context.Context, workerCount int
// applyAllocationToGameServer patches the inputted GameServer with the allocation metadata changes, and updates it to the Allocated State.
// Returns the updated GameServer.
func (c *Allocator) applyAllocationToGameServer(ctx context.Context, mp allocationv1.MetaPatch, gs *agonesv1.GameServer, gsa *allocationv1.GameServerAllocation) (*agonesv1.GameServer, error) {
var gsOriginal *agonesv1.GameServer
if runtime.FeatureEnabled(runtime.FeatureAllocatorPatchesGameservers) {
gsOriginal = gs.DeepCopy()
}

// patch ObjectMeta labels
if mp.Labels != nil {
if gs.ObjectMeta.Labels == nil {
Expand Down Expand Up @@ -661,7 +678,19 @@ func (c *Allocator) applyAllocationToGameServer(ctx context.Context, mp allocati
}
}

gsUpdate, updateErr := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(ctx, gs, metav1.UpdateOptions{})
var updateErr error
var gsUpdate *agonesv1.GameServer
if runtime.FeatureEnabled(runtime.FeatureAllocatorPatchesGameservers) {
patch, err := gsOriginal.PatchUnsafe(gs)
if err != nil {
return gsOriginal, errors.Wrapf(err, "error computing the gs patch")
}
gsUpdate, err = c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Patch(ctx, gs.GetObjectMeta().GetName(), types.JSONPatchType, patch, metav1.PatchOptions{})
updateErr = errors.Wrapf(err, "error returned by the patch request using %s", string(patch))
} else {
gsUpdate, updateErr = c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(ctx, gs, metav1.UpdateOptions{})
}

if updateErr != nil {
return gsUpdate, updateErr
}
Expand Down Expand Up @@ -694,8 +723,9 @@ func Retry(backoff wait.Backoff, fn func() error) error {
switch {
case err == nil:
return true, nil
case err == ErrNoGameServer:
return true, err
// Ubi change: No quick 400s (still do retries for ErrNoGameServer)
//case err == ErrNoGameServer:
// return true, err
case err == ErrTotalTimeoutExceeded:
return true, err
default:
Expand Down
235 changes: 235 additions & 0 deletions pkg/gameserverallocations/batch_allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package gameserverallocations

import (
"context"
goErrors "errors"
"time"

"agones.dev/agones/pkg/apis"
agonesv1 "agones.dev/agones/pkg/apis/agones/v1"
allocationv1 "agones.dev/agones/pkg/apis/allocation/v1"
"agones.dev/agones/pkg/util/runtime"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// batchResponse is an async list of responses for matching requests
type batchResponses struct {
responses []response
counterErrors error
listErrors error
}

func (c *Allocator) batchAllocationUpdateWorkers(ctx context.Context, workerCount int) chan<- batchResponses {
metrics := c.newMetrics(ctx)
batchUpdateQueue := make(chan batchResponses)

for i := 0; i < workerCount; i++ {
go func() {
for {
select {
case batchRes := <-batchUpdateQueue:
if len(batchRes.responses) > 0 {
// The last response contains the latest gs state
lastGsState := batchRes.responses[len(batchRes.responses)-1].gs

requestStartTime := time.Now()

// Try to update with the latest gs state
updatedGs, updateErr := c.gameServerGetter.GameServers(lastGsState.ObjectMeta.Namespace).Update(ctx, lastGsState, metav1.UpdateOptions{})
if updateErr != nil {
metrics.recordAllocationUpdateFailure(ctx, time.Since(requestStartTime))

if !k8serrors.IsConflict(errors.Cause(updateErr)) {
// since we could not allocate, we should put it back
// but not if it's a conflict, as the cache is no longer up to date, and
// we should wait for it to get updated with fresh info.
c.allocationCache.AddGameServer(updatedGs)
}
updateErr = errors.Wrap(updateErr, "error updating allocated gameserver")
} else {
metrics.recordAllocationUpdateSuccess(ctx, time.Since(requestStartTime))

// Add the server back as soon as possible and not wait for the informer to update the cache
c.allocationCache.AddGameServer(updatedGs)

// If successful Update record any Counter or List action errors as a warning
if batchRes.counterErrors != nil {
c.recorder.Event(updatedGs, corev1.EventTypeWarning, "CounterActionError", batchRes.counterErrors.Error())
}
if batchRes.listErrors != nil {
c.recorder.Event(updatedGs, corev1.EventTypeWarning, "ListActionError", batchRes.listErrors.Error())
}
c.recorder.Event(updatedGs, corev1.EventTypeNormal, string(updatedGs.Status.State), "Allocated")
}

// Forward all responses with their appropriate gs state and update error
for _, res := range batchRes.responses {
res.err = updateErr
res.request.response <- res
}
}
case <-ctx.Done():
return
}
}
}()
}

return batchUpdateQueue
}

// ListenAndBatchAllocate is a blocking function that runs in a loop
func (c *Allocator) ListenAndBatchAllocate(ctx context.Context, updateWorkerCount int) {
// setup workers for batch allocation updates. Push response values into
// this queue for concurrent updating of GameServers to Allocated
batchUpdateQueue := c.batchAllocationUpdateWorkers(ctx, updateWorkerCount)

var list []*agonesv1.GameServer
var sortKey uint64
requestCount := 0

batchSize := c.newMetrics(ctx)
batchResponsesPerGs := make(map[string]batchResponses)

flush := func() {
if requestCount > 0 {
batchSize.recordAllocationsBatchSize(ctx, requestCount)
}

for _, batchResponses := range batchResponsesPerGs {
batchUpdateQueue <- batchResponses
}
batchResponsesPerGs = make(map[string]batchResponses)

list = nil
requestCount = 0
}

for {
select {
case req := <-c.pendingRequests:
// refresh the list after every 100 allocations made in a single batch
if requestCount >= maxBatchBeforeRefresh {
flush()
}
requestCount++

// SortKey returns the sorting values (list of Priorities) as a determinstic key.
// In case gsa.Spec.Priorities is nil this will still return a sortKey.
// In case of error this will return 0 for the sortKey.
newSortKey, err := req.gsa.SortKey()
if err != nil {
c.baseLogger.WithError(err).Warn("error getting sortKey for GameServerAllocationSpec", err)
}
// Set sortKey if this is the first request, or the previous request errored on creating a sortKey.
if sortKey == uint64(0) {
sortKey = newSortKey
}

if newSortKey != sortKey {
sortKey = newSortKey
flush() // MP: not sure ???
}

// Sort list if necessary
if list == nil {
if req.gsa.Spec.Scheduling == apis.Packed {
list = c.allocationCache.ListSortedGameServers(req.gsa)
} else {
// Scheduling == Distributed, sort game servers by Priorities
list = c.allocationCache.ListSortedGameServersPriorities(req.gsa)
}
}

gs, _, err := findGameServerForAllocation(req.gsa, list)
if err != nil {
req.response <- response{request: req, gs: nil, err: err}
continue
}

// Apply the allocation to the local copy of gs
applyError, counterErrors, listErrors := c.applyAllocationToLocalGameServer(req.gsa.Spec.MetaPatch, gs, req.gsa)
if applyError == nil {
if existingBatch, exists := batchResponsesPerGs[string(gs.UID)]; exists {
existingBatch.responses = append(existingBatch.responses, response{request: req, gs: gs.DeepCopy(), err: nil})
existingBatch.counterErrors = goErrors.Join(existingBatch.counterErrors, counterErrors)
existingBatch.listErrors = goErrors.Join(existingBatch.listErrors, listErrors)
batchResponsesPerGs[string(gs.UID)] = existingBatch
} else {
if removeErr := c.allocationCache.RemoveGameServer(gs); removeErr != nil {
// this seems unlikely, but lets handle it just in case
removeErr = errors.Wrap(removeErr, "error removing gameserver from cache")
req.response <- response{request: req, gs: nil, err: removeErr}
} else {
batchResponsesPerGs[string(gs.UID)] = batchResponses{
responses: []response{response{request: req, gs: gs.DeepCopy(), err: nil}},
counterErrors: counterErrors,
listErrors: listErrors,
}
}
}
} else {
req.response <- response{request: req, gs: nil, err: applyError}
}
case <-ctx.Done():
flush()
return
default:
flush()

// slow down cpu churn, and allow items to batch
time.Sleep(c.batchWaitTime)
}
}
}

// applyAllocationToLocalGameServer patches the inputted GameServer with the allocation metadata changes, and updates it to the Allocated State.
// Returns the encountered errors.
func (c *Allocator) applyAllocationToLocalGameServer(mp allocationv1.MetaPatch, gs *agonesv1.GameServer, gsa *allocationv1.GameServerAllocation) (error, error, error) {
// add last allocated, so it always gets updated, even if it is already Allocated
ts, err := time.Now().MarshalText()
if err != nil {
return err, nil, nil
}
gs.ObjectMeta.Annotations[LastAllocatedAnnotationKey] = string(ts)
gs.Status.State = agonesv1.GameServerStateAllocated

// patch ObjectMeta labels
if mp.Labels != nil {
if gs.ObjectMeta.Labels == nil {
gs.ObjectMeta.Labels = make(map[string]string, len(mp.Labels))
}
for key, value := range mp.Labels {
gs.ObjectMeta.Labels[key] = value
}
}

if gs.ObjectMeta.Annotations == nil {
gs.ObjectMeta.Annotations = make(map[string]string, len(mp.Annotations))
}
// apply annotations patch
for key, value := range mp.Annotations {
gs.ObjectMeta.Annotations[key] = value
}

// perfom any Counter or List actions
var counterErrors error
var listErrors error
if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) {
if gsa.Spec.Counters != nil {
for counter, ca := range gsa.Spec.Counters {
counterErrors = goErrors.Join(counterErrors, ca.CounterActions(counter, gs))
}
}
if gsa.Spec.Lists != nil {
for list, la := range gsa.Spec.Lists {
listErrors = goErrors.Join(listErrors, la.ListActions(list, gs))
}
}
}

return nil, counterErrors, listErrors
}
Loading