Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions backend/modules/observability/application/convertor/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ func BackfillRunDetailDO2DTO(backfillDetail *entity.BackfillDetail) *task.Backfi
return nil
}
return &task.BackfillDetail{
SuccessCount: backfillDetail.SuccessCount,
FailedCount: backfillDetail.FailedCount,
TotalCount: backfillDetail.TotalCount,
BackfillStatus: backfillDetail.BackfillStatus,
LastSpanPageToken: backfillDetail.LastSpanPageToken,
SuccessCount: &backfillDetail.SuccessCount,
FailedCount: &backfillDetail.FailedCount,
TotalCount: &backfillDetail.TotalCount,
BackfillStatus: &backfillDetail.BackfillStatus,
LastSpanPageToken: &backfillDetail.LastSpanPageToken,
}
}

Expand Down Expand Up @@ -520,11 +520,11 @@ func BackfillRunDetailDTO2DO(v *task.BackfillDetail) *entity.BackfillDetail {
return nil
}
return &entity.BackfillDetail{
SuccessCount: v.SuccessCount,
FailedCount: v.FailedCount,
TotalCount: v.TotalCount,
BackfillStatus: v.BackfillStatus,
LastSpanPageToken: v.LastSpanPageToken,
SuccessCount: v.GetSuccessCount(),
FailedCount: v.GetFailedCount(),
TotalCount: v.GetTotalCount(),
BackfillStatus: v.GetBackfillStatus(),
LastSpanPageToken: v.GetLastSpanPageToken(),
}
}

Expand Down
10 changes: 5 additions & 5 deletions backend/modules/observability/domain/task/entity/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ type TaskRun struct {
UpdatedAt time.Time // 更新时间
}
type BackfillDetail struct {
SuccessCount *int64 `json:"success_count"`
FailedCount *int64 `json:"failed_count"`
TotalCount *int64 `json:"total_count"`
BackfillStatus *string `json:"backfill_status"`
LastSpanPageToken *string `json:"last_span_page_token"`
SuccessCount int64 `json:"success_count,omitempty"`
FailedCount int64 `json:"failed_count,omitempty"`
TotalCount int64 `json:"total_count,omitempty"`
BackfillStatus string `json:"backfill_status,omitempty"`
LastSpanPageToken string `json:"last_span_page_token,omitempty"`
}

type TaskRunConfig struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,11 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub
NotQueryAnnotation: true, // No annotation query required during backfill
}

if sub.tr.BackfillDetail != nil && sub.tr.BackfillDetail.LastSpanPageToken != nil {
listParam.PageToken = *sub.tr.BackfillDetail.LastSpanPageToken
if sub.tr.BackfillDetail != nil && sub.tr.BackfillDetail.LastSpanPageToken != "" {
listParam.PageToken = sub.tr.BackfillDetail.LastSpanPageToken
}
if sub.tr.BackfillDetail == nil {
sub.tr.BackfillDetail = &entity.BackfillDetail{}
}

totalCount := int64(0)
Expand All @@ -174,10 +177,15 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub
totalCount += int64(len(spans))
logs.CtxInfo(ctx, "Processed %d spans completed, total=%d, task_id=%d", len(spans), totalCount, sub.t.ID)

if pageToken != "" {
listParam.PageToken = pageToken
sub.tr.BackfillDetail.LastSpanPageToken = pageToken
}
// todo 不应该这里直接写po字段
err = h.taskRepo.UpdateTaskRunWithOCC(ctx, sub.tr.ID, sub.tr.WorkspaceID, map[string]interface{}{
"backfill_detail": ToJSONString(ctx, sub.tr.BackfillDetail),
})

if err != nil {
logs.CtxError(ctx, "update task run failed, task_id=%d, err=%v", sub.t.ID, err)
return err
Expand All @@ -188,14 +196,12 @@ func (h *TraceHubServiceImpl) listAndSendSpans(ctx context.Context, sub *spanSub
if err = sub.processor.OnTaskFinished(ctx, taskexe.OnTaskFinishedReq{
Task: sub.t,
TaskRun: sub.tr,
IsFinish: false,
IsFinish: false, // 任务可能同时有历史回溯和新任务,不能直接关闭
}); err != nil {
return err
}
return nil
}
listParam.PageToken = pageToken
sub.tr.BackfillDetail.LastSpanPageToken = &pageToken
}
}

