From 6be5ca6dd69049eb282059dbc8c655c3edb12acc Mon Sep 17 00:00:00 2001 From: Arvindh Date: Mon, 23 Jun 2025 23:37:47 +0530 Subject: [PATCH 1/3] try to disconnect on failed authz publish Signed-off-by: Arvindh --- pkg/session/stream.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pkg/session/stream.go b/pkg/session/stream.go index a3b93259..2f0e93da 100644 --- a/pkg/session/stream.go +++ b/pkg/session/stream.go @@ -69,6 +69,29 @@ func stream(ctx context.Context, dir Direction, r, w net.Conn, h Handler, preIc, switch dir { case Up: if err = authorize(ctx, pkt, h); err != nil { + if pub, ok := pkt.(*packets.PublishPacket); ok { + switch pub.Qos { + case 0: + pkt = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket) + if wErr := pkt.Write(w); wErr != nil { + err = errors.Join(err, wErr) + } + case 1: + puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) + puback.MessageID = pub.MessageID + if wErr := puback.Write(w); wErr != nil { + err = errors.Join(err, wErr) + } + + case 2: + pubrec := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket) + pubrec.MessageID = pub.MessageID + + if wErr := pubrec.Write(w); wErr != nil { + err = errors.Join(err, wErr) + } + } + } errs <- wrap(ctx, err, dir) return } From 9b621bdba0974b8c94ef81e4c03bac24a17a9e3b Mon Sep 17 00:00:00 2001 From: Arvindh Date: Tue, 12 Aug 2025 17:06:25 +0530 Subject: [PATCH 2/3] update logic to disconnect for authz failed publish Signed-off-by: Arvindh --- pkg/session/stream.go | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/pkg/session/stream.go b/pkg/session/stream.go index 2f0e93da..4c3bc6ba 100644 --- a/pkg/session/stream.go +++ b/pkg/session/stream.go @@ -69,27 +69,10 @@ func stream(ctx context.Context, dir Direction, r, w net.Conn, h Handler, preIc, switch dir { case Up: if err = authorize(ctx, pkt, h); err != nil { - if pub, ok := pkt.(*packets.PublishPacket); ok { - switch pub.Qos { - case 0: - pkt = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket) - if wErr := pkt.Write(w); wErr != nil { - err = errors.Join(err, wErr) - } - case 1: - puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) - puback.MessageID = pub.MessageID - if wErr := puback.Write(w); wErr != nil { - err = errors.Join(err, wErr) - } - - case 2: - pubrec := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket) - pubrec.MessageID = pub.MessageID - - if wErr := pubrec.Write(w); wErr != nil { - err = errors.Join(err, wErr) - } + if _, ok := pkt.(*packets.PublishPacket); ok { + pkt = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket) + if wErr := pkt.Write(w); wErr != nil { + err = errors.Join(err, wErr) } } errs <- wrap(ctx, err, dir) From 4b26856f56e53674c098f01e9a6813ad4a8e7fcc Mon Sep 17 00:00:00 2001 From: Arvindh Date: Tue, 19 Aug 2025 17:29:31 +0530 Subject: [PATCH 3/3] update disconnect control packet Signed-off-by: Arvindh --- pkg/session/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/session/stream.go b/pkg/session/stream.go index 4c3bc6ba..2cc79284 100644 --- a/pkg/session/stream.go +++ b/pkg/session/stream.go @@ -70,7 +70,7 @@ func stream(ctx context.Context, dir Direction, r, w net.Conn, h Handler, preIc, case Up: if err = authorize(ctx, pkt, h); err != nil { if _, ok := pkt.(*packets.PublishPacket); ok { - pkt = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket) + pkt = packets.NewControlPacket(packets.Disconnect) if wErr := pkt.Write(w); wErr != nil { err = errors.Join(err, wErr) }