Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func withConsumerGroup(ctx context.Context, client sarama.Client, topic, group s
errorExit("Failed to create consumer group: %v", err)
}

schemaCache = getSchemaCache()

err = cg.Consume(ctx, []string{topic}, &g{})
if err != nil {
errorExit("Error on consume: %v", err)
Expand Down
8 changes: 7 additions & 1 deletion cmd/kaf/kaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func onInit() {
// Any set flags override the configuration
if schemaRegistryURL != "" {
currentCluster.SchemaRegistryURL = schemaRegistryURL
currentCluster.SchemaRegistryCredentials = nil
}

if brokersFlag != nil {
Expand Down Expand Up @@ -247,7 +248,12 @@ func getSchemaCache() (cache *avro.SchemaCache) {
if currentCluster.SchemaRegistryURL == "" {
return nil
}
cache, err := avro.NewSchemaCache(currentCluster.SchemaRegistryURL)
var username, password string
if creds := currentCluster.SchemaRegistryCredentials; creds != nil {
username = creds.Username
password = creds.Password
}
cache, err := avro.NewSchemaCache(currentCluster.SchemaRegistryURL, username, password)
if err != nil {
errorExit("Unable to get schema cache :%v\n", err)
}
Expand Down
12 changes: 12 additions & 0 deletions examples/schema_registry_basic_auth.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
clusters:
- name: local
brokers:
- localhost:9092
SASL: null
TLS: null
security-protocol: ""
version: "1.0.0"
schema-registry-url: https://schema.registry.url
schema-registry-credentials:
username: httpbasicauthuser
password: mypasswordisnotsobasic
29 changes: 27 additions & 2 deletions pkg/avro/schema.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package avro

import (
"encoding/base64"
"encoding/binary"
"net/http"
"sync"

schemaregistry "github.com/Landoop/schema-registry"
Expand All @@ -23,9 +25,32 @@ type SchemaCache struct {
codecsBySchemaID map[int]*cachedCodec
}

type transport struct {
underlyingTransport http.RoundTripper
encodedCredentials string
}

// RoundTrip wraps the underlying transport's RoundTripper and injects a
// HTTP Basic authentication header if credentials are provided.
func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
if t.encodedCredentials != "" {
req.Header.Add("Authorization", "Basic "+t.encodedCredentials)
}
return t.underlyingTransport.RoundTrip(req)
}

// NewSchemaCache returns a new Cache instance
func NewSchemaCache(url string) (*SchemaCache, error) {
client, err := schemaregistry.NewClient(url)
func NewSchemaCache(url string, username string, password string) (*SchemaCache, error) {
var encodedCredentials string
if username != "" {
encodedCredentials = base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
}
httpClient := &http.Client{Transport: &transport{
underlyingTransport: http.DefaultTransport,
encodedCredentials: encodedCredentials,
}}

client, err := schemaregistry.NewClient(url, schemaregistry.UsingClient(httpClient))
if err != nil {
return nil, err
}
Expand Down
20 changes: 13 additions & 7 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,20 @@ type TLS struct {
Insecure bool
}

type SchemaRegistryCredentials struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
}

type Cluster struct {
Name string
Version string `yaml:"version"`
Brokers []string `yaml:"brokers"`
SASL *SASL `yaml:"SASL"`
TLS *TLS `yaml:"TLS"`
SecurityProtocol string `yaml:"security-protocol"`
SchemaRegistryURL string `yaml:"schema-registry-url"`
Name string
Version string `yaml:"version"`
Brokers []string `yaml:"brokers"`
SASL *SASL `yaml:"SASL"`
TLS *TLS `yaml:"TLS"`
SecurityProtocol string `yaml:"security-protocol"`
SchemaRegistryURL string `yaml:"schema-registry-url"`
SchemaRegistryCredentials *SchemaRegistryCredentials `yaml:"schema-registry-credentials"`
}

type Config struct {
Expand Down