Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/nats-io/gnatsd v1.4.1 // indirect
github.com/nats-io/go-nats v1.3.0
github.com/nats-io/go-nats-streaming v0.3.4
github.com/nats-io/go-nats v1.3.0 // indirect
github.com/nats-io/nats-server v1.4.1 // indirect
github.com/nats-io/nats-streaming-server v0.15.1 // indirect
github.com/nats-io/nats.go v1.8.1
github.com/nats-io/stan.go v0.5.0
github.com/opentracing/opentracing-go v1.1.0
github.com/pelletier/go-toml v1.3.0 // indirect
github.com/philhofer/fwd v1.0.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ github.com/nats-io/gnatsd v1.4.1 h1:RconcfDeWpKCD6QIIwiVFcvForlXpWeJP7i5/lDLy44=
github.com/nats-io/gnatsd v1.4.1/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ=
github.com/nats-io/go-nats v1.3.0 h1:CrvnAwoB2A2Yma+PcM+5tC++3/wswhcy8OvzqbsUXZQ=
github.com/nats-io/go-nats v1.3.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0=
github.com/nats-io/go-nats-streaming v0.3.4 h1:4z1stoQQfetddeodZ4huO24LK0QY75Mg4r9037J90MI=
github.com/nats-io/go-nats-streaming v0.3.4/go.mod h1:gfq4R3c9sKAINOpelo0gn/b9QDMBZnmrttcsNF+lqyo=
github.com/nats-io/jwt v0.2.6 h1:eAyoYvGgGLXR2EpnsBUvi/FcFrBqN6YKFVbOoEfPN4k=
github.com/nats-io/jwt v0.2.6/go.mod h1:mQxQ0uHQ9FhEVPIcTSKwx2lqZEpXWWcCgA7R6NrWvvY=
github.com/nats-io/nats-server v1.4.1 h1:Ul1oSOGNV/L8kjr4v6l2f9Yet6WY+LevH1/7cRZ/qyA=
Expand All @@ -101,6 +99,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nats-io/stan.go v0.4.5 h1:lPZ9y1jVGiXcTaUc1SnEIWPYfh0avuEiHBePNJYgpPk=
github.com/nats-io/stan.go v0.4.5/go.mod h1:Ji7mK6gRZJSH1nc3ZJH6vi7zn/QnZhpR9Arm4iuzsUQ=
github.com/nats-io/stan.go v0.5.0 h1:ZaSPMb6jnDXsSlOACynJrUiB3Evleg3ZyyX+rnf3TlQ=
github.com/nats-io/stan.go v0.5.0/go.mod h1:dYqB+vMN3C2F9pT1FRQpg9eHbjPj6mP0yYuyBNuXHZE=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
Expand Down
109 changes: 103 additions & 6 deletions messaging/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,107 @@ package messaging
import (
"fmt"
"strings"

stan "github.com/nats-io/go-nats-streaming"
"github.com/nats-io/go-nats-streaming/pb"
"time"

"github.com/netlify/netlify-commons/nconf"

"github.com/pkg/errors"

"github.com/nats-io/stan.go"
"github.com/nats-io/stan.go/pb"

"github.com/netlify/netlify-commons/discovery"
"github.com/sirupsen/logrus"
)

