From 41116c2495f10e1ef9842376aadfe2e20c2938a4 Mon Sep 17 00:00:00 2001 From: taoyifan89 Date: Fri, 28 Nov 2025 14:15:40 +0800 Subject: [PATCH 01/12] Fix NPE. Change-Id: I340cdbf0ca076598501820c11e5dea0a9e78ff84 --- .../domain/task/service/taskexe/tracehub/backfill.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go index 26d60fa75..de7e4c60b 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go @@ -174,6 +174,11 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub totalCount += int64(len(spans)) logs.CtxInfo(ctx, "Processed %d spans completed, total=%d, task_id=%d", len(spans), totalCount, sub.t.ID) + listParam.PageToken = pageToken + if sub.tr.BackfillDetail == nil { + sub.tr.BackfillDetail = &entity.BackfillDetail{} + } + sub.tr.BackfillDetail.LastSpanPageToken = &pageToken // todo 不应该这里直接写po字段 err = h.taskRepo.UpdateTaskRunWithOCC(ctx, sub.tr.ID, sub.tr.WorkspaceID, map[string]interface{}{ "backfill_detail": ToJSONString(ctx, sub.tr.BackfillDetail), @@ -194,8 +199,6 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub } return nil } - listParam.PageToken = pageToken - sub.tr.BackfillDetail.LastSpanPageToken = &pageToken } } From 77d116b77ac7aa92245e1d058d4fa7797234ffb6 Mon Sep 17 00:00:00 2001 From: taoyifan89 Date: Fri, 28 Nov 2025 14:46:27 +0800 Subject: [PATCH 02/12] Fix task finish bug. Change-Id: Ie20ca0f34b6f026cc9bc243e56cde80f35e70d5c --- .../domain/task/service/taskexe/tracehub/backfill.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go index de7e4c60b..a775caa71 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go @@ -193,7 +193,7 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub if err = sub.processor.OnTaskFinished(ctx, taskexe.OnTaskFinishedReq{ Task: sub.t, TaskRun: sub.tr, - IsFinish: false, + IsFinish: false, // 任务可能同时有历史回溯和新任务,不能直接关闭 }); err != nil { return err } From 4282ef70c172ed4a0e63a74bb057d28d58eb76d7 Mon Sep 17 00:00:00 2001 From: taoyifan89 Date: Fri, 28 Nov 2025 15:47:54 +0800 Subject: [PATCH 03/12] Fix no backfill detail bug. Change-Id: I8acb6e8902f134672872ab804a4010900a21c0ca --- .../application/convertor/task/task.go | 20 +++++++++---------- .../observability/domain/task/entity/task.go | 10 +++++----- .../task/service/taskexe/tracehub/backfill.go | 6 +++--- .../service/taskexe/tracehub/backfill_test.go | 4 ++-- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/backend/modules/observability/application/convertor/task/task.go b/backend/modules/observability/application/convertor/task/task.go index c969881b3..f33dea056 100644 --- a/backend/modules/observability/application/convertor/task/task.go +++ b/backend/modules/observability/application/convertor/task/task.go @@ -243,11 +243,11 @@ func BackfillRunDetailDO2DTO(backfillDetail *entity.BackfillDetail) *task.Backfi return nil } return &task.BackfillDetail{ - SuccessCount: backfillDetail.SuccessCount, - FailedCount: backfillDetail.FailedCount, - TotalCount: backfillDetail.TotalCount, - BackfillStatus: backfillDetail.BackfillStatus, - LastSpanPageToken: backfillDetail.LastSpanPageToken, + SuccessCount: &backfillDetail.SuccessCount, + FailedCount: &backfillDetail.FailedCount, + TotalCount: &backfillDetail.TotalCount, + BackfillStatus: &backfillDetail.BackfillStatus, + LastSpanPageToken: &backfillDetail.LastSpanPageToken, } } @@ -520,11 +520,11 @@ func BackfillRunDetailDTO2DO(v *task.BackfillDetail) *entity.BackfillDetail { return nil } return &entity.BackfillDetail{ - SuccessCount: v.SuccessCount, - FailedCount: v.FailedCount, - TotalCount: v.TotalCount, - BackfillStatus: v.BackfillStatus, - LastSpanPageToken: v.LastSpanPageToken, + SuccessCount: v.GetSuccessCount(), + FailedCount: v.GetFailedCount(), + TotalCount: v.GetTotalCount(), + BackfillStatus: v.GetBackfillStatus(), + LastSpanPageToken: v.GetLastSpanPageToken(), } } diff --git a/backend/modules/observability/domain/task/entity/task.go b/backend/modules/observability/domain/task/entity/task.go index f6e27a841..15989350b 100644 --- a/backend/modules/observability/domain/task/entity/task.go +++ b/backend/modules/observability/domain/task/entity/task.go @@ -145,11 +145,11 @@ type TaskRun struct { UpdatedAt time.Time // 更新时间 } type BackfillDetail struct { - SuccessCount *int64 `json:"success_count"` - FailedCount *int64 `json:"failed_count"` - TotalCount *int64 `json:"total_count"` - BackfillStatus *string `json:"backfill_status"` - LastSpanPageToken *string `json:"last_span_page_token"` + SuccessCount int64 `json:"success_count,omitempty"` + FailedCount int64 `json:"failed_count,omitempty"` + TotalCount int64 `json:"total_count,omitempty"` + BackfillStatus string `json:"backfill_status,omitempty"` + LastSpanPageToken string `json:"last_span_page_token,omitempty"` } type TaskRunConfig struct { diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go index a775caa71..26e7267ed 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go @@ -153,8 +153,8 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub NotQueryAnnotation: true, // No annotation query required during backfill } - if sub.tr.BackfillDetail != nil && sub.tr.BackfillDetail.LastSpanPageToken != nil { - listParam.PageToken = *sub.tr.BackfillDetail.LastSpanPageToken + if sub.tr.BackfillDetail != nil && sub.tr.BackfillDetail.LastSpanPageToken != "" { + listParam.PageToken = sub.tr.BackfillDetail.LastSpanPageToken } totalCount := int64(0) @@ -178,7 +178,7 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub if sub.tr.BackfillDetail == nil { sub.tr.BackfillDetail = &entity.BackfillDetail{} } - sub.tr.BackfillDetail.LastSpanPageToken = &pageToken + sub.tr.BackfillDetail.LastSpanPageToken = pageToken // todo 不应该这里直接写po字段 err = h.taskRepo.UpdateTaskRunWithOCC(ctx, sub.tr.ID, sub.tr.WorkspaceID, map[string]interface{}{ "backfill_detail": ToJSONString(ctx, sub.tr.BackfillDetail), diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go index fc2767e71..1e3ec7a9b 100755 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go @@ -320,7 +320,7 @@ func TestTraceHubServiceImpl_ListAndSendSpans_Success(t *testing.T) { now := time.Now() sub, proc := newBackfillSubscriber(mockTaskRepo, now) - sub.tr.BackfillDetail = &entity.BackfillDetail{LastSpanPageToken: ptr.Of("prev")} + sub.tr.BackfillDetail = &entity.BackfillDetail{LastSpanPageToken: "prev"} domainRun := newDomainBackfillTaskRun(now) span := newTestSpan(now) @@ -350,7 +350,7 @@ func TestTraceHubServiceImpl_ListAndSendSpans_Success(t *testing.T) { require.True(t, proc.invokeCalled) require.NotNil(t, sub.tr.BackfillDetail) require.NotNil(t, sub.tr.BackfillDetail.LastSpanPageToken) - require.Equal(t, "prev", ptr.From(sub.tr.BackfillDetail.LastSpanPageToken)) + require.Equal(t, "prev", sub.tr.BackfillDetail.LastSpanPageToken) } func TestTraceHubServiceImpl_ListAndSendSpans_ListError(t *testing.T) { From 8377cf6ef8158b331a8a7f685c805101cd1ea963 Mon Sep 17 00:00:00 2001 From: taoyifan89 Date: Fri, 28 Nov 2025 17:51:41 +0800 Subject: [PATCH 04/12] Fix token empty bug. Change-Id: I2eb2a18e54d8a1e683b9bf25b984aaa7c17150bf --- .../task/service/taskexe/tracehub/backfill.go | 13 +++++++------ .../task/service/taskexe/tracehub/span_trigger.go | 1 - .../task/service/taskexe/tracehub/subscriber.go | 2 +- .../modules/observability/infra/repo/redis/task.go | 3 --- .../observability/infra/repo/redis/task_run.go | 1 - 5 files changed, 8 insertions(+), 12 deletions(-) diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go index 26e7267ed..76d59b36c 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go @@ -156,6 +156,9 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub if sub.tr.BackfillDetail != nil && sub.tr.BackfillDetail.LastSpanPageToken != "" { listParam.PageToken = sub.tr.BackfillDetail.LastSpanPageToken } + if sub.tr.BackfillDetail == nil { + sub.tr.BackfillDetail = &entity.BackfillDetail{} + } totalCount := int64(0) for { @@ -174,15 +177,15 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub totalCount += int64(len(spans)) logs.CtxInfo(ctx, "Processed %d spans completed, total=%d, task_id=%d", len(spans), totalCount, sub.t.ID) - listParam.PageToken = pageToken - if sub.tr.BackfillDetail == nil { - sub.tr.BackfillDetail = &entity.BackfillDetail{} + if pageToken != "" { + listParam.PageToken = pageToken + sub.tr.BackfillDetail.LastSpanPageToken = pageToken } - sub.tr.BackfillDetail.LastSpanPageToken = pageToken // todo 不应该这里直接写po字段 err = h.taskRepo.UpdateTaskRunWithOCC(ctx, sub.tr.ID, sub.tr.WorkspaceID, map[string]interface{}{ "backfill_detail": ToJSONString(ctx, sub.tr.BackfillDetail), }) + if err != nil { logs.CtxError(ctx, "update task run failed, task_id=%d, err=%v", sub.t.ID, err) return err @@ -427,8 +430,6 @@ func (h *TraceHubServiceImpl) processSpansForBackfill(ctx context.Context, spans func (h *TraceHubServiceImpl) processBatchSpans(ctx context.Context, spans []*loop_span.Span, sub *spanSubscriber) (err error, shouldFinish bool) { for _, span := range spans { // Execute processing logic according to the task type - logs.CtxInfo(ctx, "processing span for backfill, span_id=%s, trace_id=%s, task_id=%d", - span.SpanID, span.TraceID, sub.t.ID) taskCount, _ := h.taskRepo.GetTaskCount(ctx, sub.taskID) sampler := sub.t.Sampler if taskCount+1 > sampler.SampleSize { diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger.go index 8d15b254b..a1bcc8bbe 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger.go @@ -253,7 +253,6 @@ func (h *TraceHubServiceImpl) dispatch(ctx context.Context, span *loop_span.Span if sub.t.TaskStatus != entity.TaskStatusRunning { continue } - logs.CtxInfo(ctx, " sub.AddSpan: %v", sub) if err := sub.AddSpan(ctx, span); err != nil { merr = multierror.Append(merr, errors.WithMessagef(err, "add span to subscriber, log_id=%s, trace_id=%s, span_id=%s, task_id=%d", span.LogID, span.TraceID, span.SpanID, sub.taskID)) diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/subscriber.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/subscriber.go index fb3e55cd1..bfb3da2c9 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/subscriber.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/subscriber.go @@ -180,7 +180,7 @@ func (s *spanSubscriber) AddSpan(ctx context.Context, span *loop_span.Span) erro return nil } trigger := &taskexe.Trigger{Task: s.t, Span: span, TaskRun: taskRunConfig} - logs.CtxInfo(ctx, "invoke processor, trigger: %v", trigger) + logs.CtxDebug(ctx, "invoke processor, trigger: %v", trigger) err = s.processor.Invoke(ctx, trigger) if err != nil { logs.CtxWarn(ctx, "invoke processor failed, trace_id=%s, span_id=%s, err: %v", span.TraceID, span.SpanID, err) diff --git a/backend/modules/observability/infra/repo/redis/task.go b/backend/modules/observability/infra/repo/redis/task.go index a96ce325a..a285c75da 100755 --- a/backend/modules/observability/infra/repo/redis/task.go +++ b/backend/modules/observability/infra/repo/redis/task.go @@ -252,7 +252,6 @@ func (p *TaskDAOImpl) GetTaskRunCount(ctx context.Context, taskID, taskRunID int func (p *TaskDAOImpl) IncrTaskCount(ctx context.Context, taskID int64, ttl time.Duration) (int64, error) { key := p.makeTaskCountCacheKey(taskID) result, err := p.cmdable.Incr(ctx, key).Result() - logs.CtxInfo(ctx, "redis incr task count success, taskID: %v, key: %v, result: %v", taskID, key, result) if err != nil { logs.CtxError(ctx, "redis incr task count failed", "key", key, "err", err) return 0, errorx.Wrapf(err, "redis incr task count key: %v", key) @@ -291,7 +290,6 @@ func (p *TaskDAOImpl) DecrTaskCount(ctx context.Context, taskID int64, ttl time. logs.CtxError(ctx, "redis decr task count failed", "key", key, "err", err) return 0, errorx.Wrapf(err, "redis decr task count key: %v", key) } - logs.CtxInfo(ctx, "redis decr task count success, taskID: %v, key: %v, result: %v", taskID, key, result) // 如果减少后变为负数,重置为0 if result < 0 { if err := p.cmdable.Set(ctx, key, 0, ttl).Err(); err != nil { @@ -312,7 +310,6 @@ func (p *TaskDAOImpl) DecrTaskCount(ctx context.Context, taskID int64, ttl time. func (p *TaskDAOImpl) IncrTaskRunCount(ctx context.Context, taskID, taskRunID int64, ttl time.Duration) (int64, error) { key := p.makeTaskRunCountCacheKey(taskID, taskRunID) result, err := p.cmdable.Incr(ctx, key).Result() - logs.CtxInfo(ctx, "redis incr task run count success, taskID: %v,taskRunID: %v, key: %v, result: %v", taskID, taskRunID, key, result) if err != nil { logs.CtxError(ctx, "redis incr task run count failed", "key", key, "err", err) return 0, errorx.Wrapf(err, "redis incr task run count key: %v", key) diff --git a/backend/modules/observability/infra/repo/redis/task_run.go b/backend/modules/observability/infra/repo/redis/task_run.go index 9cec9e002..8b2e34733 100755 --- a/backend/modules/observability/infra/repo/redis/task_run.go +++ b/backend/modules/observability/infra/repo/redis/task_run.go @@ -51,7 +51,6 @@ func (p *TaskRunDAOImpl) IncrTaskRunSuccessCount(ctx context.Context, taskID, ta logs.CtxError(ctx, "redis incr taskrun success count failed, key:%v, err:%v", key, err) return errorx.Wrapf(err, "redis incr taskrun success count key: %v", key) } - logs.CtxInfo(ctx, "redis incr taskrun success count success, key:%v, count:%v", key, cmd.Val()) if err := p.cmdable.Expire(ctx, key, ttl).Err(); err != nil { logs.CtxError(ctx, "redis expire taskrun success count failed, key:%v, err:%v", key, err) return errorx.Wrapf(err, "redis expire taskrun success count key: %v", key) From e9b675b6f0098c1b97bae2745b5d7a299fb1f78d Mon Sep 17 00:00:00 2001 From: taoyifan89 Date: Fri, 28 Nov 2025 18:25:08 +0800 Subject: [PATCH 05/12] Remove log. Change-Id: I0a1af179522e12c4ae239cce833b5db8c33f37ce --- .../domain/task/service/taskexe/tracehub/local_cache.go | 2 +- .../domain/task/service/taskexe/tracehub/span_trigger.go | 3 +-- .../modules/observability/infra/mq/consumer/task_consumer.go | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/local_cache.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/local_cache.go index 67eb53ccd..b48b6db60 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/local_cache.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/local_cache.go @@ -50,6 +50,6 @@ func (l *LocalCache) LoadTaskCache(ctx context.Context) TaskCacheInfo { return TaskCacheInfo{} } - logs.CtxInfo(ctx, "Retrieve task list from cache, taskCount=%d, spaceCount=%d, botCount=%d", len(cacheInfo.Tasks), len(cacheInfo.WorkspaceIDs), len(cacheInfo.BotIDs)) + logs.CtxDebug(ctx, "Retrieve task list from cache, taskCount=%d, spaceCount=%d, botCount=%d", len(cacheInfo.Tasks), len(cacheInfo.WorkspaceIDs), len(cacheInfo.BotIDs)) return cacheInfo } diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger.go index a1bcc8bbe..42310dd56 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/span_trigger.go @@ -19,14 +19,13 @@ import ( func (h *TraceHubServiceImpl) SpanTrigger(ctx context.Context, span *loop_span.Span) error { logSuffix := fmt.Sprintf("log_id=%s, trace_id=%s, span_id=%s", span.LogID, span.TraceID, span.SpanID) - logs.CtxInfo(ctx, "auto_task start, %s", logSuffix) // 1. perform initial filtering based on space_id // 1.1 Filter out spans that do not belong to any space or bot cacheInfo := h.localCache.LoadTaskCache(ctx) spaceIDs, botIDs := cacheInfo.WorkspaceIDs, cacheInfo.BotIDs if !gslice.Contains(spaceIDs, span.WorkspaceID) && !gslice.Contains(botIDs, span.TagsString["bot_id"]) { - logs.CtxInfo(ctx, "no space or bot found for span, space_id=%s, bot_id=%s, %s", span.WorkspaceID, span.TagsString["bot_id"], logSuffix) + logs.CtxDebug(ctx, "no space or bot found for span, space_id=%s, bot_id=%s, %s", span.WorkspaceID, span.TagsString["bot_id"], logSuffix) return nil } // 1.2 Filter out spans of type Evaluator diff --git a/backend/modules/observability/infra/mq/consumer/task_consumer.go b/backend/modules/observability/infra/mq/consumer/task_consumer.go index 220738143..0bfd13b59 100644 --- a/backend/modules/observability/infra/mq/consumer/task_consumer.go +++ b/backend/modules/observability/infra/mq/consumer/task_consumer.go @@ -56,9 +56,9 @@ func (e *TaskConsumer) HandleMessage(ctx context.Context, ext *mq.MessageExt) er ctx = logs.SetLogID(ctx, logID) event := new(entity.RawSpan) if err := json.Unmarshal(ext.Body, event); err != nil { - logs.CtxError(ctx, "Task msg json unmarshal fail, raw: %v, err: %s", conv.UnsafeBytesToString(ext.Body), err) + logs.CtxWarn(ctx, "Task msg json unmarshal fail, raw: %v, err: %s", conv.UnsafeBytesToString(ext.Body), err) return nil } - logs.CtxInfo(ctx, "Span msg,log_id=%s, trace_id=%s, span_id=%s,msgID=%s", event.LogID, event.TraceID, event.SpanID, ext.MsgID) + logs.CtxDebug(ctx, "Span msg,log_id=%s, trace_id=%s, span_id=%s,msgID=%s", event.LogID, event.TraceID, event.SpanID, ext.MsgID) return e.handler.SpanTrigger(ctx, event, nil) } From 966bd1c513d95db19448a644e4cc46f51b252504 Mon Sep 17 00:00:00 2001 From: taoyifan89 Date: Sat, 29 Nov 2025 17:14:09 +0800 Subject: [PATCH 06/12] Add ut. Change-Id: Id4835dc1a577665bead69aa7f0e49353a074d090 --- .../observability/application/convertor/task/task.go | 2 ++ .../observability/application/convertor/task/task_test.go | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/backend/modules/observability/application/convertor/task/task.go b/backend/modules/observability/application/convertor/task/task.go index f33dea056..2adc5639d 100644 --- a/backend/modules/observability/application/convertor/task/task.go +++ b/backend/modules/observability/application/convertor/task/task.go @@ -460,6 +460,7 @@ func TaskConfigDTO2DO(taskConfig *task.TaskConfig) *entity.TaskConfig { } } +/* func TaskRunDTO2DO(taskRun *task.TaskRun) *entity.TaskRun { if taskRun == nil { return nil @@ -527,6 +528,7 @@ func BackfillRunDetailDTO2DO(v *task.BackfillDetail) *entity.BackfillDetail { LastSpanPageToken: v.GetLastSpanPageToken(), } } +*/ func getLastPartAfterDot(s string) string { s = strings.TrimRight(s, ".") diff --git a/backend/modules/observability/application/convertor/task/task_test.go b/backend/modules/observability/application/convertor/task/task_test.go index 9c22e3a90..78605fcbd 100755 --- a/backend/modules/observability/application/convertor/task/task_test.go +++ b/backend/modules/observability/application/convertor/task/task_test.go @@ -46,6 +46,13 @@ func TestTaskDOs2DTOs(t *testing.T) { FailedCount: 1, TotalCount: 4, }, + BackfillDetail: &entity.BackfillDetail{ + SuccessCount: 3, + FailedCount: 1, + TotalCount: 4, + BackfillStatus: kitTask.RunStatusRunning, + LastSpanPageToken: "abc", + }, CreatedAt: now, UpdatedAt: now, } From 9e9547226362ce9631e717ae889804a0dba10cd7 Mon Sep 17 00:00:00 2001 From: taoyifan89 Date: Sat, 29 Nov 2025 17:25:44 +0800 Subject: [PATCH 07/12] Fix build. Change-Id: Ie54336394b431fcb0a5c0a52ab2ef01b08788e87 --- .../modules/observability/application/convertor/task/task.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/modules/observability/application/convertor/task/task.go b/backend/modules/observability/application/convertor/task/task.go index 2adc5639d..0501039ee 100644 --- a/backend/modules/observability/application/convertor/task/task.go +++ b/backend/modules/observability/application/convertor/task/task.go @@ -480,6 +480,7 @@ func TaskRunDTO2DO(taskRun *task.TaskRun) *entity.TaskRun { UpdatedAt: time.UnixMilli(taskRun.GetBaseInfo().GetUpdatedAt()), } } +*/ func TaskRunConfigDTO2DO(v *task.TaskRunConfig) *entity.TaskRunConfig { if v == nil { @@ -516,6 +517,7 @@ func TaskRunConfigDTO2DO(v *task.TaskRunConfig) *entity.TaskRunConfig { } } +/* func BackfillRunDetailDTO2DO(v *task.BackfillDetail) *entity.BackfillDetail { if v == nil { return nil From 2bbd0402b0035aa45491e6e41ea02361fd992459 Mon Sep 17 00:00:00 2001 From: taoyifan89 Date: Sat, 29 Nov 2025 17:43:45 +0800 Subject: [PATCH 08/12] Fix golint. Change-Id: Id379706680184455df59c33732fb8541c773c400 --- .../domain/task/service/taskexe/tracehub/backfill.go | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go index 76d59b36c..1b96d99f2 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go @@ -181,6 +181,7 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub listParam.PageToken = pageToken sub.tr.BackfillDetail.LastSpanPageToken = pageToken } + // todo 不应该这里直接写po字段 err = h.taskRepo.UpdateTaskRunWithOCC(ctx, sub.tr.ID, sub.tr.WorkspaceID, map[string]interface{}{ "backfill_detail": ToJSONString(ctx, sub.tr.BackfillDetail), From 54b54c34c68af69718bad06c5d8a7e685317a21c Mon Sep 17 00:00:00 2001 From: taoyifan89 Date: Sat, 29 Nov 2025 17:54:48 +0800 Subject: [PATCH 09/12] Fix golint. Change-Id: I24ea2dce95d489a0967d010d166052cbc0709bc7 --- .../domain/task/service/taskexe/tracehub/backfill.go | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go index 1b96d99f2..1718a3d4c 100644 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill.go @@ -186,7 +186,6 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub err = h.taskRepo.UpdateTaskRunWithOCC(ctx, sub.tr.ID, sub.tr.WorkspaceID, map[string]interface{}{ "backfill_detail": ToJSONString(ctx, sub.tr.BackfillDetail), }) - if err != nil { logs.CtxError(ctx, "update task run failed, task_id=%d, err=%v", sub.t.ID, err) return err From 203c5c00b96afd795752a460d9b73837f4a26ed5 Mon Sep 17 00:00:00 2001 From: taoyifan89 Date: Sat, 29 Nov 2025 22:52:08 +0800 Subject: [PATCH 10/12] Add ut. Change-Id: Ief44f36c9e5463a89abfab84702c645b663d0843 --- .github/workflows/ai-cr-required.yaml | 2 +- .../service/taskexe/tracehub/backfill_test.go | 58 +++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ai-cr-required.yaml b/.github/workflows/ai-cr-required.yaml index e8c0c89b0..01f653812 100644 --- a/.github/workflows/ai-cr-required.yaml +++ b/.github/workflows/ai-cr-required.yaml @@ -28,7 +28,7 @@ jobs: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} run: | echo "PR number: $PR_NUMBER" - COMMENTS=$(gh api -H "Accept: application/vnd.github+json" /repos/${{ github.repository }}/pulls/${PR_NUMBER}/comments --jq '.[].user.login') + COMMENTS=$(gh api -H "Accept: application/vnd.github+json" /repos/${{ github.repository }}/pulls/${PR_NUMBER}/reviews --jq '.[].user.login') echo "Comment authors: $COMMENTS" echo "$COMMENTS" | grep -q "^CozeLoop$" if [ $? -ne 0 ]; then diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go index 1e3ec7a9b..51bf8f862 100755 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go @@ -301,6 +301,64 @@ func TestTraceHubServiceImpl_ListAndSendSpans_GetTenantsError(t *testing.T) { require.ErrorIs(t, err, tenantErr) } +func TestTraceHubServiceImpl_ListAndSendSpans_WithoutLastSpanPageToken(t *testing.T) { + ctrl := gomock.NewController(t) + t.Cleanup(ctrl.Finish) + + mockTaskRepo := repo_mocks.NewMockITaskRepo(ctrl) + mockTraceRepo := trepo_mocks.NewMockITraceRepo(ctrl) + mockTenant := tenant_mocks.NewMockITenantProvider(ctrl) + mockBuilder := builder_mocks.NewMockTraceFilterProcessorBuilder(ctrl) + filterMock := spanfilter_mocks.NewMockFilter(ctrl) + + impl := &TraceHubServiceImpl{ + taskRepo: mockTaskRepo, + traceRepo: mockTraceRepo, + tenantProvider: mockTenant, + buildHelper: mockBuilder, + } + + now := time.Now() + sub, proc := newBackfillSubscriber(mockTaskRepo, now) + domainRun := newDomainBackfillTaskRun(now) + span := newTestSpan(now) + + mockBuilder.EXPECT().BuildPlatformRelatedFilter(gomock.Any(), loop_span.PlatformType(common.PlatformTypeCozeBot)). + Return(filterMock, nil) + filterMock.EXPECT().BuildBasicSpanFilter(gomock.Any(), gomock.Any()).Return([]*loop_span.FilterField{}, true, nil) + filterMock.EXPECT().BuildRootSpanFilter(gomock.Any(), gomock.Any()).Return([]*loop_span.FilterField{}, nil) + mockBuilder.EXPECT().BuildGetTraceProcessors(gomock.Any(), gomock.Any()).Return([]span_processor.Processor(nil), nil).Times(2) + mockTenant.EXPECT().GetTenantsByPlatformType(gomock.Any(), loop_span.PlatformType(common.PlatformTypeCozeBot)).Return([]string{"tenant"}, nil) + + mockTraceRepo.EXPECT().ListSpans(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, param *repo.ListSpansParam) (*repo.ListSpansResult, error) { + if param.PageToken == "" { + return &repo.ListSpansResult{ + Spans: loop_span.SpanList{span}, + PageToken: "next", + HasMore: true, + }, nil + } else if param.PageToken == "next" { + return &repo.ListSpansResult{ + Spans: loop_span.SpanList{span}, + PageToken: "", + HasMore: false, + }, nil + } + return nil, errors.New("invalid token") + }).Times(2) + + mockTaskRepo.EXPECT().GetTaskCount(gomock.Any(), int64(1)).Return(int64(0), nil).Times(2) + mockTaskRepo.EXPECT().GetBackfillTaskRun(gomock.Any(), gomock.Nil(), int64(1)).Return(domainRun, nil).Times(2) + mockTaskRepo.EXPECT().UpdateTaskRunWithOCC(gomock.Any(), sub.tr.ID, sub.tr.WorkspaceID, gomock.AssignableToTypeOf(map[string]interface{}{})).Return(nil).Times(2) + + err := impl.listAndSendSpans(context.Background(), sub) + require.NoError(t, err) + require.True(t, proc.invokeCalled) + require.NotNil(t, sub.tr.BackfillDetail) + require.NotNil(t, sub.tr.BackfillDetail.LastSpanPageToken) + require.Equal(t, "next", sub.tr.BackfillDetail.LastSpanPageToken) +} + func TestTraceHubServiceImpl_ListAndSendSpans_Success(t *testing.T) { ctrl := gomock.NewController(t) t.Cleanup(ctrl.Finish) From c782f7b54c100b2fab8a77b4b21f58b86c12413a Mon Sep 17 00:00:00 2001 From: taoyifan89 Date: Sun, 30 Nov 2025 09:34:41 +0800 Subject: [PATCH 11/12] Fix golint. Change-Id: Ia8c5b25011fb79e67f4b3d228ce00d678be369e7 --- .../service/taskexe/tracehub/backfill_test.go | 8 +- .../infra/mq/consumer/task_consumer_test.go | 281 ++++++++++++++++++ 2 files changed, 286 insertions(+), 3 deletions(-) create mode 100644 backend/modules/observability/infra/mq/consumer/task_consumer_test.go diff --git a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go index 51bf8f862..150da4db8 100755 --- a/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go +++ b/backend/modules/observability/domain/task/service/taskexe/tracehub/backfill_test.go @@ -331,20 +331,22 @@ func TestTraceHubServiceImpl_ListAndSendSpans_WithoutLastSpanPageToken(t *testin mockTenant.EXPECT().GetTenantsByPlatformType(gomock.Any(), loop_span.PlatformType(common.PlatformTypeCozeBot)).Return([]string{"tenant"}, nil) mockTraceRepo.EXPECT().ListSpans(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, param *repo.ListSpansParam) (*repo.ListSpansResult, error) { - if param.PageToken == "" { + switch param.PageToken { + case "": return &repo.ListSpansResult{ Spans: loop_span.SpanList{span}, PageToken: "next", HasMore: true, }, nil - } else if param.PageToken == "next" { + case "next": return &repo.ListSpansResult{ Spans: loop_span.SpanList{span}, PageToken: "", HasMore: false, }, nil + default: + return nil, errors.New("invalid token") } - return nil, errors.New("invalid token") }).Times(2) mockTaskRepo.EXPECT().GetTaskCount(gomock.Any(), int64(1)).Return(int64(0), nil).Times(2) diff --git a/backend/modules/observability/infra/mq/consumer/task_consumer_test.go b/backend/modules/observability/infra/mq/consumer/task_consumer_test.go new file mode 100644 index 000000000..8f4d7c6ec --- /dev/null +++ b/backend/modules/observability/infra/mq/consumer/task_consumer_test.go @@ -0,0 +1,281 @@ +// Copyright (c) 2025 coze-dev Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumer + +import ( + "context" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + "github.com/coze-dev/coze-loop/backend/infra/mq" + "github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/config" + "github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/entity" + "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span" + "github.com/coze-dev/coze-loop/backend/pkg/conf/mocks" + "github.com/coze-dev/coze-loop/backend/pkg/json" +) + +// MockITaskQueueConsumer 是 ITaskQueueConsumer 的 mock 实现 +type MockITaskQueueConsumer struct { + ctrl *gomock.Controller + recorder *MockITaskQueueConsumerMockRecorder +} + +// MockITaskQueueConsumerMockRecorder 是 mock 的记录器 +type MockITaskQueueConsumerMockRecorder struct { + mock *MockITaskQueueConsumer +} + +// NewMockITaskQueueConsumer 创建一个新的 mock 实例 +func NewMockITaskQueueConsumer(ctrl *gomock.Controller) *MockITaskQueueConsumer { + mock := &MockITaskQueueConsumer{ctrl: ctrl} + mock.recorder = &MockITaskQueueConsumerMockRecorder{mock} + return mock +} + +// EXPECT 返回一个对象,允许调用者指示预期的使用 +func (m *MockITaskQueueConsumer) EXPECT() *MockITaskQueueConsumerMockRecorder { + return m.recorder +} + +// SpanTrigger mocks base method +func (m *MockITaskQueueConsumer) SpanTrigger(ctx context.Context, rawSpan *entity.RawSpan, span *loop_span.Span) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SpanTrigger", ctx, rawSpan, span) + ret0, _ := ret[0].(error) + return ret0 +} + +// SpanTrigger indicates an expected call of SpanTrigger +func (mr *MockITaskQueueConsumerMockRecorder) SpanTrigger(ctx, rawSpan, span interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SpanTrigger", reflect.TypeOf((*MockITaskQueueConsumer)(nil).SpanTrigger), ctx, rawSpan, span) +} + +// AutoEvalCallback mocks base method +func (m *MockITaskQueueConsumer) AutoEvalCallback(ctx context.Context, event *entity.AutoEvalEvent) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AutoEvalCallback", ctx, event) + ret0, _ := ret[0].(error) + return ret0 +} + +// AutoEvalCallback indicates an expected call of AutoEvalCallback +func (mr *MockITaskQueueConsumerMockRecorder) AutoEvalCallback(ctx, event interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AutoEvalCallback", reflect.TypeOf((*MockITaskQueueConsumer)(nil).AutoEvalCallback), ctx, event) +} + +// AutoEvalCorrection mocks base method +func (m *MockITaskQueueConsumer) AutoEvalCorrection(ctx context.Context, event *entity.CorrectionEvent) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AutoEvalCorrection", ctx, event) + ret0, _ := ret[0].(error) + return ret0 +} + +// AutoEvalCorrection indicates an expected call of AutoEvalCorrection +func (mr *MockITaskQueueConsumerMockRecorder) AutoEvalCorrection(ctx, event interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AutoEvalCorrection", reflect.TypeOf((*MockITaskQueueConsumer)(nil).AutoEvalCorrection), ctx, event) +} + +// BackFill mocks base method +func (m *MockITaskQueueConsumer) BackFill(ctx context.Context, event *entity.BackFillEvent) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BackFill", ctx, event) + ret0, _ := ret[0].(error) + return ret0 +} + +// BackFill indicates an expected call of BackFill +func (mr *MockITaskQueueConsumerMockRecorder) BackFill(ctx, event interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BackFill", reflect.TypeOf((*MockITaskQueueConsumer)(nil).BackFill), ctx, event) +} + +func TestTaskConsumer_ConsumerCfg(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockConfigLoader := mocks.NewMockIConfigLoader(ctrl) + mockHandler := NewMockITaskQueueConsumer(ctrl) + consumer := NewTaskConsumer(mockHandler, mockConfigLoader) + + tests := []struct { + name string + mockSetup func() + expectedCfg *mq.ConsumerConfig + expectedError error + }{ + { + name: "正常场景: 配置加载成功", + mockSetup: func() { + enablePPE := true + isEnabled := true + cfg := &config.MqConsumerCfg{ + Addr: []string{"localhost:9876"}, + Topic: "test_topic", + ConsumerGroup: "test_group", + Timeout: 5000, + WorkerNum: 10, + EnablePPE: &enablePPE, + IsEnabled: &isEnabled, + TagExpression: nil, + } + mockConfigLoader.EXPECT().UnmarshalKey(gomock.Any(), "task_mq_consumer_config", gomock.Any()). + SetArg(2, *cfg).Return(nil) + }, + expectedCfg: func() *mq.ConsumerConfig { + enablePPE := true + isEnabled := true + return &mq.ConsumerConfig{ + Addr: []string{"localhost:9876"}, + Topic: "test_topic", + ConsumerGroup: "test_group", + ConsumeTimeout: 5000000000, // 5秒转换为纳秒 + ConsumeGoroutineNums: 10, + EnablePPE: &enablePPE, + IsEnabled: &isEnabled, + } + }(), + expectedError: nil, + }, + { + name: "正常场景: 配置加载成功且包含TagExpression", + mockSetup: func() { + tagExpr := "tagA || tagB" + enablePPE := false + isEnabled := true + cfg := &config.MqConsumerCfg{ + Addr: []string{"localhost:9876"}, + Topic: "test_topic", + ConsumerGroup: "test_group", + Timeout: 3000, + WorkerNum: 5, + EnablePPE: &enablePPE, + IsEnabled: &isEnabled, + TagExpression: &tagExpr, + } + mockConfigLoader.EXPECT().UnmarshalKey(gomock.Any(), "task_mq_consumer_config", gomock.Any()). + SetArg(2, *cfg).Return(nil) + }, + expectedCfg: func() *mq.ConsumerConfig { + enablePPE := false + isEnabled := true + return &mq.ConsumerConfig{ + Addr: []string{"localhost:9876"}, + Topic: "test_topic", + ConsumerGroup: "test_group", + ConsumeTimeout: 3000000000, // 3秒转换为纳秒 + ConsumeGoroutineNums: 5, + EnablePPE: &enablePPE, + IsEnabled: &isEnabled, + TagExpression: "tagA || tagB", + } + }(), + expectedError: nil, + }, + { + name: "异常场景: 配置加载失败", + mockSetup: func() { + mockConfigLoader.EXPECT().UnmarshalKey(gomock.Any(), "task_mq_consumer_config", gomock.Any()). + Return(assert.AnError) + }, + expectedCfg: nil, + expectedError: assert.AnError, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.mockSetup() + cfg, err := consumer.ConsumerCfg(context.Background()) + + assert.Equal(t, tt.expectedError, err) + if tt.expectedError == nil { + assert.NotNil(t, cfg) + assert.Equal(t, tt.expectedCfg.Addr, cfg.Addr) + assert.Equal(t, tt.expectedCfg.Topic, cfg.Topic) + assert.Equal(t, tt.expectedCfg.ConsumerGroup, cfg.ConsumerGroup) + assert.Equal(t, tt.expectedCfg.ConsumeTimeout, cfg.ConsumeTimeout) + assert.Equal(t, tt.expectedCfg.ConsumeGoroutineNums, cfg.ConsumeGoroutineNums) + assert.Equal(t, tt.expectedCfg.EnablePPE, cfg.EnablePPE) + assert.Equal(t, tt.expectedCfg.IsEnabled, cfg.IsEnabled) + assert.Equal(t, tt.expectedCfg.TagExpression, cfg.TagExpression) + } + }) + } +} + +func TestTaskConsumer_HandleMessage(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockConfigLoader := mocks.NewMockIConfigLoader(ctrl) + mockHandler := NewMockITaskQueueConsumer(ctrl) + consumer := NewTaskConsumer(mockHandler, mockConfigLoader) + + tests := []struct { + name string + message *mq.MessageExt + mockSetup func() + expectedError error + }{ + { + name: "正常场景: 消息处理成功", + message: &mq.MessageExt{ + Message: mq.Message{ + Body: func() []byte { + data, _ := json.Marshal(&entity.RawSpan{ + LogID: "test_log_id", + TraceID: "test_trace_id", + SpanID: "test_span_id", + }) + return data + }(), + }, + MsgID: "test_msg_id", + }, + mockSetup: func() { + mockHandler.EXPECT().SpanTrigger(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil) + }, + expectedError: nil, + }, + { + name: "正常场景: JSON解析失败但不返回错误", + message: &mq.MessageExt{ + Message: mq.Message{ + Body: []byte("invalid json"), + }, + MsgID: "test_msg_id", + }, + mockSetup: func() {}, + expectedError: nil, + }, + { + name: "正常场景: 处理空消息", + message: &mq.MessageExt{ + Message: mq.Message{ + Body: []byte(""), + }, + MsgID: "test_msg_id", + }, + mockSetup: func() {}, + expectedError: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.mockSetup() + err := consumer.HandleMessage(context.Background(), tt.message) + assert.Equal(t, tt.expectedError, err) + }) + } +} \ No newline at end of file From 282d8d03fe20cca61444659ae9c355b2507daf06 Mon Sep 17 00:00:00 2001 From: taoyifan89 Date: Sun, 30 Nov 2025 10:12:05 +0800 Subject: [PATCH 12/12] Fix golint. Change-Id: Iaaeb2a5c8e6c706e6c0b9dea43e636a09141017b --- .../observability/infra/mq/consumer/task_consumer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/modules/observability/infra/mq/consumer/task_consumer_test.go b/backend/modules/observability/infra/mq/consumer/task_consumer_test.go index 8f4d7c6ec..ce91c8540 100644 --- a/backend/modules/observability/infra/mq/consumer/task_consumer_test.go +++ b/backend/modules/observability/infra/mq/consumer/task_consumer_test.go @@ -232,7 +232,7 @@ func TestTaskConsumer_HandleMessage(t *testing.T) { Message: mq.Message{ Body: func() []byte { data, _ := json.Marshal(&entity.RawSpan{ - LogID: "test_log_id", + LogID: "test_log_id", TraceID: "test_trace_id", SpanID: "test_span_id", }) @@ -278,4 +278,4 @@ func TestTaskConsumer_HandleMessage(t *testing.T) { assert.Equal(t, tt.expectedError, err) }) } -} \ No newline at end of file +}