diff --git a/go.mod b/go.mod index 89294dd..6131e08 100644 --- a/go.mod +++ b/go.mod @@ -15,10 +15,11 @@ require ( github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/kr/pretty v0.1.0 // indirect github.com/nats-io/gnatsd v1.4.1 // indirect - github.com/nats-io/go-nats v1.3.0 - github.com/nats-io/go-nats-streaming v0.3.4 + github.com/nats-io/go-nats v1.3.0 // indirect github.com/nats-io/nats-server v1.4.1 // indirect github.com/nats-io/nats-streaming-server v0.15.1 // indirect + github.com/nats-io/nats.go v1.8.1 + github.com/nats-io/stan.go v0.5.0 github.com/opentracing/opentracing-go v1.1.0 github.com/pelletier/go-toml v1.3.0 // indirect github.com/philhofer/fwd v1.0.0 // indirect diff --git a/go.sum b/go.sum index d406a39..e59eded 100644 --- a/go.sum +++ b/go.sum @@ -83,8 +83,6 @@ github.com/nats-io/gnatsd v1.4.1 h1:RconcfDeWpKCD6QIIwiVFcvForlXpWeJP7i5/lDLy44= github.com/nats-io/gnatsd v1.4.1/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ= github.com/nats-io/go-nats v1.3.0 h1:CrvnAwoB2A2Yma+PcM+5tC++3/wswhcy8OvzqbsUXZQ= github.com/nats-io/go-nats v1.3.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0= -github.com/nats-io/go-nats-streaming v0.3.4 h1:4z1stoQQfetddeodZ4huO24LK0QY75Mg4r9037J90MI= -github.com/nats-io/go-nats-streaming v0.3.4/go.mod h1:gfq4R3c9sKAINOpelo0gn/b9QDMBZnmrttcsNF+lqyo= github.com/nats-io/jwt v0.2.6 h1:eAyoYvGgGLXR2EpnsBUvi/FcFrBqN6YKFVbOoEfPN4k= github.com/nats-io/jwt v0.2.6/go.mod h1:mQxQ0uHQ9FhEVPIcTSKwx2lqZEpXWWcCgA7R6NrWvvY= github.com/nats-io/nats-server v1.4.1 h1:Ul1oSOGNV/L8kjr4v6l2f9Yet6WY+LevH1/7cRZ/qyA= @@ -101,6 +99,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/stan.go v0.4.5 h1:lPZ9y1jVGiXcTaUc1SnEIWPYfh0avuEiHBePNJYgpPk= github.com/nats-io/stan.go v0.4.5/go.mod h1:Ji7mK6gRZJSH1nc3ZJH6vi7zn/QnZhpR9Arm4iuzsUQ= +github.com/nats-io/stan.go v0.5.0 h1:ZaSPMb6jnDXsSlOACynJrUiB3Evleg3ZyyX+rnf3TlQ= +github.com/nats-io/stan.go v0.5.0/go.mod h1:dYqB+vMN3C2F9pT1FRQpg9eHbjPj6mP0yYuyBNuXHZE= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= diff --git a/messaging/config.go b/messaging/config.go index bb100f2..7aca595 100644 --- a/messaging/config.go +++ b/messaging/config.go @@ -3,15 +3,107 @@ package messaging import ( "fmt" "strings" - - stan "github.com/nats-io/go-nats-streaming" - "github.com/nats-io/go-nats-streaming/pb" + "time" "github.com/netlify/netlify-commons/nconf" + + "github.com/pkg/errors" + + "github.com/nats-io/stan.go" + "github.com/nats-io/stan.go/pb" + + "github.com/netlify/netlify-commons/discovery" + "github.com/sirupsen/logrus" ) -func StartPoint(config *nconf.NatsConfig) (stan.SubscriptionOption, error) { - switch v := strings.ToLower(config.StartPos); v { +const ( + NatsAuthMethodUser = "user" + NatsAuthMethodToken = "token" + NatsAuthMethodTLS = "tls" +) + +type NatsAuth struct { + Method string `mapstructure:"method"` + User string `mapstructure:"user"` + Password string `mapstructure:"password"` + Token string `mapstructure:"token"` +} + +type NatsConfig struct { + TLS *nconf.TLSConfig `mapstructure:"tls_conf"` + DiscoveryName string `mapstructure:"discovery_name" split_words:"true"` + Servers []string `mapstructure:"servers"` + Auth NatsAuth `mapstructure:"auth"` + + // for streaming + ClusterID string `mapstructure:"cluster_id" split_words:"true"` + ClientID string `mapstructure:"client_id" split_words:"true"` + StartPos string `mapstructure:"start_pos" split_words:"true"` +} + +type NatsClientConfig struct { + NatsConfig + Subject string `mapstructure:"command_subject"` + Group string `mapstructure:"command_group"` + + // StartAt will configure where the client should resume the stream: + // - `all`: all the messages available + // - `last`: from where the client left off + // - `new`: all new messages for the client + // - `first`: from the first message available (default) + // - other: if it isn't one of the above fields, it will try and parse the param as a go duration (e.g. 30s, 1h) + StartAt string `mapstructure:"start_at"` +} + +func (c *NatsConfig) LoadServerNames() error { + if c.DiscoveryName == "" { + return nil + } + + natsURLs := []string{} + endpoints, err := discovery.DiscoverEndpoints(c.DiscoveryName) + if err != nil { + return err + } + + for _, endpoint := range endpoints { + natsURLs = append(natsURLs, fmt.Sprintf("nats://%s:%d", endpoint.Target, endpoint.Port)) + } + + c.Servers = natsURLs + return nil +} + +// ServerString will build the proper string for nats connect +func (c *NatsConfig) ServerString() string { + return strings.Join(c.Servers, ",") +} + +func (c *NatsConfig) Fields() logrus.Fields { + f := logrus.Fields{ + "servers": strings.Join(c.Servers, ","), + } + + if c.Auth.Method != "" { + f["auth_method"] = c.Auth.Method + } + + if c.TLS != nil { + f["ca_files"] = strings.Join(c.TLS.CAFiles, ",") + f["key_file"] = c.TLS.KeyFile + f["cert_file"] = c.TLS.CertFile + } + + if c.ClusterID != "" { + f["client_id"] = c.ClientID + f["cluster_id"] = c.ClusterID + } + + return f +} + +func (c *NatsConfig) StartPoint() (stan.SubscriptionOption, error) { + switch v := strings.ToLower(c.StartPos); v { case "all": return stan.DeliverAllAvailable(), nil case "last": @@ -20,6 +112,11 @@ func StartPoint(config *nconf.NatsConfig) (stan.SubscriptionOption, error) { return stan.StartAt(pb.StartPosition_NewOnly), nil case "", "first": return stan.StartAt(pb.StartPosition_First), nil + default: + dur, err := time.ParseDuration(v) + if err != nil { + return nil, errors.Wrap(err, "Failed to parse field as a duration") + } + return stan.StartAtTimeDelta(dur), nil } - return nil, fmt.Errorf("Unknown start position '%s', possible values are all, last, new, first and ''", config.StartPos) } diff --git a/messaging/nats.go b/messaging/nats.go index 18c8a2f..435a141 100644 --- a/messaging/nats.go +++ b/messaging/nats.go @@ -7,9 +7,8 @@ import ( "strings" "time" - "github.com/nats-io/go-nats" - stan "github.com/nats-io/go-nats-streaming" - "github.com/netlify/netlify-commons/nconf" + "github.com/nats-io/nats.go" + "github.com/nats-io/stan.go" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -22,7 +21,7 @@ func init() { silent = logrus.NewEntry(l) } -func ConfigureNatsConnection(config *nconf.NatsConfig, log logrus.FieldLogger) (*nats.Conn, error) { +func ConfigureNatsConnection(config *NatsConfig, log logrus.FieldLogger) (*nats.Conn, error) { if log == nil { log = silent } @@ -49,7 +48,7 @@ func ConfigureNatsConnection(config *nconf.NatsConfig, log logrus.FieldLogger) ( return nc, nil } -func ConnectToNats(config *nconf.NatsConfig, opts ...nats.Option) (*nats.Conn, error) { +func ConnectToNats(config *NatsConfig, opts ...nats.Option) (*nats.Conn, error) { tlsConfig, err := config.TLS.TLSConfig() if err != nil { return nil, errors.Wrap(err, "Failed to configure TLS") @@ -61,11 +60,11 @@ func ConnectToNats(config *nconf.NatsConfig, opts ...nats.Option) (*nats.Conn, e } switch strings.ToLower(config.Auth.Method) { - case nconf.NatsAuthMethodUser: + case NatsAuthMethodUser: opts = append(opts, nats.UserInfo(config.Auth.User, config.Auth.Password)) - case nconf.NatsAuthMethodToken: + case NatsAuthMethodToken: opts = append(opts, nats.Token(config.Auth.Token)) - case nconf.NatsAuthMethodTLS: + case NatsAuthMethodTLS: // if using TLS auth, make sure the client certificate is loaded if tlsConfig == nil || len(tlsConfig.Certificates) == 0 { return nil, fmt.Errorf("TLS auth method is configured but no certificate was loaded") @@ -78,7 +77,7 @@ func ConnectToNats(config *nconf.NatsConfig, opts ...nats.Option) (*nats.Conn, e return nats.Connect(config.ServerString(), opts...) } -func ConfigureNatsStreaming(config *nconf.NatsConfig, log logrus.FieldLogger) (stan.Conn, error) { +func ConfigureNatsStreaming(config *NatsConfig, log logrus.FieldLogger) (stan.Conn, error) { // connect to the underlying instance nc, err := ConfigureNatsConnection(config, log) if err != nil { @@ -93,7 +92,7 @@ func ConfigureNatsStreaming(config *nconf.NatsConfig, log logrus.FieldLogger) (s return conn, nil } -func ConnectToNatsStreaming(nc *nats.Conn, config *nconf.NatsConfig, log logrus.FieldLogger) (stan.Conn, error) { +func ConnectToNatsStreaming(nc *nats.Conn, config *NatsConfig, log logrus.FieldLogger) (stan.Conn, error) { if config.ClusterID == "" { return nil, errors.New("Must provide a cluster ID to connect to streaming nats") } diff --git a/nconf/nats.go b/nconf/nats.go deleted file mode 100644 index a2f7e59..0000000 --- a/nconf/nats.go +++ /dev/null @@ -1,81 +0,0 @@ -package nconf - -import ( - "fmt" - "strings" - - "github.com/netlify/netlify-commons/discovery" - "github.com/sirupsen/logrus" -) - -const ( - NatsAuthMethodUser = "user" - NatsAuthMethodToken = "token" - NatsAuthMethodTLS = "tls" -) - -type NatsAuth struct { - Method string `mapstructure:"method"` - User string `mapstructure:"user"` - Password string `mapstructure:"password"` - Token string `mapstructure:"token"` -} - -type NatsConfig struct { - TLS *TLSConfig `mapstructure:"tls_conf"` - DiscoveryName string `mapstructure:"discovery_name" split_words:"true"` - Servers []string `mapstructure:"servers"` - Auth NatsAuth `mapstructure:"auth"` - - // for streaming - ClusterID string `mapstructure:"cluster_id" split_words:"true"` - ClientID string `mapstructure:"client_id" split_words:"true"` - StartPos string `mapstructure:"start_pos" split_words:"true"` -} - -func (c *NatsConfig) LoadServerNames() error { - if c.DiscoveryName == "" { - return nil - } - - natsURLs := []string{} - endpoints, err := discovery.DiscoverEndpoints(c.DiscoveryName) - if err != nil { - return err - } - - for _, endpoint := range endpoints { - natsURLs = append(natsURLs, fmt.Sprintf("nats://%s:%d", endpoint.Target, endpoint.Port)) - } - - c.Servers = natsURLs - return nil -} - -// ServerString will build the proper string for nats connect -func (config *NatsConfig) ServerString() string { - return strings.Join(config.Servers, ",") -} - -func (config *NatsConfig) Fields() logrus.Fields { - f := logrus.Fields{ - "servers": strings.Join(config.Servers, ","), - } - - if config.Auth.Method != "" { - f["auth_method"] = config.Auth.Method - } - - if config.TLS != nil { - f["ca_files"] = strings.Join(config.TLS.CAFiles, ",") - f["key_file"] = config.TLS.KeyFile - f["cert_file"] = config.TLS.CertFile - } - - if config.ClusterID != "" { - f["client_id"] = config.ClientID - f["cluster_id"] = config.ClusterID - } - - return f -}