func StartPoint(config *nconf.NatsConfig) (stan.SubscriptionOption, error) {
switch v := strings.ToLower(config.StartPos); v {
const (
NatsAuthMethodUser = "user"
NatsAuthMethodToken = "token"
NatsAuthMethodTLS = "tls"
)

type NatsAuth struct {
Method string `mapstructure:"method"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
Token string `mapstructure:"token"`
}

type NatsConfig struct {
TLS *nconf.TLSConfig `mapstructure:"tls_conf"`
DiscoveryName string `mapstructure:"discovery_name" split_words:"true"`
Servers []string `mapstructure:"servers"`
Auth NatsAuth `mapstructure:"auth"`

// for streaming
ClusterID string `mapstructure:"cluster_id" split_words:"true"`
ClientID string `mapstructure:"client_id" split_words:"true"`
StartPos string `mapstructure:"start_pos" split_words:"true"`
}

type NatsClientConfig struct {
NatsConfig
Subject string `mapstructure:"command_subject"`
Group string `mapstructure:"command_group"`

// StartAt will configure where the client should resume the stream:
// - `all`: all the messages available
// - `last`: from where the client left off
// - `new`: all new messages for the client
// - `first`: from the first message available (default)
// - other: if it isn't one of the above fields, it will try and parse the param as a go duration (e.g. 30s, 1h)
StartAt string `mapstructure:"start_at"`
}

func (c *NatsConfig) LoadServerNames() error {
if c.DiscoveryName == "" {
return nil
}

natsURLs := []string{}
endpoints, err := discovery.DiscoverEndpoints(c.DiscoveryName)
if err != nil {
return err
}

for _, endpoint := range endpoints {
natsURLs = append(natsURLs, fmt.Sprintf("nats://%s:%d", endpoint.Target, endpoint.Port))
}

c.Servers = natsURLs
return nil
}

// ServerString will build the proper string for nats connect
func (c *NatsConfig) ServerString() string {
return strings.Join(c.Servers, ",")
}

func (c *NatsConfig) Fields() logrus.Fields {
f := logrus.Fields{
"servers": strings.Join(c.Servers, ","),
}

if c.Auth.Method != "" {
f["auth_method"] = c.Auth.Method
}

if c.TLS != nil {
f["ca_files"] = strings.Join(c.TLS.CAFiles, ",")
f["key_file"] = c.TLS.KeyFile
f["cert_file"] = c.TLS.CertFile
}

if c.ClusterID != "" {
f["client_id"] = c.ClientID
f["cluster_id"] = c.ClusterID
}

return f
}

func (c *NatsConfig) StartPoint() (stan.SubscriptionOption, error) {
switch v := strings.ToLower(c.StartPos); v {
case "all":
return stan.DeliverAllAvailable(), nil
case "last":
Expand All @@ -20,6 +112,11 @@ func StartPoint(config *nconf.NatsConfig) (stan.SubscriptionOption, error) {
return stan.StartAt(pb.StartPosition_NewOnly), nil
case "", "first":
return stan.StartAt(pb.StartPosition_First), nil
default:
dur, err := time.ParseDuration(v)
if err != nil {
return nil, errors.Wrap(err, "Failed to parse field as a duration")
}
return stan.StartAtTimeDelta(dur), nil
}
return nil, fmt.Errorf("Unknown start position '%s', possible values are all, last, new, first and ''", config.StartPos)
}
19 changes: 9 additions & 10 deletions messaging/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"strings"
"time"

"github.com/nats-io/go-nats"
stan "github.com/nats-io/go-nats-streaming"
"github.com/netlify/netlify-commons/nconf"
"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand All @@ -22,7 +21,7 @@ func init() {
silent = logrus.NewEntry(l)
}

func ConfigureNatsConnection(config *nconf.NatsConfig, log logrus.FieldLogger) (*nats.Conn, error) {
func ConfigureNatsConnection(config *NatsConfig, log logrus.FieldLogger) (*nats.Conn, error) {
if log == nil {
log = silent
}
Expand All @@ -49,7 +48,7 @@ func ConfigureNatsConnection(config *nconf.NatsConfig, log logrus.FieldLogger) (
return nc, nil
}

func ConnectToNats(config *nconf.NatsConfig, opts ...nats.Option) (*nats.Conn, error) {
func ConnectToNats(config *NatsConfig, opts ...nats.Option) (*nats.Conn, error) {
tlsConfig, err := config.TLS.TLSConfig()
if err != nil {
return nil, errors.Wrap(err, "Failed to configure TLS")
Expand All @@ -61,11 +60,11 @@ func ConnectToNats(config *nconf.NatsConfig, opts ...nats.Option) (*nats.Conn, e
}

switch strings.ToLower(config.Auth.Method) {
case nconf.NatsAuthMethodUser:
case NatsAuthMethodUser:
opts = append(opts, nats.UserInfo(config.Auth.User, config.Auth.Password))
case nconf.NatsAuthMethodToken:
case NatsAuthMethodToken:
opts = append(opts, nats.Token(config.Auth.Token))
case nconf.NatsAuthMethodTLS:
case NatsAuthMethodTLS:
// if using TLS auth, make sure the client certificate is loaded
if tlsConfig == nil || len(tlsConfig.Certificates) == 0 {
return nil, fmt.Errorf("TLS auth method is configured but no certificate was loaded")
Expand All @@ -78,7 +77,7 @@ func ConnectToNats(config *nconf.NatsConfig, opts ...nats.Option) (*nats.Conn, e
return nats.Connect(config.ServerString(), opts...)
}

func ConfigureNatsStreaming(config *nconf.NatsConfig, log logrus.FieldLogger) (stan.Conn, error) {
func ConfigureNatsStreaming(config *NatsConfig, log logrus.FieldLogger) (stan.Conn, error) {
// connect to the underlying instance
nc, err := ConfigureNatsConnection(config, log)
if err != nil {
Expand All @@ -93,7 +92,7 @@ func ConfigureNatsStreaming(config *nconf.NatsConfig, log logrus.FieldLogger) (s
return conn, nil
}

func ConnectToNatsStreaming(nc *nats.Conn, config *nconf.NatsConfig, log logrus.FieldLogger) (stan.Conn, error) {
func ConnectToNatsStreaming(nc *nats.Conn, config *NatsConfig, log logrus.FieldLogger) (stan.Conn, error) {
if config.ClusterID == "" {
return nil, errors.New("Must provide a cluster ID to connect to streaming nats")
}
Expand Down
81 changes: 0 additions & 81 deletions nconf/nats.go

This file was deleted.