fix(wecom): remove message-dedupe data races and fix amnesia cliff #727
yinwm merged 6 commits into sipeed:main
Conversation
Pull request overview
This PR addresses concurrency safety in the WeCom (bot + app) message de-duplication flow by centralizing the “mark processed” logic behind a single mutex-protected helper.
Changes:
- Introduces a shared `markMessageProcessed(...)` helper to guard all map accesses (including `len`) under a lock.
- Updates both WeCom bot and WeCom app channels to use the shared helper instead of inlined map+mutex logic.
- Updates `make vet`/`make test` to run `generate` first.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `pkg/channels/wecom_dedupe.go` | New shared helper for thread-safe message de-dupe and bounded map growth. |
| `pkg/channels/wecom.go` | Switches bot channel dedupe logic to the shared helper. |
| `pkg/channels/wecom_app.go` | Switches app channel dedupe logic to the shared helper. |
| `Makefile` | Makes `vet` and `test` depend on `generate`. |
Force-pushed 72dd4ee to e083df5
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
Copilot reviews are nice, so I will keep addressing them.
Force-pushed 77df2d5 to 75c0bb1
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated no new comments.
A circular queue may be a better way.

Let me try.
PixelTux
left a comment
Looks like a clean fix, and the circular queue is a nice touch. LGTM
nikolasdehor
left a comment
Clean fix for both the data race and the amnesia cliff. The ring buffer approach is a textbook solution for bounded-size deduplication with FIFO eviction -- simple, correct, and O(1) for all operations.
The implementation is tight:
- Thread safety via a single `sync.Mutex` (correct -- `RWMutex` was overkill since the read and write are always paired in `MarkMessageProcessed`)
- Pre-allocated ring buffer avoids allocations during message processing
- The `maxEntries <= 0` fallback to the default constant is a nice defensive touch
- Test coverage includes duplicate detection, concurrent access (64 goroutines), and circular eviction
One minor observation: the `MessageDeduplicator` is in a general channels package with a `wecomMaxProcessedMessages` constant. If other channels (Telegram, Discord) need deduplication in the future, this would be reusable as-is -- the only WeCom-specific part is the constant name and its location. Consider renaming to `maxProcessedMessages` if you want to signal it is a general-purpose utility.
LGTM.
Please develop based on the refactor branch. The refactor branch is about to be merged, and PRs developed based on main before the merge will not be reviewed.

The refactor branch is scheduled to be merged during the daytime of February 28, 2026, Beijing Time (GMT+8), with the exact time yet to be determined. You may wait until after the merge to request a PR review. We have provided comprehensive migration documentation for the new channel system.
thanks for the pr, @Esubaalew please resolve conflicts
yinwm
left a comment
Review Summary
High-quality bug fix PR ✅
Issues Fixed
- Data Race: Original code `len(c.processedMsgs) > 1000` accessed the map outside the lock. Now all operations are encapsulated in `MessageDeduplicator`.
- Amnesia Cliff: Ring buffer evicts the oldest message one at a time, avoiding the previous issue where clearing the entire map could let duplicates through.
Strengths
- Thread-safe implementation
- Good code reuse between bot and app channels
- Solid test coverage (duplicate detection, concurrency safety, circular eviction)
Minor Suggestions (non-blocking)
- Tests access private field `d.msgs` directly - consider exposing a `Size() int` method or using pure behavioral tests. This doesn't affect correctness though.
LGTM! 🚀
- Centralize dedupe map access behind a mutex-safe helper and use it in both WeCom bot and WeCom app channels to eliminate concurrent map access races while preserving current dedupe behavior.
- Match rotation semantics to prior behavior by fully resetting the dedupe map once the size limit is exceeded, and add focused tests for duplicate detection and boundary rotation behavior.
- Use `wecomMaxProcessedMessages` in tests and add a concurrent same-message test to lock in race-safety behavior for `markMessageProcessed`.
- When the dedupe map rotates, the previous logic entirely cleared the map, meaning the message that triggered the rotation was immediately forgotten and could be duplicated immediately. This change seeds the new map with the current message to prevent that. Also adds a defensive nil check.
- The previous dedupe map rotation logic completely cleared the map when it reached max size, causing an "amnesia cliff" where immediately arriving duplicates of just-forgotten messages would be processed. This change replaces that with a `MessageDeduplicator` struct that uses a circular queue (ring buffer) to track insertions. When the limit is reached, it only evicts the absolute oldest message from the map, completely resolving the cliff issue. This also cleans up the WeCom Bot and App webhook handlers by encapsulating the mutex and map state.
- Rebase onto latest upstream/main, keep ring-buffer dedupe behavior, move dedupe tests to `pkg/channels/wecom`, and ensure wecom/channels race tests pass.
Force-pushed 9b3dcf5 to 2e0be92
done

