-
Notifications
You must be signed in to change notification settings - Fork 66
feat(kafka): add --wait flag to perform synchronous Kafka creation #960
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -5,27 +5,31 @@ import ( | |||||||
| "encoding/json" | ||||||||
| "errors" | ||||||||
| "fmt" | ||||||||
| "net/http" | ||||||||
| "time" | ||||||||
|
|
||||||||
| kafkamgmtclient "github.com/redhat-developer/app-services-sdk-go/kafkamgmt/apiv1/client" | ||||||||
| "gopkg.in/yaml.v2" | ||||||||
|
|
||||||||
| "github.com/redhat-developer/app-services-cli/pkg/color" | ||||||||
| "github.com/redhat-developer/app-services-cli/pkg/dump" | ||||||||
| "github.com/redhat-developer/app-services-cli/pkg/localize" | ||||||||
|
|
||||||||
| "github.com/redhat-developer/app-services-cli/pkg/ams" | ||||||||
| "github.com/redhat-developer/app-services-cli/pkg/cmd/flag" | ||||||||
| flagutil "github.com/redhat-developer/app-services-cli/pkg/cmdutil/flags" | ||||||||
| "github.com/redhat-developer/app-services-cli/pkg/connection" | ||||||||
| svcstatus "github.com/redhat-developer/app-services-cli/pkg/service/status" | ||||||||
|
|
||||||||
| "github.com/redhat-developer/app-services-cli/pkg/cloudprovider/cloudproviderutil" | ||||||||
| "github.com/redhat-developer/app-services-cli/pkg/cloudregion/cloudregionutil" | ||||||||
|
|
||||||||
| "github.com/AlecAivazis/survey/v2" | ||||||||
| "github.com/redhat-developer/app-services-cli/pkg/dump" | ||||||||
| "github.com/redhat-developer/app-services-cli/pkg/iostreams" | ||||||||
| pkgKafka "github.com/redhat-developer/app-services-cli/pkg/kafka" | ||||||||
| "github.com/redhat-developer/app-services-cli/pkg/logging" | ||||||||
|
|
||||||||
| "github.com/spf13/cobra" | ||||||||
| "gopkg.in/yaml.v2" | ||||||||
|
|
||||||||
| "github.com/redhat-developer/app-services-cli/internal/config" | ||||||||
| "github.com/redhat-developer/app-services-cli/pkg/cmd/factory" | ||||||||
|
|
@@ -43,6 +47,7 @@ type Options struct { | |||||||
| autoUse bool | ||||||||
|
|
||||||||
| interactive bool | ||||||||
| wait bool | ||||||||
|
|
||||||||
| IO *iostreams.IOStreams | ||||||||
| Config config.IConfig | ||||||||
|
|
@@ -111,6 +116,7 @@ func NewCreateCommand(f *factory.Factory) *cobra.Command { | |||||||
| cmd.Flags().StringVar(&opts.region, flags.FlagRegion, "", opts.localizer.MustLocalize("kafka.create.flag.cloudRegion.description")) | ||||||||
| cmd.Flags().StringVarP(&opts.outputFormat, "output", "o", "json", opts.localizer.MustLocalize("kafka.common.flag.output.description")) | ||||||||
| cmd.Flags().BoolVar(&opts.autoUse, "use", true, opts.localizer.MustLocalize("kafka.create.flag.autoUse.description")) | ||||||||
| cmd.Flags().BoolVarP(&opts.wait, "wait", "w", false, opts.localizer.MustLocalize("kafka.create.flag.wait.description")) | ||||||||
|
|
||||||||
| _ = cmd.RegisterFlagCompletionFunc(flags.FlagProvider, func(cmd *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { | ||||||||
| return cmdutil.FetchCloudProviders(f) | ||||||||
|
|
@@ -174,48 +180,72 @@ func runCreate(opts *Options) error { | |||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| logger.Info(opts.localizer.MustLocalize("kafka.create.log.info.creatingKafka", localize.NewEntry("Name", payload.Name))) | ||||||||
|
|
||||||||
| api := connection.API() | ||||||||
|
|
||||||||
| a := api.Kafka().CreateKafka(context.Background()) | ||||||||
| a = a.KafkaRequestPayload(*payload) | ||||||||
| a = a.Async(true) | ||||||||
| response, httpRes, err := a.Execute() | ||||||||
| defer httpRes.Body.Close() | ||||||||
|
|
||||||||
| if httpRes.StatusCode == 409 { | ||||||||
| if httpRes.StatusCode == http.StatusBadRequest { | ||||||||
| return errors.New(opts.localizer.MustLocalize("kafka.create.error.conflictError", localize.NewEntry("Name", payload.Name))) | ||||||||
| } | ||||||||
|
|
||||||||
| if err != nil { | ||||||||
| return err | ||||||||
| } | ||||||||
|
|
||||||||
| logger.Info(opts.localizer.MustLocalize("kafka.create.info.successMessage", localize.NewEntry("Name", response.GetName()))) | ||||||||
|
|
||||||||
| switch opts.outputFormat { | ||||||||
| case dump.JSONFormat: | ||||||||
| data, _ := json.MarshalIndent(response, "", cmdutil.DefaultJSONIndent) | ||||||||
| _ = dump.JSON(opts.IO.Out, data) | ||||||||
| case dump.YAMLFormat, dump.YMLFormat: | ||||||||
| data, _ := yaml.Marshal(response) | ||||||||
| _ = dump.YAML(opts.IO.Out, data) | ||||||||
| } | ||||||||
|
|
||||||||
| kafkaCfg := &config.KafkaConfig{ | ||||||||
| ClusterID: response.GetId(), | ||||||||
| } | ||||||||
|
|
||||||||
| if opts.autoUse { | ||||||||
| logger.Debug("Auto-use is set, updating the current instance") | ||||||||
| cfg.Services.Kafka = kafkaCfg | ||||||||
| if err := opts.Config.Save(cfg); err != nil { | ||||||||
| if err = opts.Config.Save(cfg); err != nil { | ||||||||
| return fmt.Errorf("%v: %w", opts.localizer.MustLocalize("kafka.common.error.couldNotUseKafka"), err) | ||||||||
| } | ||||||||
| } else { | ||||||||
| logger.Debug("Auto-use is not set, skipping updating the current instance") | ||||||||
| } | ||||||||
|
|
||||||||
| if opts.wait { | ||||||||
| logger.Debug("--wait flag is enabled, waiting for Kafka to finish creating") | ||||||||
| s := opts.IO.NewSpinner() | ||||||||
| s.Suffix = fmt.Sprintf(" %v", opts.localizer.MustLocalize("kafka.create.log.info.creatingKafka", localize.NewEntry("Name", opts.name))) | ||||||||
| s.Start() | ||||||||
|
|
||||||||
| for svcstatus.IsCreating(response.GetStatus()) { | ||||||||
| time.Sleep(cmdutil.DefaultPollTime) | ||||||||
| s.Suffix = " " + opts.localizer.MustLocalize("kafka.create.log.info.creationInProgress", localize.NewEntry("Name", response.GetName()), localize.NewEntry("Status", color.Info(response.GetStatus()))) | ||||||||
| response, httpRes, err = api.Kafka().GetKafkaById(context.Background(), response.GetId()).Execute() | ||||||||
| defer httpRes.Body.Close() | ||||||||
| logger.Debug("Checking Kafka status:", response.GetStatus()) | ||||||||
| if err != nil { | ||||||||
| return err | ||||||||
| } | ||||||||
| } | ||||||||
| s.Stop() | ||||||||
| logger.Info() | ||||||||
| logger.Info() | ||||||||
|
||||||||
| logger.Info() | |
| logger.Info() | |
| logger.Info("\n\n") |
wtrocki marked this conversation as resolved.
Show resolved
Hide resolved
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,10 @@ | ||
| package cmdutil | ||
|
|
||
| import "time" | ||
|
|
||
| const ( | ||
| // The default indentation to use when printing data to stdout | ||
| DefaultJSONIndent = " " | ||
| // DefaultPollTime is the default interval to wait when polling a network request | ||
| DefaultPollTime = time.Millisecond * 5000 | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,7 +3,9 @@ package iostreams | |
| import ( | ||
| "io" | ||
| "os" | ||
| "time" | ||
|
|
||
| "github.com/briandowns/spinner" | ||
| "github.com/fatih/color" | ||
| "github.com/mattn/go-isatty" | ||
| ) | ||
|
|
@@ -88,6 +90,11 @@ func (s *IOStreams) IsSSHSession() bool { | |
| return hasClient || hasTTY | ||
| } | ||
|
|
||
| // NewSpinner returns a new spinner progress bar to be used in synchronous commands | ||
| func (s *IOStreams) NewSpinner() *spinner.Spinner { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CC @ankithans |
||
| return spinner.New(spinner.CharSets[9], 100*time.Millisecond, spinner.WithWriter(s.ErrOut)) | ||
| } | ||
|
|
||
| func isTerminal(f *os.File) bool { | ||
| return isatty.IsTerminal(f.Fd()) || isatty.IsCygwinTerminal(f.Fd()) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -38,13 +38,23 @@ one = 'Cloud Provider Region ID' | |||||
| [kafka.create.flag.autoUse.description] | ||||||
| one = 'Set the new Kafka instance to the current instance' | ||||||
|
|
||||||
| [kafka.create.flag.wait.description] | ||||||
| one = 'Wait for the Kafka instance to finish creating' | ||||||
|
||||||
| one = 'Wait for the Kafka instance to finish creating' | |
| one = 'Wait until the Kafka instance is created' |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package svcstatus | ||
|
|
||
// ServiceStatus is the lifecycle state reported for a managed service instance.
type ServiceStatus = string

// Lifecycle states a service instance can report.
const (
	StatusAccepted     ServiceStatus = "accepted"
	StatusPreparing    ServiceStatus = "preparing"
	StatusProvisioning ServiceStatus = "provisioning"
	StatusReady        ServiceStatus = "ready"
	StatusFailed       ServiceStatus = "failed"
	StatusDeprovision  ServiceStatus = "deprovision"
	StatusDeleting     ServiceStatus = "deleting"
)

// IsCreating reports whether the given status indicates the service
// instance is still in the process of being created.
func IsCreating(status string) bool {
	switch status {
	case StatusAccepted, StatusPreparing, StatusProvisioning:
		return true
	default:
		return false
	}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.