Skip to content

Commit 9b3dcf5

Browse files
committed
fix(wecom): replace dedupe map rotation with circular queue
The previous dedupe map rotation logic reset the map, keeping only the current message, as soon as it exceeded the max size. This caused an 'amnesia cliff': duplicates of just-forgotten messages that arrived immediately afterwards would be processed again. This change replaces that logic with a MessageDeduplicator struct that uses a circular queue (ring buffer) to track insertion order. Once the limit is reached, each new insertion evicts only the single oldest message from the map, which resolves the cliff issue. This also cleans up the WeCom Bot and App webhook handlers by encapsulating the mutex and map state inside the deduplicator.
1 parent a2c4bf4 commit 9b3dcf5

File tree

4 files changed

+74
-50
lines changed

4 files changed

+74
-50
lines changed

pkg/channels/wecom.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"net/http"
2020
"sort"
2121
"strings"
22-
"sync"
2322
"time"
2423

2524
"github.com/sipeed/picoclaw/pkg/bus"
@@ -36,8 +35,7 @@ type WeComBotChannel struct {
3635
server *http.Server
3736
ctx context.Context
3837
cancel context.CancelFunc
39-
processedMsgs map[string]bool // Message deduplication: msg_id -> processed
40-
msgMu sync.RWMutex
38+
processedMsgs *MessageDeduplicator
4139
}
4240

4341
// WeComBotMessage represents the JSON message structure from WeCom Bot (AIBOT)
@@ -101,7 +99,7 @@ func NewWeComBotChannel(cfg config.WeComConfig, messageBus *bus.MessageBus) (*We
10199
return &WeComBotChannel{
102100
BaseChannel: base,
103101
config: cfg,
104-
processedMsgs: make(map[string]bool),
102+
processedMsgs: NewMessageDeduplicator(wecomMaxProcessedMessages),
105103
}, nil
106104
}
107105

@@ -330,7 +328,7 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag
330328

