@@ -3,6 +3,12 @@ package nconf
33import (
44 "fmt"
55 "strings"
6+ "time"
7+
8+ "github.com/pkg/errors"
9+
10+ "github.com/nats-io/stan.go"
11+ "github.com/nats-io/stan.go/pb"
612
713 "github.com/netlify/netlify-commons/discovery"
814 "github.com/sirupsen/logrus"
@@ -33,6 +39,20 @@ type NatsConfig struct {
3339 StartPos string `mapstructure:"start_pos" split_words:"true"`
3440}
3541
42+ type NatsClientConfig struct {
43+ NatsConfig
44+ Subject string `mapstructure:"command_subject"`
45+ Group string `mapstructure:"command_group"`
46+
47+ // StartAt will configure where the client should resume the stream:
48+ // - `all`: all the messages available
49+ // - `last`: from where the client left off
50+ // - `new`: all new messages for the client
51+ // - `first`: from the first message available (default)
52+ // - other: if it isn't one of the above fields, it will try and parse the param as a go duration (e.g. 30s, 1h)
53+ StartAt string `mapstructure:"start_at"`
54+ }
55+
3656func (c * NatsConfig ) LoadServerNames () error {
3757 if c .DiscoveryName == "" {
3858 return nil
@@ -53,29 +73,48 @@ func (c *NatsConfig) LoadServerNames() error {
5373}
5474
5575// ServerString will build the proper string for nats connect
56- func (config * NatsConfig ) ServerString () string {
57- return strings .Join (config .Servers , "," )
76+ func (c * NatsConfig ) ServerString () string {
77+ return strings .Join (c .Servers , "," )
5878}
5979
60- func (config * NatsConfig ) Fields () logrus.Fields {
80+ func (c * NatsConfig ) Fields () logrus.Fields {
6181 f := logrus.Fields {
62- "servers" : strings .Join (config .Servers , "," ),
82+ "servers" : strings .Join (c .Servers , "," ),
6383 }
6484
65- if config .Auth .Method != "" {
66- f ["auth_method" ] = config .Auth .Method
85+ if c .Auth .Method != "" {
86+ f ["auth_method" ] = c .Auth .Method
6787 }
6888
69- if config .TLS != nil {
70- f ["ca_files" ] = strings .Join (config .TLS .CAFiles , "," )
71- f ["key_file" ] = config .TLS .KeyFile
72- f ["cert_file" ] = config .TLS .CertFile
89+ if c .TLS != nil {
90+ f ["ca_files" ] = strings .Join (c .TLS .CAFiles , "," )
91+ f ["key_file" ] = c .TLS .KeyFile
92+ f ["cert_file" ] = c .TLS .CertFile
7393 }
7494
75- if config .ClusterID != "" {
76- f ["client_id" ] = config .ClientID
77- f ["cluster_id" ] = config .ClusterID
95+ if c .ClusterID != "" {
96+ f ["client_id" ] = c .ClientID
97+ f ["cluster_id" ] = c .ClusterID
7898 }
7999
80100 return f
81101}
102+
103+ func (c * NatsConfig ) StartPoint () (stan.SubscriptionOption , error ) {
104+ switch v := strings .ToLower (c .StartPos ); v {
105+ case "all" :
106+ return stan .DeliverAllAvailable (), nil
107+ case "last" :
108+ return stan .StartWithLastReceived (), nil
109+ case "new" :
110+ return stan .StartAt (pb .StartPosition_NewOnly ), nil
111+ case "" , "first" :
112+ return stan .StartAt (pb .StartPosition_First ), nil
113+ default :
114+ dur , err := time .ParseDuration (v )
115+ if err != nil {
116+ return nil , errors .Wrap (err , "Failed to parse field as a duration" )
117+ }
118+ return stan .StartAtTimeDelta (dur ), nil
119+ }
120+ }
0 commit comments