Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"github.com/chroma-core/chroma/go/pkg/common"
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
"github.com/chroma-core/chroma/go/pkg/sysdb/coordinator/model"
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbcore"
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
s3metastore "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3"
"github.com/chroma-core/chroma/go/pkg/types"
"github.com/chroma-core/chroma/go/shared/otel"
"github.com/google/uuid"
"github.com/pingcap/log"
"go.uber.org/zap"
"gorm.io/gorm"
)

const (
Expand Down Expand Up @@ -566,6 +568,7 @@ func (tc *Catalog) GetCollectionWithSegments(ctx context.Context, collectionID t
collection = collection_entry
return nil
})

if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -1678,14 +1681,15 @@ func (tc *Catalog) updateVersionFileInS3(ctx context.Context, versionFilePb *coo
func (tc *Catalog) FlushCollectionCompaction(ctx context.Context, flushCollectionCompaction *model.FlushCollectionCompaction) (*model.FlushCollectionInfo, error) {
// This is the core path now, since version files are enabled
if tc.versionFileEnabled {
return tc.FlushCollectionCompactionForVersionedCollection(ctx, flushCollectionCompaction)
return tc.FlushCollectionCompactionForVersionedCollection(ctx, flushCollectionCompaction, nil)
}
collectionID := types.FromUniqueID(flushCollectionCompaction.ID)

flushCollectionInfo := &model.FlushCollectionInfo{
ID: flushCollectionCompaction.ID.String(),
}

// Use explicit transaction parameter to ensure both operations run in the same transaction
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
// Check if collection exists.
collection, err := tc.metaDomain.CollectionDb(txCtx).GetCollectionWithoutMetadata(collectionID, nil, nil)
Expand Down Expand Up @@ -1751,10 +1755,11 @@ func (tc *Catalog) FlushCollectionCompactionAndAttachedFunction(

var flushCollectionInfo *model.FlushCollectionInfo

// TODO(tanujnay112): This apparently doesn't create a subtransaction. Fix this.
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
var err error
flushCollectionInfo, err = tc.FlushCollectionCompactionForVersionedCollection(txCtx, flushCollectionCompaction)
// Get the transaction from context to pass to FlushCollectionCompactionForVersionedCollection
tx := dbcore.GetDB(txCtx)
flushCollectionInfo, err = tc.FlushCollectionCompactionForVersionedCollection(txCtx, flushCollectionCompaction, tx)
if err != nil {
return err
}
Expand Down Expand Up @@ -1846,14 +1851,21 @@ func (tc *Catalog) validateVersionFile(versionFile *coordinatorpb.CollectionVers
// 4. Till the CAS operation succeeds, retry the operation (i.e. goto 1)
// 5. If version CAS fails - then fail the operation to the Compactor.
// 6. If version file name CAS fails - read updated file and write a new version file to S3.
func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.Context, flushCollectionCompaction *model.FlushCollectionCompaction) (*model.FlushCollectionInfo, error) {
func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.Context, flushCollectionCompaction *model.FlushCollectionCompaction, tx *gorm.DB) (*model.FlushCollectionInfo, error) {
// The result that is sent back to the Compactor.
flushCollectionInfo := &model.FlushCollectionInfo{
ID: flushCollectionCompaction.ID.String(),
}

log.Info("FlushCollectionCompaction", zap.String("collection_id", flushCollectionInfo.ID), zap.Int64("log_position", flushCollectionCompaction.LogPosition))

// If a transaction is provided, do a single attempt without retry - any failure should propagate up
// to let the outer transaction fail atomically.
maxAttemptsForThisCall := maxAttempts
if tx != nil {
maxAttemptsForThisCall = 1
}

// Do the operation in a loop until the CollectionEntry is updated,
// OR FAIL the operation if the version is stale
// OR other DB error.
Expand All @@ -1864,7 +1876,7 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
// to mark certain versions and then tries to update the VersionFileName in
// the table at the same time.
numAttempts := 0
for numAttempts < maxAttempts {
for numAttempts < maxAttemptsForThisCall {
numAttempts++
// Get the current version info and the version file from the table.
collectionEntry, segments, err := tc.GetCollectionWithSegments(ctx, flushCollectionCompaction.ID, true)
Expand Down Expand Up @@ -1947,7 +1959,10 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C

numActiveVersions := tc.getNumberOfActiveVersions(existingVersionFilePb)

txErr := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
// Execute the database operations - either within provided transaction or new transaction
var txErr error

executeOperations := func(ctx context.Context, tx *gorm.DB) error {
// NOTE: DO NOT move UpdateTenantLastCompactionTime & RegisterFilePaths to the end of the transaction.
// Keep both these operations before the UpdateLogPositionAndVersionInfo.
// UpdateLogPositionAndVersionInfo acts as a CAS operation whose failure will roll back the transaction.
Expand All @@ -1956,16 +1971,21 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
// The other approach is to use a "SELECT FOR UPDATE" to lock the Collection entry at the start of the transaction,
// which is costlier than the current approach that does not lock the Collection entry.

// Create context with transaction if provided
if tx != nil {
ctx = dbcore.CtxWithTransaction(ctx, tx)
}

// register files to Segment metadata
err = tc.metaDomain.SegmentDb(txCtx).RegisterFilePaths(flushCollectionCompaction.FlushSegmentCompactions)
err := tc.metaDomain.SegmentDb(ctx).RegisterFilePaths(flushCollectionCompaction.FlushSegmentCompactions)
if err != nil {
return err
}
// update tenant last compaction time
// TODO: add a system configuration to disable
// since this might cause resource contention if one tenant has a lot of collection compactions at the same time
lastCompactionTime := time.Now().Unix()
err = tc.metaDomain.TenantDb(txCtx).UpdateTenantLastCompactionTime(flushCollectionCompaction.TenantID, lastCompactionTime)
err = tc.metaDomain.TenantDb(ctx).UpdateTenantLastCompactionTime(flushCollectionCompaction.TenantID, lastCompactionTime)
if err != nil {
return err
}
Expand All @@ -1978,7 +1998,7 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
// if the Collection entry is updated by another Tx.

// Update collection log position and version
rowsAffected, err := tc.metaDomain.CollectionDb(txCtx).UpdateLogPositionAndVersionInfo(
rowsAffected, err := tc.metaDomain.CollectionDb(ctx).UpdateLogPositionAndVersionInfo(
flushCollectionCompaction.ID.String(),
flushCollectionCompaction.LogPosition,
flushCollectionCompaction.CurrentCollectionVersion,
Expand Down Expand Up @@ -2009,9 +2029,20 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
flushCollectionInfo.TenantLastCompactionTime = lastCompactionTime
flushCollectionInfo.CollectionVersion = flushCollectionCompaction.CurrentCollectionVersion + 1

// return nil will commit the transaction
// Success
return nil
}) // End of transaction
}

// Check if a transaction was provided - if so, use it directly instead of creating nested transaction
if tx != nil {
// Use provided transaction directly - no nested transaction
txErr = executeOperations(ctx, tx)
} else {
// Create new transaction
txErr = tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
return executeOperations(txCtx, nil)
})
}

if txErr == nil {
// CAS operation succeeded.
Expand Down
1 change: 0 additions & 1 deletion go/pkg/sysdb/coordinator/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ func attachedFunctionToProto(attachedFunction *dbmodel.AttachedFunction, functio
TenantId: attachedFunction.TenantID,
DatabaseId: attachedFunction.DatabaseID,
NextRunAt: uint64(attachedFunction.NextRun.UnixMicro()),
LowestLiveNonce: nil,
NextNonce: attachedFunction.NextNonce.String(),
CreatedAt: uint64(attachedFunction.CreatedAt.UnixMicro()),
UpdatedAt: uint64(attachedFunction.UpdatedAt.UnixMicro()),
Expand Down
Loading