@@ -3,15 +3,107 @@ package messaging
33import (
44 "fmt"
55 "strings"
6-
7- stan "github.com/nats-io/go-nats-streaming"
8- "github.com/nats-io/go-nats-streaming/pb"
6+ "time"
97
108 "github.com/netlify/netlify-commons/nconf"
9+
10+ "github.com/pkg/errors"
11+
12+ "github.com/nats-io/stan.go"
13+ "github.com/nats-io/stan.go/pb"
14+
15+ "github.com/netlify/netlify-commons/discovery"
16+ "github.com/sirupsen/logrus"
1117)
1218
13- func StartPoint (config * nconf.NatsConfig ) (stan.SubscriptionOption , error ) {
14- switch v := strings .ToLower (config .StartPos ); v {
19+ const (
20+ NatsAuthMethodUser = "user"
21+ NatsAuthMethodToken = "token"
22+ NatsAuthMethodTLS = "tls"
23+ )
24+
25+ type NatsAuth struct {
26+ Method string `mapstructure:"method"`
27+ User string `mapstructure:"user"`
28+ Password string `mapstructure:"password"`
29+ Token string `mapstructure:"token"`
30+ }
31+
32+ type NatsConfig struct {
33+ TLS * nconf.TLSConfig `mapstructure:"tls_conf"`
34+ DiscoveryName string `mapstructure:"discovery_name" split_words:"true"`
35+ Servers []string `mapstructure:"servers"`
36+ Auth NatsAuth `mapstructure:"auth"`
37+
38+ // for streaming
39+ ClusterID string `mapstructure:"cluster_id" split_words:"true"`
40+ ClientID string `mapstructure:"client_id" split_words:"true"`
41+ StartPos string `mapstructure:"start_pos" split_words:"true"`
42+ }
43+
44+ type NatsClientConfig struct {
45+ NatsConfig
46+ Subject string `mapstructure:"command_subject"`
47+ Group string `mapstructure:"command_group"`
48+
49+ // StartAt will configure where the client should resume the stream:
50+ // - `all`: all the messages available
51+ // - `last`: from where the client left off
52+ // - `new`: all new messages for the client
53+ // - `first`: from the first message available (default)
54+ // - other: if it isn't one of the above fields, it will try and parse the param as a go duration (e.g. 30s, 1h)
55+ StartAt string `mapstructure:"start_at"`
56+ }
57+
58+ func (c * NatsConfig ) LoadServerNames () error {
59+ if c .DiscoveryName == "" {
60+ return nil
61+ }
62+
63+ natsURLs := []string {}
64+ endpoints , err := discovery .DiscoverEndpoints (c .DiscoveryName )
65+ if err != nil {
66+ return err
67+ }
68+
69+ for _ , endpoint := range endpoints {
70+ natsURLs = append (natsURLs , fmt .Sprintf ("nats://%s:%d" , endpoint .Target , endpoint .Port ))
71+ }
72+
73+ c .Servers = natsURLs
74+ return nil
75+ }
76+
77+ // ServerString will build the proper string for nats connect
78+ func (c * NatsConfig ) ServerString () string {
79+ return strings .Join (c .Servers , "," )
80+ }
81+
82+ func (c * NatsConfig ) Fields () logrus.Fields {
83+ f := logrus.Fields {
84+ "servers" : strings .Join (c .Servers , "," ),
85+ }
86+
87+ if c .Auth .Method != "" {
88+ f ["auth_method" ] = c .Auth .Method
89+ }
90+
91+ if c .TLS != nil {
92+ f ["ca_files" ] = strings .Join (c .TLS .CAFiles , "," )
93+ f ["key_file" ] = c .TLS .KeyFile
94+ f ["cert_file" ] = c .TLS .CertFile
95+ }
96+
97+ if c .ClusterID != "" {
98+ f ["client_id" ] = c .ClientID
99+ f ["cluster_id" ] = c .ClusterID
100+ }
101+
102+ return f
103+ }
104+
105+ func (c * NatsConfig ) StartPoint () (stan.SubscriptionOption , error ) {
106+ switch v := strings .ToLower (c .StartPos ); v {
15107 case "all" :
16108 return stan .DeliverAllAvailable (), nil
17109 case "last" :
@@ -20,6 +112,11 @@ func StartPoint(config *nconf.NatsConfig) (stan.SubscriptionOption, error) {
20112 return stan .StartAt (pb .StartPosition_NewOnly ), nil
21113 case "" , "first" :
22114 return stan .StartAt (pb .StartPosition_First ), nil
115+ default :
116+ dur , err := time .ParseDuration (v )
117+ if err != nil {
118+ return nil , errors .Wrap (err , "Failed to parse field as a duration" )
119+ }
120+ return stan .StartAtTimeDelta (dur ), nil
23121 }
24- return nil , fmt .Errorf ("Unknown start position '%s', possible values are all, last, new, first and ''" , config .StartPos )
25122}
0 commit comments