From 754e59450118ca097389a89a4ff879f0481f3cfc Mon Sep 17 00:00:00 2001 From: Laure-di Date: Wed, 23 Apr 2025 18:09:27 +0200 Subject: [PATCH] feat(inference): add sweeper and waiter to v1 --- api/inference/v1/inference_utils.go | 59 +++++++++++++++++++++++++++ api/inference/v1/sweepers/sweepers.go | 44 ++++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 api/inference/v1/inference_utils.go create mode 100644 api/inference/v1/sweepers/sweepers.go diff --git a/api/inference/v1/inference_utils.go b/api/inference/v1/inference_utils.go new file mode 100644 index 000000000..559681163 --- /dev/null +++ b/api/inference/v1/inference_utils.go @@ -0,0 +1,59 @@ +package inference + +import ( + "time" + + "github.com/scaleway/scaleway-sdk-go/errors" + "github.com/scaleway/scaleway-sdk-go/internal/async" + "github.com/scaleway/scaleway-sdk-go/scw" +) + +const ( + defaultRetryInterval = 15 * time.Second + defaultTimeout = 30 * time.Minute +) + +type WaitForDeploymentRequest struct { + DeploymentID string + Region scw.Region + Status DeploymentStatus + Timeout *time.Duration + RetryInterval *time.Duration +} + +func (s *API) WaitForDeployment(req *WaitForDeploymentRequest, opts ...scw.RequestOption) (*Deployment, error) { + timeout := defaultTimeout + if req.Timeout != nil { + timeout = *req.Timeout + } + retryInterval := defaultRetryInterval + if req.RetryInterval != nil { + retryInterval = *req.RetryInterval + } + + terminalStatus := map[DeploymentStatus]struct{}{ + DeploymentStatusReady: {}, + DeploymentStatusError: {}, + DeploymentStatusLocked: {}, + } + + deployment, err := async.WaitSync(&async.WaitSyncConfig{ + Get: func() (interface{}, bool, error) { + deployment, err := s.GetDeployment(&GetDeploymentRequest{ + Region: req.Region, + DeploymentID: req.DeploymentID, + }, opts...) + if err != nil { + return nil, false, err + } + _, isTerminal := terminalStatus[deployment.Status] + return deployment, isTerminal, nil + }, + IntervalStrategy: async.LinearIntervalStrategy(retryInterval), + Timeout: timeout, + }) + if err != nil { + return nil, errors.Wrap(err, "waiting for deployment failed") + } + return deployment.(*Deployment), nil +} diff --git a/api/inference/v1/sweepers/sweepers.go b/api/inference/v1/sweepers/sweepers.go new file mode 100644 index 000000000..56fc62a89 --- /dev/null +++ b/api/inference/v1/sweepers/sweepers.go @@ -0,0 +1,44 @@ +package sweepers + +import ( + "fmt" + + "github.com/scaleway/scaleway-sdk-go/api/inference/v1" + "github.com/scaleway/scaleway-sdk-go/logger" + "github.com/scaleway/scaleway-sdk-go/scw" +) + +func SweepDeployment(scwClient *scw.Client, region scw.Region) error { + inferenceAPI := inference.NewAPI(scwClient) + logger.Warningf("sweeper: destroying the inference deployments in (%s)", region) + listDeployments, err := inferenceAPI.ListDeployments( + &inference.ListDeploymentsRequest{ + Region: region, + }, scw.WithAllPages()) + if err != nil { + return fmt.Errorf("error listing deployment in (%s) in sweeper: %s", region, err) + } + + for _, deployment := range listDeployments.Deployments { + _, err := inferenceAPI.DeleteDeployment(&inference.DeleteDeploymentRequest{ + DeploymentID: deployment.ID, + Region: region, + }) + if err != nil { + return fmt.Errorf("error deleting deployment in sweeper: %s", err) + } + } + + return nil +} + +func SweepAllLocalities(scwClient *scw.Client) error { + for _, locality := range (&inference.API{}).Regions() { + err := SweepDeployment(scwClient, locality) + if err != nil { + return err + } + } + + return nil +}