Skip to content

Commit b8217e1

Browse files
committed
Add Group Message Read Receipt Functionality
1 parent 3cf0e19 commit b8217e1

File tree

16 files changed

+481
-3
lines changed

16 files changed

+481
-3
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,5 @@ require (
4040
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
4141
google.golang.org/grpc v1.68.0 // indirect
4242
)
43+
44+
replace github.com/openimsdk/protocol => github.com/rookiewwj/protocol v0.0.1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205Ah
3030
github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
3131
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
3232
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
33-
github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk=
34-
github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
3533
github.com/openimsdk/tools v0.0.50-alpha.80 h1:Nvt97Vm85CXr633Jf7WjRJeL2nxJJjwlZJFDgWWXkJU=
3634
github.com/openimsdk/tools v0.0.50-alpha.80/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
3735
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
@@ -41,6 +39,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
4139
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
4240
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
4341
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
42+
github.com/rookiewwj/protocol v0.0.1 h1:Bd9F8FfE/viObERdhEuRm8tfJkE82dl244WvK1fNF2M=
43+
github.com/rookiewwj/protocol v0.0.1/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
4444
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
4545
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
4646
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=

internal/conversation_msg/api.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,10 @@ func (c *Conversation) RevokeMessage(ctx context.Context, conversationID, client
781781
return c.revokeOneMessage(ctx, conversationID, clientMsgID)
782782
}
783783

