Skip to content

Commit e28a1db

Browse files
committed
Support for AutoAckOff
Signed-off-by: shivam <[email protected]>
1 parent 2d3b04d commit e28a1db

File tree

2 files changed

+21
-3
lines changed

2 files changed

+21
-3
lines changed

message.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type Message interface {
3636
MessageID() uint16
3737
Payload() []byte
3838
Ack()
39+
NoAutoAck() bool
40+
AutoAckOff()
3941
}
4042

4143
type message struct {
@@ -47,6 +49,7 @@ type message struct {
4749
payload []byte
4850
once sync.Once
4951
ack func()
52+
noautoack bool
5053
}
5154

5255
func (m *message) Duplicate() bool {
@@ -77,6 +80,14 @@ func (m *message) Ack() {
7780
m.once.Do(m.ack)
7881
}
7982

83+
func (m *message) NoAutoAck() bool {
84+
return m.noautoack
85+
}
86+
87+
func (m *message) AutoAckOff() {
88+
m.noautoack = true
89+
}
90+
8091
func messageFromPublish(p *packets.PublishPacket, ack func()) Message {
8192
return &message{
8293
duplicate: p.Dup,
@@ -86,6 +97,7 @@ func messageFromPublish(p *packets.PublishPacket, ack func()) Message {
8697
messageID: p.MessageID,
8798
payload: p.Payload,
8899
ack: ack,
100+
noautoack: false,
89101
}
90102
}
91103

router.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
186186
wg.Add(1)
187187
go func() {
188188
hd(client, m)
189-
m.Ack()
189+
if !m.NoAutoAck() {
190+
m.Ack()
191+
}
190192
wg.Done()
191193
}()
192194
}
@@ -201,7 +203,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
201203
wg.Add(1)
202204
go func() {
203205
r.defaultHandler(client, m)
204-
m.Ack()
206+
if !m.NoAutoAck() {
207+
m.Ack()
208+
}
205209
wg.Done()
206210
}()
207211
}
@@ -212,7 +216,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
212216
r.RUnlock()
213217
for _, handler := range handlers {
214218
handler(client, m)
215-
m.Ack()
219+
if !m.NoAutoAck() {
220+
m.Ack()
221+
}
216222
}
217223
// DEBUG.Println(ROU, "matchAndDispatch handled message")
218224
}

0 commit comments

Comments
 (0)