Skip to content

Commit 1c2a4c0

Browse files
author
Enda
authored
feat: auto-use kafka cluster after creation (#142)
1 parent 4f0ee08 commit 1c2a4c0

8 files changed

Lines changed: 75 additions & 35 deletions

File tree

.vscode/launch.json

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"--url=localhost:8000"
1717
]
1818
},
19-
{
19+
{
2020
"name": "Logout",
2121
"type": "go",
2222
"request": "launch",
@@ -27,6 +27,19 @@
2727
"logout"
2828
]
2929
},
30+
{
31+
"name": "Create Kafka",
32+
"type": "go",
33+
"request": "launch",
34+
"mode": "auto",
35+
"program": "${workspaceFolder}/cmd/rhoas",
36+
"env": {},
37+
"args": [
38+
"kafka",
39+
"create",
40+
"--name=my-kafka-instance"
41+
]
42+
},
3043
{
3144
"name": "List Kafkas",
3245
"type": "go",

CONTRIBUTING.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,7 @@ Your config should look as follows:
191191
...
192192
"services": {
193193
"kafka": {
194-
"clusterHost": "localhost:9092",
195-
"clusterId": "1iSY6RQ3JKI8Q0OTmjQFd3ocFRg",
196-
"clusterName": "serviceapi"
194+
"clusterId": "1iSY6RQ3JKI8Q0OTmjQFd3ocFRg"
197195
}
198196
}
199197
...

pkg/cmd/kafka/connect/connect.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ func connectToCluster(connection *pkgConnection.Connection) {
127127

128128
currentNamespace, _, _ := kubeClientconfig.Namespace()
129129
clicfg, err := config.Load()
130+
if err != nil {
131+
fmt.Fprintf(os.Stderr, "Could not load config: %v\n", err)
132+
return
133+
}
130134

131135
if !clicfg.HasKafka() || forceKafkaSelect {
132136
clicfg = useKafka(clicfg, connection)
@@ -135,12 +139,22 @@ func connectToCluster(connection *pkgConnection.Connection) {
135139
}
136140
}
137141

142+
kafkaCfg := clicfg.Services.Kafka
143+
144+
managedservices := connection.NewMASClient()
145+
kafkaInstance, _, err := managedservices.DefaultApi.GetKafkaById(context.TODO(), kafkaCfg.ClusterID)
146+
147+
if err != nil {
148+
fmt.Fprintf(os.Stderr, "Could not get Kafka cluster with ID '%v': %v", kafkaCfg.ClusterID, err)
149+
return
150+
}
151+
138152
if err != nil {
139153
fmt.Fprint(os.Stderr, "\nInvalid configuration file", err)
140154
return
141155
}
142156

143-
fmt.Fprintf(os.Stderr, statusMsg, color.HiGreenString(clicfg.Services.Kafka.ClusterName), color.HiGreenString(currentNamespace), color.HiGreenString(secretName))
157+
fmt.Fprintf(os.Stderr, statusMsg, color.HiGreenString(kafkaInstance.Name), color.HiGreenString(currentNamespace), color.HiGreenString(secretName))
144158
if shouldContinue := showQuestion("Do you want to continue?"); shouldContinue == false {
145159
return
146160
}
@@ -150,7 +164,7 @@ func connectToCluster(connection *pkgConnection.Connection) {
150164
return
151165
}
152166
createSecret(credentials, currentNamespace, clientset)
153-
createCR(clicfg, clientset, currentNamespace)
167+
createCR(clientset, &kafkaInstance, currentNamespace)
154168
}
155169

156170
func fileExists(path string) bool {
@@ -241,8 +255,8 @@ func createSecret(credentials *managedservices.TokenResponse, currentNamespace s
241255
return secret
242256
}
243257

244-
func createCR(clicfg *config.Config, clientset *kubernetes.Clientset, namespace string) {
245-
crName := secretName + "-" + clicfg.Services.Kafka.ClusterName
258+
func createCR(clientset *kubernetes.Clientset, kafkaInstance *managedservices.KafkaRequest, namespace string) {
259+
crName := secretName + "-" + kafkaInstance.Name
246260
crInstance := &connection.ManagedKafkaConnection{
247261
ObjectMeta: metav1.ObjectMeta{
248262
Name: crName,
@@ -254,7 +268,7 @@ func createCR(clicfg *config.Config, clientset *kubernetes.Clientset, namespace
254268
},
255269
Spec: connection.ManagedKafkaConnectionSpec{
256270
BootstrapServer: connection.BootstrapServerSpec{
257-
Host: clicfg.Services.Kafka.ClusterHost,
271+
Host: kafkaInstance.BootstrapServerHost,
258272
},
259273
Credentials: connection.CredentialsSpec{
260274
Kind: connection.ClientCredentials,
@@ -313,7 +327,7 @@ func useKafka(cliconfig *config.Config, connection *pkgConnection.Connection) *c
313327
index, _, err := prompt.Run()
314328
if err == nil {
315329
selectedKafka := response.Items[index]
316-
var kafkaConfig config.KafkaConfig = config.KafkaConfig{ClusterID: selectedKafka.Id, ClusterName: selectedKafka.Name, ClusterHost: selectedKafka.BootstrapServerHost}
330+
var kafkaConfig config.KafkaConfig = config.KafkaConfig{ClusterID: selectedKafka.Id}
317331
cliconfig.Services.SetKafka(&kafkaConfig)
318332

319333
return cliconfig

pkg/cmd/kafka/create/create.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ func NewCreateCommand() *cobra.Command {
4747
}
4848
opts.cfg = cfg
4949

50-
err = kafka.ValidateName(opts.name)
51-
if err != nil {
50+
if err = kafka.ValidateName(opts.name); err != nil {
5251
return err
5352
}
5453

@@ -59,8 +58,6 @@ func NewCreateCommand() *cobra.Command {
5958
cmd.Flags().StringVarP(&opts.name, flags.FlagName, "n", "", "Name of the new Kafka cluster")
6059
cmd.Flags().StringVar(&opts.provider, flags.FlagProvider, "aws", "Cloud provider ID")
6160
cmd.Flags().StringVar(&opts.region, flags.FlagRegion, "us-east-1", "Cloud Provider Region ID")
62-
// Hardcoded as only true is possible
63-
// cmd.Flags().BoolVar(&opts.multiAZ, flags.FlagMultiAZ, true, "Determines if cluster should be provisioned across multiple Availability Zones")
6461
cmd.Flags().StringVarP(&opts.outputFormat, "output", "o", "json", "Format to display the Kafka cluster. Choose from: \"json\", \"yaml\", \"yml\"")
6562

6663
_ = cmd.MarkFlagRequired(flags.FlagName)
@@ -69,29 +66,42 @@ func NewCreateCommand() *cobra.Command {
6966
}
7067

7168
func runCreate(opts *options) error {
72-
connection, err := opts.cfg.Connection()
69+
cfg := opts.cfg
70+
71+
connection, err := cfg.Connection()
7372
if err != nil {
7473
return fmt.Errorf("Can't create connection: %w", err)
7574
}
7675

7776
client := connection.NewMASClient()
7877

78+
fmt.Fprintln(os.Stderr, "Creating Kafka cluster")
79+
7980
kafkaRequest := managedservices.KafkaRequestPayload{Name: opts.name, Region: opts.region, CloudProvider: opts.provider, MultiAz: true}
8081
response, _, err := client.DefaultApi.CreateKafka(context.Background(), true, kafkaRequest)
8182

8283
if err != nil {
8384
return fmt.Errorf("Error while requesting new Kafka cluster: %w", err)
8485
}
8586

86-
fmt.Fprintf(os.Stderr, "Created new Kafka cluster:\n")
87+
fmt.Fprintln(os.Stderr, "Created Kafka cluster:")
8788

8889
switch opts.outputFormat {
8990
case "json":
9091
data, _ := json.MarshalIndent(response, "", cmdutil.DefaultJSONIndent)
91-
fmt.Print(string(data))
92+
fmt.Println(string(data))
9293
case "yaml", "yml":
9394
data, _ := yaml.Marshal(response)
94-
fmt.Print(string(data))
95+
fmt.Println(string(data))
96+
}
97+
98+
kafkaCfg := &config.KafkaConfig{
99+
ClusterID: response.Id,
100+
}
101+
102+
cfg.Services.SetKafka(kafkaCfg)
103+
if err := config.Save(cfg); err != nil {
104+
return fmt.Errorf("Unable to automatically use Kafka cluster: %w", err)
95105
}
96106

97107
return nil

pkg/cmd/kafka/use/use.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,7 @@ func runUse(opts *options) error {
6363

6464
// build Kafka config object from the response
6565
var kafkaConfig config.KafkaConfig = config.KafkaConfig{
66-
ClusterID: res.Id,
67-
ClusterName: res.Name,
68-
ClusterHost: res.BootstrapServerHost,
66+
ClusterID: res.Id,
6967
}
7068

7169
cfg.Services.SetKafka(&kafkaConfig)

pkg/cmd/serviceaccount/create/create.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,18 @@ import (
1616
// Templates
1717
var (
1818
templateProperties = heredoc.Doc(`
19-
## Credentials for Kafka cluster: '%v'
2019
## Generated by rhoas cli
2120
kafka_user=%v
2221
kafka_password=%v
2322
`)
2423

2524
templateEnv = heredoc.Doc(`
26-
## Credentials for Kafka cluster: '%v'
2725
## Generated by rhoas cli
2826
KAFKA_USER=%v
2927
KAFKA_PASSWORD=%v
3028
`)
3129

3230
templateKafkaPlain = heredoc.Doc(`
33-
## Credentials for Kafka cluster: '%v'
3431
## Generated by rhoas cli
3532
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="%v" password="%v";
3633
kafka.sasl.mechanism=PLAIN
@@ -149,7 +146,7 @@ func runCreate(opts *options) error {
149146
}
150147

151148
fmt.Fprintf(os.Stderr, "Writing credentials to %v \n", fileName)
152-
fileContent := fmt.Sprintf(fileFormat, cfg.Services.Kafka.ClusterName, response.ClientID, response.ClientSecret)
149+
fileContent := fmt.Sprintf(fileFormat, response.ClientID, response.ClientSecret)
153150

154151
dataToWrite := []byte(fileContent)
155152

pkg/config/config.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,7 @@ type ServiceConfigMap struct {
3232

3333
// KafkaConfig is the config for the managed Kafka service
3434
type KafkaConfig struct {
35-
ClusterHost string `json:"clusterHost"`
36-
ClusterID string `json:"clusterId"`
37-
ClusterName string `json:"clusterName"`
35+
ClusterID string `json:"clusterId"`
3836
}
3937

4038
func (c *Config) SetAccessToken(accessToken string) {
@@ -77,9 +75,7 @@ func (s *ServiceConfigMap) SetKafka(k *KafkaConfig) {
7775
// Remove the current Kafka cluster from the config
7876
func (s *ServiceConfigMap) RemoveKafka() {
7977
s.Kafka = &KafkaConfig{
80-
ClusterID: "",
81-
ClusterHost: "",
82-
ClusterName: "",
78+
ClusterID: "",
8379
}
8480
}
8581

pkg/sdk/kafka/topics/topics.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package topics
22

33
import (
4+
"context"
45
"fmt"
56
"net"
67
"os"
@@ -31,15 +32,28 @@ func brokerConnect() (broker *kafka.Conn, ctl *kafka.Conn) {
3132
fmt.Fprint(os.Stderr, err)
3233
}
3334

34-
if cfg.Services.Kafka.ClusterHost == "" {
35+
if cfg.Services.Kafka.ClusterID == "" {
3536
fmt.Fprint(os.Stderr, "No Kafka selected. Run rhoas kafka use")
3637
panic("Missing config")
3738
}
39+
40+
connection, err := cfg.Connection()
41+
if err != nil {
42+
fmt.Fprintf(os.Stderr, "Could not create connection: %v\n", err)
43+
}
44+
45+
managedservices := connection.NewMASClient()
46+
kafkaInstance, _, err := managedservices.DefaultApi.GetKafkaById(context.TODO(), cfg.Services.Kafka.ClusterID)
47+
if err != nil {
48+
fmt.Fprintf(os.Stderr, "Could not get Kafka instance: %v\n", err)
49+
return
50+
}
51+
3852
var clusterURL string
39-
if strings.HasPrefix(cfg.Services.Kafka.ClusterHost, "localhost") {
40-
clusterURL = cfg.Services.Kafka.ClusterHost
53+
if strings.HasPrefix(kafkaInstance.BootstrapServerHost, "localhost") {
54+
clusterURL = kafkaInstance.BootstrapServerHost
4155
} else {
42-
clusterURL = cfg.Services.Kafka.ClusterHost + ":443"
56+
clusterURL = kafkaInstance.BootstrapServerHost + ":443"
4357
}
4458

4559
conn, err := dialer.Dial("tcp", clusterURL)

0 commit comments

Comments
 (0)