784+
func (c *Conversation) ProjectGroupReadInfo(ctx context.Context, conversationID string, clientMsgIDs []string) error {
785+
return c.projectGroupReadInfo(ctx, conversationID, clientMsgIDs)
786+
}
787+
784788
func (c *Conversation) TypingStatusUpdate(ctx context.Context, recvID, msgTip string) error {
785789
return c.typingStatusUpdate(ctx, recvID, msgTip)
786790
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package conversation_msg
2+
3+
import (
4+
"context"
5+
"sort"
6+
7+
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
8+
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
9+
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
10+
"github.com/openimsdk/tools/errs"
11+
"github.com/openimsdk/tools/log"
12+
)
13+
14+
// projectGroupReadInfo 根据本地存储的群聊已读游标,将指定消息投影计算其 GroupHasReadInfo(HasReadUserIDList/HasReadCount),并回写到消息的 AttachedInfo。
15+
// 入参:conversationID、需要投影的消息 clientMsgID 列表。
16+
// 作用:根据本地的"群聊已读游标表"(convid+uid -> maxReadSeq),对传入的消息做已读投影,
17+
// 回填到消息的 AttachedInfo.GroupHasReadInfo
18+
// 说明:
19+
// - 游标版本是"这批投影基于哪一版游标集计算出来的"标记,用于避免重复回填;
20+
// - 建议仅对"可视区"调用此函数。
21+
func (c *Conversation) projectGroupReadInfo(ctx context.Context, conversationID string, clientMsgIDs []string) error {
22+
23+
if len(clientMsgIDs) == 0 {
24+
return nil
25+
}
26+
27+
cursorState, err := c.db.GetGroupReadCursorState(ctx, conversationID)
28+
if err != nil {
29+
return errs.WrapMsg(err, "GetGroupReadCursorState failed", "conversationID", conversationID)
30+
}
31+
var cursorVersion int64
32+
if cursorState != nil {
33+
cursorVersion = cursorState.CursorVersion
34+
}
35+
36+
var messages []*model_struct.LocalChatLog
37+
for _, clientMsgID := range clientMsgIDs {
38+
message, err := c.waitForMessageSyncSeq(ctx, conversationID, clientMsgID)
39+
if err != nil {
40+
log.ZWarn(ctx, "waitForMessageSyncSeq failed", err, "conversationID", conversationID, "clientMsgID", clientMsgID)
41+
continue
42+
}
43+
messages = append(messages, message)
44+
}
45+
if len(messages) == 0 {
46+
return nil
47+
}
48+
49+
cursors, err := c.db.GetGroupReadCursorsByConversationID(ctx, conversationID) // []{UserID, MaxReadSeq}
50+
if err != nil {
51+
return errs.WrapMsg(err, "GetGroupReadCursorsByConversationID failed", "conversationID", conversationID)
52+
}
53+
memberCount := len(cursors)
54+
55+
sortedMaxSeqs := make([]int64, 0, memberCount)
56+
type pair struct {
57+
uid string
58+
seq int64
59+
}
60+
userSeqPairs := make([]pair, 0, memberCount)
61+
62+
for _, cur := range cursors {
63+
sortedMaxSeqs = append(sortedMaxSeqs, cur.MaxReadSeq)
64+
userSeqPairs = append(userSeqPairs, pair{uid: cur.UserID, seq: cur.MaxReadSeq})
65+
}
66+
sort.Slice(sortedMaxSeqs, func(i, j int) bool { return sortedMaxSeqs[i] < sortedMaxSeqs[j] })
67+
68+
for _, m := range messages {
69+
if m == nil || m.Seq == 0 {
70+
continue
71+
}
72+
var attach sdk_struct.AttachedInfoElem
73+
utils.JsonStringToStruct(m.AttachedInfo, &attach)
74+
75+
alreadyFresh := (attach.GroupHasReadInfo.ReadCursorVersion == cursorVersion)
76+
if alreadyFresh {
77+
continue
78+
}
79+
80+
idx := sort.Search(len(sortedMaxSeqs), func(i int) bool { return sortedMaxSeqs[i] >= m.Seq })
81+
hasReadCount := len(sortedMaxSeqs) - idx
82+
83+
attach.GroupHasReadInfo.HasReadCount = int32(hasReadCount)
84+
attach.GroupHasReadInfo.GroupMemberCount = int32(memberCount)
85+
attach.GroupHasReadInfo.ReadCursorVersion = cursorVersion
86+
87+
if hasReadCount > 0 {
88+
list := make([]string, 0, hasReadCount)
89+
for _, p := range userSeqPairs {
90+
if p.seq >= m.Seq {
91+
list = append(list, p.uid)
92+
}
93+
}
94+
attach.GroupHasReadInfo.HasReadUserIDList = list
95+
}
96+
97+
m.AttachedInfo = utils.StructToJsonString(attach)
98+
if err := c.db.UpdateMessage(ctx, conversationID, m); err != nil {
99+
log.ZWarn(ctx, "projectGroupReadInfo UpdateMessage err", err, "conversationID", conversationID, "seq", m.Seq, "clientMsgID", m.ClientMsgID)
100+
}
101+
}
102+
103+
return nil
104+
}

internal/conversation_msg/read_drawing.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
2727
"github.com/openimsdk/tools/errs"
2828
"github.com/openimsdk/tools/utils/datautil"
29+
"gorm.io/gorm"
2930

3031
"github.com/openimsdk/protocol/sdkws"
3132
"github.com/openimsdk/tools/log"
@@ -248,7 +249,8 @@ func (c *Conversation) doReadDrawing(ctx context.Context, msg *sdkws.MsgData) er
248249
return err
249250

250251
}
251-
if conversation.ConversationType == constant.SingleChatType {
252+
switch conversation.ConversationType {
253+
case constant.SingleChatType:
252254
latestMsg := &sdk_struct.MsgStruct{}
253255
if err := json.Unmarshal([]byte(conversation.LatestMsg), latestMsg); err != nil {
254256
log.ZWarn(ctx, "Unmarshal err", err, "conversationID", tips.ConversationID, "latestMsg", conversation.LatestMsg)
@@ -277,7 +279,48 @@ func (c *Conversation) doReadDrawing(ctx context.Context, msg *sdkws.MsgData) er
277279
var messageReceiptResp = []*sdk_struct.MessageReceipt{{UserID: tips.MarkAsReadUserID, MsgIDList: successMsgIDs,
278280
SessionType: conversation.ConversationType, ReadTime: msg.SendTime}}
279281
c.msgListener().OnRecvC2CReadReceipt(utils.StructToJsonString(messageReceiptResp))
282+
283+
case constant.ReadGroupChatType:
284+
285+
maxReadSeq := tips.HasReadSeq
286+
if maxReadSeq > 0 {
287+
cursor, err := c.db.GetGroupReadCursor(ctx, tips.ConversationID, tips.MarkAsReadUserID)
288+
if err != nil {
289+
if err == gorm.ErrRecordNotFound {
290+
// 如果不存在则创建新的游标
291+
newCursor := &model_struct.LocalGroupReadCursor{
292+
ConversationID: tips.ConversationID,
293+
UserID: tips.MarkAsReadUserID,
294+
MaxReadSeq: maxReadSeq,
295+
}
296+
if err = c.db.InsertGroupReadCursor(ctx, newCursor); err != nil {
297+
log.ZWarn(ctx, "InsertGroupReadCursor err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID)
298+
}
299+
if err = c.db.IncrementGroupReadCursorVersion(ctx, tips.ConversationID); err != nil {
300+
log.ZWarn(ctx, "IncrementGroupReadCursorVersion err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID)
301+
}
302+
} else {
303+
log.ZWarn(ctx, "GetGroupReadCursor err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID)
304+
return err
305+
}
306+
307+
} else {
308+
// 如果存在且新序号更大则更新
309+
if maxReadSeq > cursor.MaxReadSeq {
310+
if err = c.db.UpdateGroupReadCursor(ctx, tips.ConversationID, tips.MarkAsReadUserID, maxReadSeq); err != nil {
311+
log.ZWarn(ctx, "UpdateGroupReadCursor err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID)
312+
}
313+
if err = c.db.IncrementGroupReadCursorVersion(ctx, tips.ConversationID); err != nil {
314+
log.ZWarn(ctx, "IncrementGroupReadCursorVersion err", err, "conversationID", tips.ConversationID, "userID", tips.MarkAsReadUserID)
315+
}
316+
}
317+
}
318+
}
319+
var groupReceiptResp = []*sdk_struct.MessageReceipt{{GroupID: conversation.GroupID, UserID: tips.MarkAsReadUserID, MsgIDList: nil,
320+
SessionType: conversation.ConversationType, ReadTime: msg.SendTime}}
321+
c.msgListener().OnRecvGroupReadReceipt(utils.StructToJsonString(groupReceiptResp))
280322
}
323+
281324
} else {
282325
return c.doUnreadCount(ctx, conversation, tips.HasReadSeq, tips.Seqs)
283326
}

internal/conversation_msg/server_api.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ func (c *Conversation) getIncrementalConversationFromServer(ctx context.Context,
7474
return api.GetIncrementalConversation.Invoke(ctx, req)
7575
}
7676

77+
func (c *Conversation) getConversationReadCursors(ctx context.Context, conversationIDs []string) (*pbConversation.GetConversationReadCursorsResp, error) {
78+
req := &pbConversation.GetConversationReadCursorsReq{ConversationIDs: conversationIDs}
79+
return api.GetConversationReadCursors.Invoke(ctx, req)
80+
}
81+
7782
func (c *Conversation) GetActiveConversations(ctx context.Context) ([]*jssdk.ConversationMsg, error) {
7883
conf, err := cliconf.GetClientConfig(ctx)
7984
if err != nil {

internal/conversation_msg/sync.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,100 @@ func (c *Conversation) SyncAllConversationHashReadSeqs(ctx context.Context) erro
131131
log.ZDebug(ctx, "TriggerCmdUpdateConversation completed", "duration", time.Since(stepStartTime).Seconds())
132132
}
133133

134+
stepStartTime = time.Now()
135+
if err := c.syncAllGroupReadCursors(ctx); err != nil {
136+
log.ZWarn(ctx, "syncAllGroupReadCursors failed", err)
137+
}
138+
log.ZDebug(ctx, "syncAllGroupReadCursors completed", "duration", time.Since(stepStartTime).Seconds())
139+
134140
log.ZDebug(ctx, "SyncAllConversationHashReadSeqs completed", "totalDuration", time.Since(startTime).Seconds())
135141
return nil
136142
}
143+
144+
func (c *Conversation) syncAllGroupReadCursors(ctx context.Context) error {
145+
// 1. 获取所有群聊会话ID
146+
conversations, err := c.db.GetAllConversations(ctx)
147+
if err != nil {
148+
log.ZWarn(ctx, "GetAllConversations failed", err)
149+
return err
150+
}
151+
152+
var groupConversationIDs []string
153+
for _, conv := range conversations {
154+
if conv.ConversationType == constant.ReadGroupChatType {
155+
groupConversationIDs = append(groupConversationIDs, conv.ConversationID)
156+
}
157+
}
158+
159+
if len(groupConversationIDs) == 0 {
160+
log.ZDebug(ctx, "no group conversations to sync cursors")
161+
return nil
162+
}
163+
164+
log.ZDebug(ctx, "found group conversations", "count", len(groupConversationIDs), "conversationIDs", groupConversationIDs)
165+
166+
// 2. 为每个群会话确保初始化 CursorState(如不存在则创建)
167+
for _, conversationID := range groupConversationIDs {
168+
if _, err := c.db.GetGroupReadCursorState(ctx, conversationID); err != nil {
169+
if ierr := c.db.InsertGroupReadCursorState(ctx, &model_struct.LocalGroupReadCursorState{ConversationID: conversationID, CursorVersion: 1}); ierr != nil {
170+
log.ZWarn(ctx, "InsertGroupReadCursorState failed", ierr, "conversationID", conversationID)
171+
} else {
172+
log.ZDebug(ctx, "initialized LocalGroupReadCursorState", "conversationID", conversationID)
173+
}
174+
}
175+
}
176+
177+
// 3. 调用服务器API获取已读游标
178+
stepStartTime := time.Now()
179+
resp, err := c.getConversationReadCursors(ctx, groupConversationIDs)
180+
if err != nil {
181+
log.ZWarn(ctx, "getConversationReadCursorsFromServer failed", err)
182+
return err
183+
}
184+
log.ZDebug(ctx, "getConversationReadCursorsFromServer completed", "duration", time.Since(stepStartTime).Seconds())
185+
186+
// 4. 处理响应并存储到本地数据库
187+
stepStartTime = time.Now()
188+
allCursorCount := 0
189+
for conversationID, cursorList := range resp.Cursors {
190+
curCursorCount := 0
191+
if cursorList == nil || len(cursorList.Cursors) == 0 {
192+
continue
193+
}
194+
195+
for _, cursor := range cursorList.Cursors {
196+
localCursor := &model_struct.LocalGroupReadCursor{
197+
ConversationID: conversationID,
198+
UserID: cursor.UserID,
199+
MaxReadSeq: cursor.MaxReadSeq,
200+
}
201+
202+
// 检查是否已存在,如果存在则更新,否则插入
203+
existingCursor, err := c.db.GetGroupReadCursor(ctx, conversationID, cursor.UserID)
204+
if err != nil {
205+
if err := c.db.InsertGroupReadCursor(ctx, localCursor); err != nil {
206+
log.ZWarn(ctx, "InsertGroupReadCursor failed", err, "conversationID", conversationID, "userID", cursor.UserID)
207+
} else {
208+
curCursorCount++
209+
}
210+
} else {
211+
if cursor.MaxReadSeq > existingCursor.MaxReadSeq {
212+
if err := c.db.UpdateGroupReadCursor(ctx, conversationID, cursor.UserID, cursor.MaxReadSeq); err != nil {
213+
log.ZWarn(ctx, "UpdateGroupReadCursor failed", err, "conversationID", conversationID, "userID", cursor.UserID)
214+
} else {
215+
curCursorCount++
216+
}
217+
}
218+
}
219+
}
220+
allCursorCount += curCursorCount
221+
if curCursorCount != 0 {
222+
if err := c.db.IncrementGroupReadCursorVersion(ctx, conversationID); err != nil {
223+
log.ZWarn(ctx, "IncrementGroupReadCursorVersion failed", err, "conversationID", conversationID)
224+
}
225+
}
226+
}
227+
228+
log.ZDebug(ctx, "syncAllGroupReadCursors completed", "duration", time.Since(stepStartTime).Seconds(), "cursorCount", allCursorCount)
229+
return nil
230+
}

0 commit comments

Comments
 (0)