331329
// Message deduplication: Use msg_id to prevent duplicate processing
332330
msgID := msg.MsgID
333-
if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) {
331+
if !c.processedMsgs.MarkMessageProcessed(msgID) {
334332
logger.DebugCF("wecom", "Skipping duplicate message", map[string]any{
335333
"msg_id": msgID,
336334
})

pkg/channels/wecom_app.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ type WeComAppChannel struct {
3737
tokenMu sync.RWMutex
3838
ctx context.Context
3939
cancel context.CancelFunc
40-
processedMsgs map[string]bool // Message deduplication: msg_id -> processed
41-
msgMu sync.RWMutex
40+
processedMsgs *MessageDeduplicator
4241
}
4342

4443
// WeComXMLMessage represents the XML message structure from WeCom
@@ -128,7 +127,7 @@ func NewWeComAppChannel(cfg config.WeComAppConfig, messageBus *bus.MessageBus) (
128127
return &WeComAppChannel{
129128
BaseChannel: base,
130129
config: cfg,
131-
processedMsgs: make(map[string]bool),
130+
processedMsgs: NewMessageDeduplicator(wecomMaxProcessedMessages),
132131
}, nil
133132
}
134133

@@ -405,7 +404,7 @@ func (c *WeComAppChannel) processMessage(ctx context.Context, msg WeComXMLMessag
405404
// Message deduplication: Use msg_id to prevent duplicate processing
406405
// As per WeCom documentation, use msg_id for deduplication
407406
msgID := fmt.Sprintf("%d", msg.MsgId)
408-
if !markMessageProcessed(&c.msgMu, &c.processedMsgs, msgID, wecomMaxProcessedMessages) {
407+
if !c.processedMsgs.MarkMessageProcessed(msgID) {
409408
logger.DebugCF("wecom_app", "Skipping duplicate message", map[string]any{
410409
"msg_id": msgID,
411410
})

pkg/channels/wecom_dedupe.go

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,51 @@ import "sync"
44

55
const wecomMaxProcessedMessages = 1000
66

7-
// markMessageProcessed marks msgID as processed and returns false for duplicates.
8-
// All map reads/writes (including len) are protected by msgMu to avoid races.
9-
func markMessageProcessed(msgMu *sync.RWMutex, processedMsgs *map[string]bool, msgID string, maxEntries int) bool {
7+
// MessageDeduplicator provides thread-safe message deduplication using a circular queue (ring buffer)
8+
// combined with a hash map. This ensures fast O(1) lookups while naturally evicting the oldest
9+
// messages without causing "amnesia cliffs" when the limit is reached.
10+
type MessageDeduplicator struct {
11+
mu sync.Mutex
12+
msgs map[string]bool
13+
ring []string
14+
idx int
15+
max int
16+
}
17+
18+
// NewMessageDeduplicator creates a new deduplicator with the specified capacity.
19+
func NewMessageDeduplicator(maxEntries int) *MessageDeduplicator {
1020
if maxEntries <= 0 {
1121
maxEntries = wecomMaxProcessedMessages
1222
}
13-
14-
msgMu.Lock()
15-
defer msgMu.Unlock()
16-
17-
if *processedMsgs == nil {
18-
*processedMsgs = make(map[string]bool)
23+
return &MessageDeduplicator{
24+
msgs: make(map[string]bool, maxEntries),
25+
ring: make([]string, maxEntries),
26+
max: maxEntries,
1927
}
28+
}
29+
30+
// MarkMessageProcessed marks msgID as processed and returns false for duplicates.
31+
func (d *MessageDeduplicator) MarkMessageProcessed(msgID string) bool {
32+
d.mu.Lock()
33+
defer d.mu.Unlock()
2034

21-
if (*processedMsgs)[msgID] {
35+
// 1. Check for duplicate
36+
if d.msgs[msgID] {
2237
return false
2338
}
24-
(*processedMsgs)[msgID] = true
2539

26-
// When over limit, reset dedupe map but keep the current message.
27-
if len(*processedMsgs) > maxEntries {
28-
*processedMsgs = map[string]bool{msgID: true}
40+
// 2. Evict the oldest message at our current ring position (if any)
41+
oldestID := d.ring[d.idx]
42+
if oldestID != "" {
43+
delete(d.msgs, oldestID)
2944
}
3045

46+
// 3. Store the new message
47+
d.msgs[msgID] = true
48+
d.ring[d.idx] = msgID
49+
50+
// 4. Advance the circular queue index
51+
d.idx = (d.idx + 1) % d.max
52+
3153
return true
3254
}

pkg/channels/wecom_dedupe_test.go

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,20 @@ import (
55
"testing"
66
)
77

8-
func TestMarkMessageProcessed_DuplicateDetection(t *testing.T) {
9-
var mu sync.RWMutex
10-
processed := make(map[string]bool)
8+
func TestMessageDeduplicator_DuplicateDetection(t *testing.T) {
9+
d := NewMessageDeduplicator(wecomMaxProcessedMessages)
1110

12-
if ok := markMessageProcessed(&mu, &processed, "msg-1", wecomMaxProcessedMessages); !ok {
11+
if ok := d.MarkMessageProcessed("msg-1"); !ok {
1312
t.Fatalf("first message should be accepted")
1413
}
1514

16-
if ok := markMessageProcessed(&mu, &processed, "msg-1", wecomMaxProcessedMessages); ok {
15+
if ok := d.MarkMessageProcessed("msg-1"); ok {
1716
t.Fatalf("duplicate message should be rejected")
1817
}
1918
}
2019

21-
func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) {
22-
var mu sync.RWMutex
23-
processed := make(map[string]bool)
20+
func TestMessageDeduplicator_ConcurrentSameMessage(t *testing.T) {
21+
d := NewMessageDeduplicator(wecomMaxProcessedMessages)
2422

2523
const goroutines = 64
2624
var wg sync.WaitGroup
@@ -30,7 +28,7 @@ func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) {
3028
for i := 0; i < goroutines; i++ {
3129
go func() {
3230
defer wg.Done()
33-
results <- markMessageProcessed(&mu, &processed, "msg-concurrent", wecomMaxProcessedMessages)
31+
results <- d.MarkMessageProcessed("msg-concurrent")
3432
}()
3533
}
3634

@@ -49,30 +47,37 @@ func TestMarkMessageProcessed_ConcurrentSameMessage(t *testing.T) {
4947
}
5048
}
5149

52-
func TestMarkMessageProcessed_RotationClearsMapAtBoundary(t *testing.T) {
53-
var mu sync.RWMutex
54-
processed := make(map[string]bool)
50+
func TestMessageDeduplicator_CircularQueueEviction(t *testing.T) {
51+
// Create a deduplicator with a very small capacity to test eviction easily
52+
capacity := 3
53+
d := NewMessageDeduplicator(capacity)
5554

56-
if ok := markMessageProcessed(&mu, &processed, "msg-1", 1); !ok {
57-
t.Fatalf("first message should be accepted")
58-
}
59-
if len(processed) != 1 {
60-
t.Fatalf("expected map size 1 after first insert, got %d", len(processed))
55+
// Fill the queue
56+
d.MarkMessageProcessed("msg-1")
57+
d.MarkMessageProcessed("msg-2")
58+
d.MarkMessageProcessed("msg-3")
59+
60+
// At this point, the queue is full. msg-1 is the oldest.
61+
if len(d.msgs) != 3 {
62+
t.Fatalf("expected map size to be 3, got %d", len(d.msgs))
6163
}
6264

63-
// Inserting second unique message exceeds maxEntries and should reset map, but keep the new message.
64-
if ok := markMessageProcessed(&mu, &processed, "msg-2", 1); !ok {
65-
t.Fatalf("second unique message should be accepted")
65+
// This should evict msg-1 and add msg-4
66+
if ok := d.MarkMessageProcessed("msg-4"); !ok {
67+
t.Fatalf("msg-4 should be accepted")
6668
}
67-
if len(processed) != 1 {
68-
t.Fatalf("expected map to retain current message after rotation, got size %d", len(processed))
69+
70+
if len(d.msgs) != 3 {
71+
t.Fatalf("expected map size to remain at max capacity (3), got %d", len(d.msgs))
6972
}
70-
if !processed["msg-2"] {
71-
t.Fatalf("expected current message marker to be retained after rotation")
73+
74+
// msg-1 should now be forgotten (evicted)
75+
if ok := d.MarkMessageProcessed("msg-1"); !ok {
76+
t.Fatalf("msg-1 should be accepted again because it was evicted")
7277
}
7378

74-
// Because msg-2 was retained, an immediate duplicate should be rejected.
75-
if ok := markMessageProcessed(&mu, &processed, "msg-2", 1); ok {
76-
t.Fatalf("duplicate message immediately after rotation should be rejected")
79+
// msg-2 should have been evicted when we added msg-1 back
80+
if ok := d.MarkMessageProcessed("msg-2"); !ok {
81+
t.Fatalf("msg-2 should be accepted again because it was evicted")
7782
}
7883
}

0 commit comments

Comments
 (0)