Expand Down Expand Up @@ -424,8 +430,6 @@ func (h *TraceHubServiceImpl) processSpansForBackfill(ctx context.Context, spans
func (h *TraceHubServiceImpl) processBatchSpans(ctx context.Context, spans []*loop_span.Span, sub *spanSubscriber) (err error, shouldFinish bool) {
for _, span := range spans {
// Execute processing logic according to the task type
logs.CtxInfo(ctx, "processing span for backfill, span_id=%s, trace_id=%s, task_id=%d",
span.SpanID, span.TraceID, sub.t.ID)
taskCount, _ := h.taskRepo.GetTaskCount(ctx, sub.taskID)
sampler := sub.t.Sampler
if taskCount+1 > sampler.SampleSize {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func TestTraceHubServiceImpl_ListAndSendSpans_Success(t *testing.T) {

now := time.Now()
sub, proc := newBackfillSubscriber(mockTaskRepo, now)
sub.tr.BackfillDetail = &entity.BackfillDetail{LastSpanPageToken: ptr.Of("prev")}
sub.tr.BackfillDetail = &entity.BackfillDetail{LastSpanPageToken: "prev"}
domainRun := newDomainBackfillTaskRun(now)
span := newTestSpan(now)

Expand Down Expand Up @@ -350,7 +350,7 @@ func TestTraceHubServiceImpl_ListAndSendSpans_Success(t *testing.T) {
require.True(t, proc.invokeCalled)
require.NotNil(t, sub.tr.BackfillDetail)
require.NotNil(t, sub.tr.BackfillDetail.LastSpanPageToken)
require.Equal(t, "prev", ptr.From(sub.tr.BackfillDetail.LastSpanPageToken))
require.Equal(t, "prev", sub.tr.BackfillDetail.LastSpanPageToken)
}

func TestTraceHubServiceImpl_ListAndSendSpans_ListError(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ func (l *LocalCache) LoadTaskCache(ctx context.Context) TaskCacheInfo {
return TaskCacheInfo{}
}

logs.CtxInfo(ctx, "Retrieve task list from cache, taskCount=%d, spaceCount=%d, botCount=%d", len(cacheInfo.Tasks), len(cacheInfo.WorkspaceIDs), len(cacheInfo.BotIDs))
logs.CtxDebug(ctx, "Retrieve task list from cache, taskCount=%d, spaceCount=%d, botCount=%d", len(cacheInfo.Tasks), len(cacheInfo.WorkspaceIDs), len(cacheInfo.BotIDs))
return cacheInfo
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ import (

func (h *TraceHubServiceImpl) SpanTrigger(ctx context.Context, span *loop_span.Span) error {
logSuffix := fmt.Sprintf("log_id=%s, trace_id=%s, span_id=%s", span.LogID, span.TraceID, span.SpanID)
logs.CtxInfo(ctx, "auto_task start, %s", logSuffix)

// 1. perform initial filtering based on space_id
// 1.1 Filter out spans that do not belong to any space or bot
cacheInfo := h.localCache.LoadTaskCache(ctx)
spaceIDs, botIDs := cacheInfo.WorkspaceIDs, cacheInfo.BotIDs
if !gslice.Contains(spaceIDs, span.WorkspaceID) && !gslice.Contains(botIDs, span.TagsString["bot_id"]) {
logs.CtxInfo(ctx, "no space or bot found for span, space_id=%s, bot_id=%s, %s", span.WorkspaceID, span.TagsString["bot_id"], logSuffix)
logs.CtxDebug(ctx, "no space or bot found for span, space_id=%s, bot_id=%s, %s", span.WorkspaceID, span.TagsString["bot_id"], logSuffix)
return nil
}
// 1.2 Filter out spans of type Evaluator
Expand Down Expand Up @@ -253,7 +252,6 @@ func (h *TraceHubServiceImpl) dispatch(ctx context.Context, span *loop_span.Span
if sub.t.TaskStatus != entity.TaskStatusRunning {
continue
}
logs.CtxInfo(ctx, " sub.AddSpan: %v", sub)
if err := sub.AddSpan(ctx, span); err != nil {
merr = multierror.Append(merr, errors.WithMessagef(err, "add span to subscriber, log_id=%s, trace_id=%s, span_id=%s, task_id=%d",
span.LogID, span.TraceID, span.SpanID, sub.taskID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (s *spanSubscriber) AddSpan(ctx context.Context, span *loop_span.Span) erro
return nil
}
trigger := &taskexe.Trigger{Task: s.t, Span: span, TaskRun: taskRunConfig}
logs.CtxInfo(ctx, "invoke processor, trigger: %v", trigger)
logs.CtxDebug(ctx, "invoke processor, trigger: %v", trigger)
err = s.processor.Invoke(ctx, trigger)
if err != nil {
logs.CtxWarn(ctx, "invoke processor failed, trace_id=%s, span_id=%s, err: %v", span.TraceID, span.SpanID, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func (e *TaskConsumer) HandleMessage(ctx context.Context, ext *mq.MessageExt) er
ctx = logs.SetLogID(ctx, logID)
event := new(entity.RawSpan)
if err := json.Unmarshal(ext.Body, event); err != nil {
logs.CtxError(ctx, "Task msg json unmarshal fail, raw: %v, err: %s", conv.UnsafeBytesToString(ext.Body), err)
logs.CtxWarn(ctx, "Task msg json unmarshal fail, raw: %v, err: %s", conv.UnsafeBytesToString(ext.Body), err)
return nil
}
logs.CtxInfo(ctx, "Span msg,log_id=%s, trace_id=%s, span_id=%s,msgID=%s", event.LogID, event.TraceID, event.SpanID, ext.MsgID)
logs.CtxDebug(ctx, "Span msg,log_id=%s, trace_id=%s, span_id=%s,msgID=%s", event.LogID, event.TraceID, event.SpanID, ext.MsgID)
return e.handler.SpanTrigger(ctx, event, nil)
}
3 changes: 0 additions & 3 deletions backend/modules/observability/infra/repo/redis/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ func (p *TaskDAOImpl) GetTaskRunCount(ctx context.Context, taskID, taskRunID int
func (p *TaskDAOImpl) IncrTaskCount(ctx context.Context, taskID int64, ttl time.Duration) (int64, error) {
key := p.makeTaskCountCacheKey(taskID)
result, err := p.cmdable.Incr(ctx, key).Result()
logs.CtxInfo(ctx, "redis incr task count success, taskID: %v, key: %v, result: %v", taskID, key, result)
if err != nil {
logs.CtxError(ctx, "redis incr task count failed", "key", key, "err", err)
return 0, errorx.Wrapf(err, "redis incr task count key: %v", key)
Expand Down Expand Up @@ -291,7 +290,6 @@ func (p *TaskDAOImpl) DecrTaskCount(ctx context.Context, taskID int64, ttl time.
logs.CtxError(ctx, "redis decr task count failed", "key", key, "err", err)
return 0, errorx.Wrapf(err, "redis decr task count key: %v", key)
}
logs.CtxInfo(ctx, "redis decr task count success, taskID: %v, key: %v, result: %v", taskID, key, result)
// 如果减少后变为负数,重置为0
if result < 0 {
if err := p.cmdable.Set(ctx, key, 0, ttl).Err(); err != nil {
Expand All @@ -312,7 +310,6 @@ func (p *TaskDAOImpl) DecrTaskCount(ctx context.Context, taskID int64, ttl time.
func (p *TaskDAOImpl) IncrTaskRunCount(ctx context.Context, taskID, taskRunID int64, ttl time.Duration) (int64, error) {
key := p.makeTaskRunCountCacheKey(taskID, taskRunID)
result, err := p.cmdable.Incr(ctx, key).Result()
logs.CtxInfo(ctx, "redis incr task run count success, taskID: %v,taskRunID: %v, key: %v, result: %v", taskID, taskRunID, key, result)
if err != nil {
logs.CtxError(ctx, "redis incr task run count failed", "key", key, "err", err)
return 0, errorx.Wrapf(err, "redis incr task run count key: %v", key)
Expand Down
1 change: 0 additions & 1 deletion backend/modules/observability/infra/repo/redis/task_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func (p *TaskRunDAOImpl) IncrTaskRunSuccessCount(ctx context.Context, taskID, ta
logs.CtxError(ctx, "redis incr taskrun success count failed, key:%v, err:%v", key, err)
return errorx.Wrapf(err, "redis incr taskrun success count key: %v", key)
}
logs.CtxInfo(ctx, "redis incr taskrun success count success, key:%v, count:%v", key, cmd.Val())
if err := p.cmdable.Expire(ctx, key, ttl).Err(); err != nil {
logs.CtxError(ctx, "redis expire taskrun success count failed, key:%v, err:%v", key, err)
return errorx.Wrapf(err, "redis expire taskrun success count key: %v", key)
Expand Down
Loading