Skip to content

Commit eae5d7f

Browse files
committed
[ENH]: Garbage collection for soft deleted attached functions
1 parent ed0ec81 commit eae5d7f

File tree

14 files changed

+949
-5
lines changed

14 files changed

+949
-5
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/pkg/sysdb/coordinator/create_task_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"google.golang.org/grpc/codes"
1717
"google.golang.org/grpc/status"
1818
"google.golang.org/protobuf/types/known/structpb"
19+
"google.golang.org/protobuf/types/known/timestamppb"
1920
)
2021

2122
// testMinimalUUIDv7 is the test's copy of minimalUUIDv7 from task.go
@@ -663,3 +664,85 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Param
663664
func TestAttachFunctionTestSuite(t *testing.T) {
664665
suite.Run(t, new(AttachFunctionTestSuite))
665666
}
667+
668+
// TestGetSoftDeletedAttachedFunctions_TimestampConsistency verifies that timestamps
669+
// are returned in microseconds (UnixMicro) to match other API methods
670+
func TestGetSoftDeletedAttachedFunctions_TimestampConsistency(t *testing.T) {
671+
ctx := context.Background()
672+
673+
// Create test timestamps with known values
674+
testTime := time.Date(2025, 10, 30, 12, 0, 0, 123456000, time.UTC) // 123.456 milliseconds
675+
expectedMicros := uint64(testTime.UnixMicro())
676+
677+
// Create mock coordinator with minimal setup
678+
mockMetaDomain := &dbmodel_mocks.IMetaDomain{}
679+
mockAttachedFunctionDb := &dbmodel_mocks.IAttachedFunctionDb{}
680+
mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(mockAttachedFunctionDb)
681+
682+
// Mock the database response with our test timestamps
683+
attachedFunctions := []*dbmodel.AttachedFunction{
684+
{
685+
ID: uuid.New(),
686+
Name: "test_function",
687+
InputCollectionID: "collection_123",
688+
OutputCollectionName: "output_collection",
689+
CompletionOffset: 100,
690+
MinRecordsForInvocation: 10,
691+
CreatedAt: testTime,
692+
UpdatedAt: testTime,
693+
NextRun: testTime,
694+
},
695+
}
696+
697+
mockAttachedFunctionDb.On("GetSoftDeletedAttachedFunctions", mock.Anything, mock.Anything).
698+
Return(attachedFunctions, nil)
699+
700+
coordinator := &Coordinator{
701+
catalog: Catalog{
702+
metaDomain: mockMetaDomain,
703+
},
704+
}
705+
706+
// Call GetSoftDeletedAttachedFunctions
707+
cutoffTime := timestamppb.New(testTime.Add(-24 * time.Hour))
708+
resp, err := coordinator.GetSoftDeletedAttachedFunctions(ctx, &coordinatorpb.GetSoftDeletedAttachedFunctionsRequest{
709+
CutoffTime: cutoffTime,
710+
Limit: 100,
711+
})
712+
713+
// Verify response
714+
if err != nil {
715+
t.Fatalf("GetSoftDeletedAttachedFunctions failed: %v", err)
716+
}
717+
if len(resp.AttachedFunctions) != 1 {
718+
t.Fatalf("Expected 1 attached function, got %d", len(resp.AttachedFunctions))
719+
}
720+
721+
af := resp.AttachedFunctions[0]
722+
723+
// Verify timestamps are in microseconds (not seconds)
724+
if af.CreatedAt != expectedMicros {
725+
t.Errorf("CreatedAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.CreatedAt)
726+
}
727+
if af.UpdatedAt != expectedMicros {
728+
t.Errorf("UpdatedAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.UpdatedAt)
729+
}
730+
if af.NextRunAt != expectedMicros {
731+
t.Errorf("NextRunAt timestamp mismatch: expected %d microseconds, got %d", expectedMicros, af.NextRunAt)
732+
}
733+
734+
// Verify these are NOT in seconds (would be ~1000x smaller)
735+
expectedSeconds := uint64(testTime.Unix())
736+
if af.CreatedAt == expectedSeconds {
737+
t.Error("CreatedAt appears to be in seconds instead of microseconds")
738+
}
739+
if af.UpdatedAt == expectedSeconds {
740+
t.Error("UpdatedAt appears to be in seconds instead of microseconds")
741+
}
742+
if af.NextRunAt == expectedSeconds {
743+
t.Error("NextRunAt appears to be in seconds instead of microseconds")
744+
}
745+
746+
mockMetaDomain.AssertExpectations(t)
747+
mockAttachedFunctionDb.AssertExpectations(t)
748+
}

go/pkg/sysdb/coordinator/task.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,3 +822,75 @@ func (s *Coordinator) CleanupExpiredPartialAttachedFunctions(ctx context.Context
822822
CleanedUpIds: cleanedAttachedFunctionIDStrings,
823823
}, nil
824824
}
825+
826+
// GetSoftDeletedAttachedFunctions retrieves attached functions that are soft deleted and were updated before the cutoff time
827+
func (s *Coordinator) GetSoftDeletedAttachedFunctions(ctx context.Context, req *coordinatorpb.GetSoftDeletedAttachedFunctionsRequest) (*coordinatorpb.GetSoftDeletedAttachedFunctionsResponse, error) {
828+
log := log.With(zap.String("method", "GetSoftDeletedAttachedFunctions"))
829+
830+
if req.CutoffTime == nil {
831+
log.Error("GetSoftDeletedAttachedFunctions: cutoff_time is required")
832+
return nil, status.Errorf(codes.InvalidArgument, "cutoff_time is required")
833+
}
834+
835+
if req.Limit <= 0 {
836+
log.Error("GetSoftDeletedAttachedFunctions: limit must be greater than 0")
837+
return nil, status.Errorf(codes.InvalidArgument, "limit must be greater than 0")
838+
}
839+
840+
cutoffTime := req.CutoffTime.AsTime()
841+
attachedFunctions, err := s.catalog.metaDomain.AttachedFunctionDb(ctx).GetSoftDeletedAttachedFunctions(cutoffTime, req.Limit)
842+
if err != nil {
843+
log.Error("GetSoftDeletedAttachedFunctions: failed to get soft deleted attached functions", zap.Error(err))
844+
return nil, err
845+
}
846+
847+
// Convert to proto response
848+
protoAttachedFunctions := make([]*coordinatorpb.AttachedFunction, len(attachedFunctions))
849+
for i, af := range attachedFunctions {
850+
protoAttachedFunctions[i] = &coordinatorpb.AttachedFunction{
851+
Id: af.ID.String(),
852+
Name: af.Name,
853+
InputCollectionId: af.InputCollectionID,
854+
OutputCollectionName: af.OutputCollectionName,
855+
CompletionOffset: uint64(af.CompletionOffset),
856+
MinRecordsForInvocation: uint64(af.MinRecordsForInvocation),
857+
CreatedAt: uint64(af.CreatedAt.UnixMicro()),
858+
UpdatedAt: uint64(af.UpdatedAt.UnixMicro()),
859+
}
860+
861+
protoAttachedFunctions[i].NextRunAt = uint64(af.NextRun.UnixMicro())
862+
if af.OutputCollectionID != nil {
863+
protoAttachedFunctions[i].OutputCollectionId = proto.String(*af.OutputCollectionID)
864+
}
865+
}
866+
867+
log.Info("GetSoftDeletedAttachedFunctions: completed successfully",
868+
zap.Int("count", len(attachedFunctions)))
869+
870+
return &coordinatorpb.GetSoftDeletedAttachedFunctionsResponse{
871+
AttachedFunctions: protoAttachedFunctions,
872+
}, nil
873+
}
874+
875+
// FinishAttachedFunctionDeletion permanently deletes an attached function from the database (hard delete)
876+
// This should only be called after the soft delete grace period has passed
877+
func (s *Coordinator) FinishAttachedFunctionDeletion(ctx context.Context, req *coordinatorpb.FinishAttachedFunctionDeletionRequest) (*coordinatorpb.FinishAttachedFunctionDeletionResponse, error) {
878+
log := log.With(zap.String("method", "FinishAttachedFunctionDeletion"))
879+
880+
attachedFunctionID, err := uuid.Parse(req.AttachedFunctionId)
881+
if err != nil {
882+
log.Error("FinishAttachedFunctionDeletion: invalid attached_function_id", zap.Error(err))
883+
return nil, status.Errorf(codes.InvalidArgument, "invalid attached_function_id: %v", err)
884+
}
885+
886+
err = s.catalog.metaDomain.AttachedFunctionDb(ctx).HardDeleteAttachedFunction(attachedFunctionID)
887+
if err != nil {
888+
log.Error("FinishAttachedFunctionDeletion: failed to hard delete attached function", zap.Error(err))
889+
return nil, err
890+
}
891+
892+
log.Info("FinishAttachedFunctionDeletion: completed successfully",
893+
zap.String("attached_function_id", attachedFunctionID.String()))
894+
895+
return &coordinatorpb.FinishAttachedFunctionDeletionResponse{}, nil
896+
}

go/pkg/sysdb/grpc/task_service.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,29 @@ func (s *Server) CleanupExpiredPartialAttachedFunctions(ctx context.Context, req
139139
log.Info("CleanupExpiredPartialAttachedFunctions succeeded", zap.Uint64("cleaned_up_count", res.CleanedUpCount))
140140
return res, nil
141141
}
142+
143+
func (s *Server) GetSoftDeletedAttachedFunctions(ctx context.Context, req *coordinatorpb.GetSoftDeletedAttachedFunctionsRequest) (*coordinatorpb.GetSoftDeletedAttachedFunctionsResponse, error) {
144+
log.Info("GetSoftDeletedAttachedFunctions", zap.Time("cutoff_time", req.CutoffTime.AsTime()), zap.Int32("limit", req.Limit))
145+
146+
res, err := s.coordinator.GetSoftDeletedAttachedFunctions(ctx, req)
147+
if err != nil {
148+
log.Error("GetSoftDeletedAttachedFunctions failed", zap.Error(err))
149+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
150+
}
151+
152+
log.Info("GetSoftDeletedAttachedFunctions succeeded", zap.Int("count", len(res.AttachedFunctions)))
153+
return res, nil
154+
}
155+
156+
func (s *Server) FinishAttachedFunctionDeletion(ctx context.Context, req *coordinatorpb.FinishAttachedFunctionDeletionRequest) (*coordinatorpb.FinishAttachedFunctionDeletionResponse, error) {
157+
log.Info("FinishAttachedFunctionDeletion", zap.String("id", req.AttachedFunctionId))
158+
159+
res, err := s.coordinator.FinishAttachedFunctionDeletion(ctx, req)
160+
if err != nil {
161+
log.Error("FinishAttachedFunctionDeletion failed", zap.Error(err))
162+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
163+
}
164+
165+
log.Info("FinishAttachedFunctionDeletion succeeded", zap.String("id", req.AttachedFunctionId))
166+
return res, nil
167+
}

go/pkg/sysdb/metastore/db/dao/task.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,3 +403,51 @@ func (s *attachedFunctionDb) CleanupExpiredPartial(maxAgeSeconds uint64) ([]uuid
403403

404404
return ids, nil
405405
}
406+
407+
// GetSoftDeletedAttachedFunctions returns attached functions that are soft deleted
408+
// and were updated before the cutoff time (eligible for hard deletion)
409+
func (s *attachedFunctionDb) GetSoftDeletedAttachedFunctions(cutoffTime time.Time, limit int32) ([]*dbmodel.AttachedFunction, error) {
410+
var attachedFunctions []*dbmodel.AttachedFunction
411+
err := s.db.
412+
Where("is_deleted = ?", true).
413+
Where("updated_at < ?", cutoffTime).
414+
Limit(int(limit)).
415+
Find(&attachedFunctions).Error
416+
417+
if err != nil {
418+
log.Error("GetSoftDeletedAttachedFunctions failed",
419+
zap.Error(err),
420+
zap.Time("cutoff_time", cutoffTime))
421+
return nil, err
422+
}
423+
424+
log.Debug("GetSoftDeletedAttachedFunctions found attached functions",
425+
zap.Int("count", len(attachedFunctions)),
426+
zap.Time("cutoff_time", cutoffTime))
427+
428+
return attachedFunctions, nil
429+
}
430+
431+
// HardDeleteAttachedFunction permanently deletes an attached function from the database
432+
// This should only be called after the soft delete grace period has passed
433+
func (s *attachedFunctionDb) HardDeleteAttachedFunction(id uuid.UUID) error {
434+
result := s.db.Unscoped().Delete(&dbmodel.AttachedFunction{}, "id = ?", id)
435+
436+
if result.Error != nil {
437+
log.Error("HardDeleteAttachedFunction failed",
438+
zap.Error(result.Error),
439+
zap.String("id", id.String()))
440+
return result.Error
441+
}
442+
443+
if result.RowsAffected == 0 {
444+
log.Warn("HardDeleteAttachedFunction: no rows affected (attached function not found)",
445+
zap.String("id", id.String()))
446+
return nil // Idempotent - no error if not found
447+
}
448+
449+
log.Info("HardDeleteAttachedFunction succeeded",
450+
zap.String("id", id.String()))
451+
452+
return nil
453+
}

go/pkg/sysdb/metastore/db/dbmodel/mocks/IAttachedFunctionDb.go

Lines changed: 50 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/pkg/sysdb/metastore/db/dbmodel/task.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,6 @@ type IAttachedFunctionDb interface {
5858
PeekScheduleByCollectionId(collectionIDs []string) ([]*AttachedFunction, error)
5959
GetMinCompletionOffsetForCollection(inputCollectionID string) (*int64, error)
6060
CleanupExpiredPartial(maxAgeSeconds uint64) ([]uuid.UUID, error)
61+
GetSoftDeletedAttachedFunctions(cutoffTime time.Time, limit int32) ([]*AttachedFunction, error)
62+
HardDeleteAttachedFunction(id uuid.UUID) error
6163
}

idl/chromadb/proto/coordinator.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,21 @@ message PeekScheduleByCollectionIdResponse {
683683
repeated ScheduleEntry schedule = 1;
684684
}
685685

686+
message GetSoftDeletedAttachedFunctionsRequest {
687+
google.protobuf.Timestamp cutoff_time = 1;
688+
int32 limit = 2;
689+
}
690+
691+
message GetSoftDeletedAttachedFunctionsResponse {
692+
repeated AttachedFunction attached_functions = 1;
693+
}
694+
695+
message FinishAttachedFunctionDeletionRequest {
696+
string attached_function_id = 1;
697+
}
698+
699+
message FinishAttachedFunctionDeletionResponse {}
700+
686701
service SysDB {
687702
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse) {}
688703
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
@@ -731,4 +746,6 @@ service SysDB {
731746
rpc CleanupExpiredPartialAttachedFunctions(CleanupExpiredPartialAttachedFunctionsRequest) returns (CleanupExpiredPartialAttachedFunctionsResponse) {}
732747
rpc GetFunctions(GetFunctionsRequest) returns (GetFunctionsResponse) {}
733748
rpc PeekScheduleByCollectionId(PeekScheduleByCollectionIdRequest) returns (PeekScheduleByCollectionIdResponse) {}
749+
rpc GetSoftDeletedAttachedFunctions(GetSoftDeletedAttachedFunctionsRequest) returns (GetSoftDeletedAttachedFunctionsResponse) {}
750+
rpc FinishAttachedFunctionDeletion(FinishAttachedFunctionDeletionRequest) returns (FinishAttachedFunctionDeletionResponse) {}
734751
}

rust/frontend/src/impls/service_based_frontend.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1974,6 +1974,11 @@ impl ServiceBasedFrontend {
19741974
chroma_sysdb::DeleteAttachedFunctionError::FailedToDeleteAttachedFunction(s) => {
19751975
DetachFunctionError::Internal(Box::new(chroma_error::TonicError(s)))
19761976
}
1977+
chroma_sysdb::DeleteAttachedFunctionError::NotImplemented => {
1978+
DetachFunctionError::Internal(Box::new(chroma_error::TonicError(
1979+
tonic::Status::unimplemented("Not implemented"),
1980+
)))
1981+
}
19771982
})?;
19781983

19791984
Ok(DetachFunctionResponse { success: true })

rust/garbage_collector/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ chroma-index = { workspace = true }
5555
chroma-memberlist = { workspace = true }
5656
chroma-tracing = { workspace = true }
5757
chroma-jemalloc-pprof-server = { workspace = true }
58+
s3heap = { workspace = true }
59+
s3heap-service = { workspace = true }
5860
wal3 = { workspace = true }
5961

6062
[target.'cfg(not(target_env = "msvc"))'.dependencies]

0 commit comments

Comments
 (0)