thanks for the pr
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
```go
// 2. Evict the oldest message at our current ring position (if any)
oldestID := d.ring[d.idx]
if oldestID != "" {
	delete(d.msgs, oldestID)
}
```
The ring buffer uses an empty string sentinel ("" check) to decide whether to delete an evicted entry. If a real msgID can ever be "", eviction won't delete it and the dedupe set can exceed the configured max and behave incorrectly. Consider rejecting empty msgIDs up front or tracking slot occupancy separately (e.g., size counter / filled flag) so "" can be a valid key.
```go
d.msgs[msgID] = true
d.ring[d.idx] = msgID

// 4. Advance the circle queue index
```
Typo/wording in comment: "circle queue" should be "circular queue" or "ring buffer" for clarity/consistency with the rest of the file.
```diff
-// 4. Advance the circle queue index
+// 4. Advance the circular queue index
```
```go
package wecom

import "sync"
```
The PR description mentions files like pkg/channels/wecom_dedupe.go and pkg/channels/wecom.go, but the actual changes add pkg/channels/wecom/dedupe.go and update pkg/channels/wecom/{bot.go,app.go}. Please update the description to match the real file paths so reviewers can follow along more easily.
```diff
@@ -330,23 +328,12 @@ func (c *WeComBotChannel) processMessage(ctx context.Context, msg WeComBotMessag
 // Message deduplication: Use msg_id to prevent duplicate processing
 msgID := msg.MsgID
```
msg.MsgID is used as the dedupe key without validation. If the inbound JSON is missing msgid (or it is an empty string), all such messages will collapse to the same key and be incorrectly dropped as duplicates; with the new ring implementation, an empty string key also interferes with eviction. Consider explicitly rejecting/handling empty msg IDs before calling the deduplicator (e.g., log + skip, or derive a fallback ID).
```diff
 msgID := msg.MsgID
+if msgID == "" {
+	// Derive a fallback ID to avoid using an empty string as dedupe key
+	fallbackID := fmt.Sprintf("fallback:%s:%s:%s", msg.From.UserID, msg.ChatID, msg.MsgType)
+	logger.DebugCF("wecom", "Empty msg_id received, using fallback dedupe key", map[string]any{
+		"derived_msg_id": fallbackID,
+		"from_user":      msg.From.UserID,
+		"chat_id":        msg.ChatID,
+		"msg_type":       msg.MsgType,
+	})
+	msgID = fallbackID
+}
```
```diff
 // Message deduplication: Use msg_id to prevent duplicate processing
 // As per WeCom documentation, use msg_id for deduplication
 msgID := fmt.Sprintf("%d", msg.MsgId)
-c.msgMu.Lock()
-if c.processedMsgs[msgID] {
-	c.msgMu.Unlock()
+if !c.processedMsgs.MarkMessageProcessed(msgID) {
 	logger.DebugCF("wecom_app", "Skipping duplicate message", map[string]any{
 		"msg_id": msgID,
 	})
```
msg.MsgId is stringified and used as the dedupe key without checking for the zero value. If the inbound XML ever omits <MsgId>, it will default to 0 and unrelated messages could be incorrectly treated as duplicates. Consider validating msg.MsgId != 0 (or using a fallback key) before deduping.
fix(wecom): remove message-dedupe data races and fix amnesia cliff
Summary
- Centralizes WeCom message de-duplication in a thread-safe `MessageDeduplicator` helper used by both bot and app channels.

Root Cause
- The `processedMsgs` map was accessed outside a lock (the `len(map)` check) while other goroutines were writing, causing race detector failures.

Changes
- Adds `pkg/channels/wecom_dedupe.go` with a thread-safe `MessageDeduplicator` struct.
- Pairs a `map[string]bool` (for O(1) lookups) with a ring buffer `[]string` (to track the absolute oldest message). When the limit is reached, only the oldest message is evicted.
- Updates `pkg/channels/wecom.go` and `pkg/channels/wecom_app.go` to use the shared helper, cleanly encapsulating their `RWMutex` and `map` state.

Validation
- `go test -race ./pkg/channels`
- `go test -race ./...`