-
Notifications
You must be signed in to change notification settings - Fork 3.7k
fix(wecom): remove message-dedupe data races and fix amnesia cliff #727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
18d8993
db17cdc
1e2ab4a
8640c81
29e9b6b
2e0be92
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -9,7 +9,6 @@ import ( | |||||||||||||||||||||||||||
| "io" | ||||||||||||||||||||||||||||
| "net/http" | ||||||||||||||||||||||||||||
| "strings" | ||||||||||||||||||||||||||||
| "sync" | ||||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| "github.com/sipeed/picoclaw/pkg/bus" | ||||||||||||||||||||||||||||
|
|
@@ -28,8 +27,7 @@ type WeComBotChannel struct { | |||||||||||||||||||||||||||
| client *http.Client | ||||||||||||||||||||||||||||
| ctx context.Context | ||||||||||||||||||||||||||||
| cancel context.CancelFunc | ||||||||||||||||||||||||||||
| processedMsgs map[string]bool // Message deduplication: msg_id -> processed | ||||||||||||||||||||||||||||
| msgMu sync.RWMutex | ||||||||||||||||||||||||||||
| processedMsgs *MessageDeduplicator | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // WeComBotMessage represents the JSON message structure from WeCom Bot (AIBOT) | ||||||||||||||||||||||||||||
|
|
@@ -108,7 +106,7 @@ func NewWeComBotChannel(cfg config.WeComConfig, messageBus *bus.MessageBus) (*We | |||||||||||||||||||||||||||
| client: &http.Client{Timeout: clientTimeout}, | ||||||||||||||||||||||||||||
| ctx: ctx, | ||||||||||||||||||||||||||||
| cancel: cancel, | ||||||||||||||||||||||||||||
| processedMsgs: make(map[string]bool), | ||||||||||||||||||||||||||||
| processedMsgs: NewMessageDeduplicator(wecomMaxProcessedMessages), | ||||||||||||||||||||||||||||
| }, nil | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
@@ -330,23 +328,12 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag | |||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Message deduplication: Use msg_id to prevent duplicate processing | ||||||||||||||||||||||||||||
| msgID := msg.MsgID | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
| msgID := msg.MsgID | |
| msgID := msg.MsgID | |
| if msgID == "" { | |
| // Derive a fallback ID to avoid using an empty string as dedupe key | |
| fallbackID := fmt.Sprintf("fallback:%s:%s:%s", msg.From.UserID, msg.ChatID, msg.MsgType) | |
| logger.DebugCF("wecom", "Empty msg_id received, using fallback dedupe key", map[string]any{ | |
| "derived_msg_id": fallbackID, | |
| "from_user": msg.From.UserID, | |
| "chat_id": msg.ChatID, | |
| "msg_type": msg.MsgType, | |
| }) | |
| msgID = fallbackID | |
| } |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,54 @@ | ||||||
| package wecom | ||||||
|
|
||||||
| import "sync" | ||||||
|
Comment on lines
+1
to
+3
|
||||||
|
|
||||||
| const wecomMaxProcessedMessages = 1000 | ||||||
|
|
||||||
| // MessageDeduplicator provides thread-safe message deduplication using a circular queue (ring buffer) | ||||||
| // combined with a hash map. This ensures fast O(1) lookups while naturally evicting the oldest | ||||||
| // messages without causing "amnesia cliffs" when the limit is reached. | ||||||
| type MessageDeduplicator struct { | ||||||
| mu sync.Mutex | ||||||
| msgs map[string]bool | ||||||
| ring []string | ||||||
| idx int | ||||||
| max int | ||||||
| } | ||||||
|
|
||||||
| // NewMessageDeduplicator creates a new deduplicator with the specified capacity. | ||||||
| func NewMessageDeduplicator(maxEntries int) *MessageDeduplicator { | ||||||
| if maxEntries <= 0 { | ||||||
| maxEntries = wecomMaxProcessedMessages | ||||||
| } | ||||||
| return &MessageDeduplicator{ | ||||||
| msgs: make(map[string]bool, maxEntries), | ||||||
| ring: make([]string, maxEntries), | ||||||
| max: maxEntries, | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // MarkMessageProcessed marks msgID as processed and returns false for duplicates. | ||||||
| func (d *MessageDeduplicator) MarkMessageProcessed(msgID string) bool { | ||||||
| d.mu.Lock() | ||||||
| defer d.mu.Unlock() | ||||||
|
|
||||||
| // 1. Check for duplicate | ||||||
| if d.msgs[msgID] { | ||||||
| return false | ||||||
| } | ||||||
|
|
||||||
| // 2. Evict the oldest message at our current ring position (if any) | ||||||
| oldestID := d.ring[d.idx] | ||||||
| if oldestID != "" { | ||||||
| delete(d.msgs, oldestID) | ||||||
| } | ||||||
|
Comment on lines
+40
to
+44
|
||||||
|
|
||||||
| // 3. Store the new message | ||||||
| d.msgs[msgID] = true | ||||||
| d.ring[d.idx] = msgID | ||||||
|
|
||||||
| // 4. Advance the circle queue index | ||||||
|
||||||
| // 4. Advance the circle queue index | |
| // 4. Advance the circular queue index |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| package wecom | ||
|
|
||
| import ( | ||
| "sync" | ||
| "testing" | ||
| ) | ||
|
|
||
| func TestMessageDeduplicator_DuplicateDetection(t *testing.T) { | ||
| d := NewMessageDeduplicator(wecomMaxProcessedMessages) | ||
|
|
||
| if ok := d.MarkMessageProcessed("msg-1"); !ok { | ||
| t.Fatalf("first message should be accepted") | ||
| } | ||
|
|
||
| if ok := d.MarkMessageProcessed("msg-1"); ok { | ||
| t.Fatalf("duplicate message should be rejected") | ||
| } | ||
| } | ||
|
|
||
| func TestMessageDeduplicator_ConcurrentSameMessage(t *testing.T) { | ||
| d := NewMessageDeduplicator(wecomMaxProcessedMessages) | ||
|
|
||
| const goroutines = 64 | ||
| var wg sync.WaitGroup | ||
| wg.Add(goroutines) | ||
|
|
||
| results := make(chan bool, goroutines) | ||
| for i := 0; i < goroutines; i++ { | ||
| go func() { | ||
| defer wg.Done() | ||
| results <- d.MarkMessageProcessed("msg-concurrent") | ||
| }() | ||
| } | ||
|
|
||
| wg.Wait() | ||
| close(results) | ||
|
|
||
| successes := 0 | ||
| for ok := range results { | ||
| if ok { | ||
| successes++ | ||
| } | ||
| } | ||
|
|
||
| if successes != 1 { | ||
| t.Fatalf("expected exactly 1 successful mark, got %d", successes) | ||
| } | ||
| } | ||
|
|
||
| func TestMessageDeduplicator_CircularQueueEviction(t *testing.T) { | ||
| // Create a deduplicator with a very small capacity to test eviction easily. | ||
| capacity := 3 | ||
| d := NewMessageDeduplicator(capacity) | ||
|
|
||
| // Fill the queue. | ||
| d.MarkMessageProcessed("msg-1") | ||
| d.MarkMessageProcessed("msg-2") | ||
| d.MarkMessageProcessed("msg-3") | ||
|
|
||
| // At this point, the queue is full. msg-1 is the oldest. | ||
| if len(d.msgs) != 3 { | ||
| t.Fatalf("expected map size to be 3, got %d", len(d.msgs)) | ||
| } | ||
|
|
||
| // This should evict msg-1 and add msg-4. | ||
| if ok := d.MarkMessageProcessed("msg-4"); !ok { | ||
| t.Fatalf("msg-4 should be accepted") | ||
| } | ||
|
|
||
| if len(d.msgs) != 3 { | ||
| t.Fatalf("expected map size to remain at max capacity (3), got %d", len(d.msgs)) | ||
| } | ||
|
|
||
| // msg-1 should now be forgotten (evicted). | ||
| if ok := d.MarkMessageProcessed("msg-1"); !ok { | ||
| t.Fatalf("msg-1 should be accepted again because it was evicted") | ||
| } | ||
|
|
||
| // msg-2 should have been evicted when we added msg-1 back. | ||
| if ok := d.MarkMessageProcessed("msg-2"); !ok { | ||
| t.Fatalf("msg-2 should be accepted again because it was evicted") | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
msg.MsgIdis stringified and used as the dedupe key without checking for the zero value. If the inbound XML ever omits<MsgId>, it will default to 0 and unrelated messages could be incorrectly treated as duplicates. Consider validatingmsg.MsgId != 0(or using a fallback key) before deduping.