Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/nats-io/gnatsd v1.4.1 // indirect
github.com/nats-io/go-nats v1.3.0
github.com/nats-io/go-nats-streaming v0.3.4
github.com/nats-io/go-nats v1.3.0 // indirect
github.com/nats-io/nats-server v1.4.1 // indirect
github.com/nats-io/nats-streaming-server v0.15.1 // indirect
github.com/nats-io/nats.go v1.8.1
github.com/nats-io/stan.go v0.5.0
github.com/opentracing/opentracing-go v1.1.0
github.com/pelletier/go-toml v1.3.0 // indirect
github.com/philhofer/fwd v1.0.0 // indirect
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ github.com/nats-io/gnatsd v1.4.1 h1:RconcfDeWpKCD6QIIwiVFcvForlXpWeJP7i5/lDLy44=
github.com/nats-io/gnatsd v1.4.1/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ=
github.com/nats-io/go-nats v1.3.0 h1:CrvnAwoB2A2Yma+PcM+5tC++3/wswhcy8OvzqbsUXZQ=
github.com/nats-io/go-nats v1.3.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0=
github.com/nats-io/go-nats-streaming v0.3.4 h1:4z1stoQQfetddeodZ4huO24LK0QY75Mg4r9037J90MI=
github.com/nats-io/go-nats-streaming v0.3.4/go.mod h1:gfq4R3c9sKAINOpelo0gn/b9QDMBZnmrttcsNF+lqyo=
github.com/nats-io/jwt v0.2.6 h1:eAyoYvGgGLXR2EpnsBUvi/FcFrBqN6YKFVbOoEfPN4k=
github.com/nats-io/jwt v0.2.6/go.mod h1:mQxQ0uHQ9FhEVPIcTSKwx2lqZEpXWWcCgA7R6NrWvvY=
github.com/nats-io/nats-server v1.4.1 h1:Ul1oSOGNV/L8kjr4v6l2f9Yet6WY+LevH1/7cRZ/qyA=
Expand All @@ -101,6 +99,7 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nats-io/stan.go v0.4.5 h1:lPZ9y1jVGiXcTaUc1SnEIWPYfh0avuEiHBePNJYgpPk=
github.com/nats-io/stan.go v0.4.5/go.mod h1:Ji7mK6gRZJSH1nc3ZJH6vi7zn/QnZhpR9Arm4iuzsUQ=
github.com/nats-io/stan.go v0.5.0/go.mod h1:dYqB+vMN3C2F9pT1FRQpg9eHbjPj6mP0yYuyBNuXHZE=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
Expand Down
25 changes: 0 additions & 25 deletions messaging/config.go

This file was deleted.

4 changes: 2 additions & 2 deletions messaging/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"strings"
"time"

"github.com/nats-io/go-nats"
stan "github.com/nats-io/go-nats-streaming"
"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
"github.com/netlify/netlify-commons/nconf"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down
65 changes: 52 additions & 13 deletions nconf/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ package nconf
import (
"fmt"
"strings"
"time"

"github.com/pkg/errors"

"github.com/nats-io/stan.go"
"github.com/nats-io/stan.go/pb"

"github.com/netlify/netlify-commons/discovery"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -33,6 +39,20 @@ type NatsConfig struct {
StartPos string `mapstructure:"start_pos" split_words:"true"`
}

type NatsClientConfig struct {
NatsConfig
Subject string `mapstructure:"command_subject"`
Group string `mapstructure:"command_group"`

// StartAt will configure where the client should resume the stream:
// - `all`: all the messages available
// - `last`: from where the client left off
// - `new`: all new messages for the client
// - `first`: from the first message available (default)
// - other: if it isn't one of the above fields, it will try and parse the param as a go duration (e.g. 30s, 1h)
StartAt string `mapstructure:"start_at"`
}

func (c *NatsConfig) LoadServerNames() error {
if c.DiscoveryName == "" {
return nil
Expand All @@ -53,29 +73,48 @@ func (c *NatsConfig) LoadServerNames() error {
}

// ServerString will build the proper string for nats connect
func (config *NatsConfig) ServerString() string {
return strings.Join(config.Servers, ",")
func (c *NatsConfig) ServerString() string {
return strings.Join(c.Servers, ",")
}

func (config *NatsConfig) Fields() logrus.Fields {
func (c *NatsConfig) Fields() logrus.Fields {
f := logrus.Fields{
"servers": strings.Join(config.Servers, ","),
"servers": strings.Join(c.Servers, ","),
}

if config.Auth.Method != "" {
f["auth_method"] = config.Auth.Method
if c.Auth.Method != "" {
f["auth_method"] = c.Auth.Method
}

if config.TLS != nil {
f["ca_files"] = strings.Join(config.TLS.CAFiles, ",")
f["key_file"] = config.TLS.KeyFile
f["cert_file"] = config.TLS.CertFile
if c.TLS != nil {
f["ca_files"] = strings.Join(c.TLS.CAFiles, ",")
f["key_file"] = c.TLS.KeyFile
f["cert_file"] = c.TLS.CertFile
}

if config.ClusterID != "" {
f["client_id"] = config.ClientID
f["cluster_id"] = config.ClusterID
if c.ClusterID != "" {
f["client_id"] = c.ClientID
f["cluster_id"] = c.ClusterID
}

return f
}

func (c *NatsConfig) StartPoint() (stan.SubscriptionOption, error) {
switch v := strings.ToLower(c.StartPos); v {
case "all":
return stan.DeliverAllAvailable(), nil
case "last":
return stan.StartWithLastReceived(), nil
case "new":
return stan.StartAt(pb.StartPosition_NewOnly), nil
case "", "first":
return stan.StartAt(pb.StartPosition_First), nil
default:
dur, err := time.ParseDuration(v)
if err != nil {
return nil, errors.Wrap(err, "Failed to parse field as a duration")
}
return stan.StartAtTimeDelta(dur), nil
}
}