File tree Expand file tree Collapse file tree 2 files changed +22
-13
lines changed Expand file tree Collapse file tree 2 files changed +22
-13
lines changed Original file line number Diff line number Diff line change @@ -15,16 +15,24 @@ type options struct {
1515 logger queue.Logger
1616 addr string
1717 subj string
18+ tag string
1819}
1920
20- // WithAddr setup the addr of NATS
21+ // WithAddr setup the URI
2122func WithAddr (addr string ) Option {
2223 return func (w * options ) {
2324 w .addr = "nats://" + addr
2425 }
2526}
2627
27- // WithSubj setup the subject of NATS
28+ // WithAddr setup the tag
29+ func WithTag (tag string ) Option {
30+ return func (w * options ) {
31+ w .tag = tag
32+ }
33+ }
34+
35+ // WithSubj setup the topic
2836func WithSubj (subj string ) Option {
2937 return func (w * options ) {
3038 w .subj = subj
@@ -49,6 +57,7 @@ func newOptions(opts ...Option) options {
4957 defaultOpts := options {
5058 addr : "amqp://guest:guest@localhost:5672/" ,
5159 subj : "queue" ,
60+ tag : "golang-queue" ,
5261 logger : queue .NewLogger (),
5362 runFunc : func (context.Context , core.QueuedMessage ) error {
5463 return nil
Original file line number Diff line number Diff line change @@ -17,7 +17,7 @@ var _ core.Worker = (*Worker)(nil)
1717
1818// Worker for NSQ
1919type Worker struct {
20- client * amqp.Connection
20+ conn * amqp.Connection
2121 channel * amqp.Channel
2222 stop chan struct {}
2323 stopFlag int32
@@ -36,12 +36,12 @@ func NewWorker(opts ...Option) *Worker {
3636 tasks : make (chan amqp.Delivery ),
3737 }
3838
39- w .client , err = amqp .Dial (w .opts .addr )
39+ w .conn , err = amqp .Dial (w .opts .addr )
4040 if err != nil {
4141 panic (err )
4242 }
4343
44- w .channel , err = w .client .Channel ()
44+ w .channel , err = w .conn .Channel ()
4545 if err != nil {
4646 panic (err )
4747 }
@@ -66,13 +66,13 @@ func (w *Worker) startConsumer() (err error) {
6666 }
6767
6868 w .tasks , err = w .channel .Consume (
69- q .Name , // queue
70- "" , // consumer
71- true , // auto-ack
72- false , // exclusive
73- false , // no-local
74- false , // no-wait
75- nil , // args
69+ q .Name , // queue
70+ w . opts . tag , // consumer
71+ true , // auto-ack
72+ false , // exclusive
73+ false , // no-local
74+ false , // no-wait
75+ nil , // args
7676 )
7777
7878 if err != nil {
@@ -152,7 +152,7 @@ func (w *Worker) Shutdown() error {
152152 if err := w .channel .Cancel ("" , true ); err != nil {
153153 w .opts .logger .Error (err )
154154 }
155- if err := w .client .Close (); err != nil {
155+ if err := w .conn .Close (); err != nil {
156156 w .opts .logger .Error (err )
157157 }
158158 })
You can’t perform that action at this time.
0 commit comments