Skip to content

Commit a3788d7

Browse files
committed
[BUG]: Make FlushCollectionCompactionAndAttachedCollection transactional
1 parent f0e1560 commit a3788d7

File tree

1 file changed

+43
-14
lines changed

1 file changed

+43
-14
lines changed

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@ import (
1010
"github.com/chroma-core/chroma/go/pkg/common"
1111
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
1212
"github.com/chroma-core/chroma/go/pkg/sysdb/coordinator/model"
13+
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbcore"
1314
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
1415
s3metastore "github.com/chroma-core/chroma/go/pkg/sysdb/metastore/s3"
1516
"github.com/chroma-core/chroma/go/pkg/types"
1617
"github.com/chroma-core/chroma/go/shared/otel"
1718
"github.com/google/uuid"
1819
"github.com/pingcap/log"
1920
"go.uber.org/zap"
21+
"gorm.io/gorm"
2022
)
2123

2224
const (
@@ -566,6 +568,7 @@ func (tc *Catalog) GetCollectionWithSegments(ctx context.Context, collectionID t
566568
collection = collection_entry
567569
return nil
568570
})
571+
569572
if err != nil {
570573
return nil, nil, err
571574
}
@@ -1678,14 +1681,13 @@ func (tc *Catalog) updateVersionFileInS3(ctx context.Context, versionFilePb *coo
16781681
func (tc *Catalog) FlushCollectionCompaction(ctx context.Context, flushCollectionCompaction *model.FlushCollectionCompaction) (*model.FlushCollectionInfo, error) {
16791682
// This is the core path now, since version files are enabled
16801683
if tc.versionFileEnabled {
1681-
return tc.FlushCollectionCompactionForVersionedCollection(ctx, flushCollectionCompaction)
1684+
return tc.FlushCollectionCompactionForVersionedCollection(ctx, flushCollectionCompaction, nil)
16821685
}
16831686
collectionID := types.FromUniqueID(flushCollectionCompaction.ID)
16841687

1685-
flushCollectionInfo := &model.FlushCollectionInfo{
1686-
ID: flushCollectionCompaction.ID.String(),
1687-
}
1688+
var flushCollectionInfo *model.FlushCollectionInfo
16881689

1690+
// Use explicit transaction parameter to ensure both operations run in the same transaction
16891691
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
16901692
// Check if collection exists.
16911693
collection, err := tc.metaDomain.CollectionDb(txCtx).GetCollectionWithoutMetadata(collectionID, nil, nil)
@@ -1751,10 +1753,11 @@ func (tc *Catalog) FlushCollectionCompactionAndAttachedFunction(
17511753

17521754
var flushCollectionInfo *model.FlushCollectionInfo
17531755

1754-
// TODO(tanujnay112): This apparently doesn't create a subtransaction. Fix this.
17551756
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
17561757
var err error
1757-
flushCollectionInfo, err = tc.FlushCollectionCompactionForVersionedCollection(txCtx, flushCollectionCompaction)
1758+
// Get the transaction from context to pass to FlushCollectionCompactionForVersionedCollection
1759+
tx := dbcore.GetDB(txCtx)
1760+
flushCollectionInfo, err = tc.FlushCollectionCompactionForVersionedCollection(txCtx, flushCollectionCompaction, tx)
17581761
if err != nil {
17591762
return err
17601763
}
@@ -1846,14 +1849,21 @@ func (tc *Catalog) validateVersionFile(versionFile *coordinatorpb.CollectionVers
18461849
// 4. Till the CAS operation succeeds, retry the operation (i.e. goto 1)
18471850
// 5. If version CAS fails - then fail the operation to the Compactor.
18481851
// 6. If version file name CAS fails - read updated file and write a new version file to S3.
1849-
func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.Context, flushCollectionCompaction *model.FlushCollectionCompaction) (*model.FlushCollectionInfo, error) {
1852+
func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.Context, flushCollectionCompaction *model.FlushCollectionCompaction, tx *gorm.DB) (*model.FlushCollectionInfo, error) {
18501853
// The result that is sent back to the Compactor.
18511854
flushCollectionInfo := &model.FlushCollectionInfo{
18521855
ID: flushCollectionCompaction.ID.String(),
18531856
}
18541857

18551858
log.Info("FlushCollectionCompaction", zap.String("collection_id", flushCollectionInfo.ID), zap.Int64("log_position", flushCollectionCompaction.LogPosition))
18561859

1860+
// If a transaction is provided, do a single attempt without retry - any failure should propagate up
1861+
// to let the outer transaction fail atomically.
1862+
maxAttemptsForThisCall := maxAttempts
1863+
if tx != nil {
1864+
maxAttemptsForThisCall = 1
1865+
}
1866+
18571867
// Do the operation in a loop until the CollectionEntry is updated,
18581868
// OR FAIL the operation if the version is stale
18591869
// OR other DB error.
@@ -1864,7 +1874,7 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
18641874
// to mark certain versions and then tries to update the VersionFileName in
18651875
// the table at the same time.
18661876
numAttempts := 0
1867-
for numAttempts < maxAttempts {
1877+
for numAttempts < maxAttemptsForThisCall {
18681878
numAttempts++
18691879
// Get the current version info and the version file from the table.
18701880
collectionEntry, segments, err := tc.GetCollectionWithSegments(ctx, flushCollectionCompaction.ID, true)
@@ -1947,7 +1957,10 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
19471957

19481958
numActiveVersions := tc.getNumberOfActiveVersions(existingVersionFilePb)
19491959

1950-
txErr := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
1960+
// Execute the database operations - either within provided transaction or new transaction
1961+
var txErr error
1962+
1963+
executeOperations := func(ctx context.Context, tx *gorm.DB) error {
19511964
// NOTE: DO NOT move UpdateTenantLastCompactionTime & RegisterFilePaths to the end of the transaction.
19521965
// Keep both these operations before the UpdateLogPositionAndVersionInfo.
19531966
// UpdateLogPositionAndVersionInfo acts as a CAS operation whose failure will roll back the transaction.
@@ -1956,16 +1969,21 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
19561969
// The other approach is to use a "SELECT FOR UPDATE" to lock the Collection entry at the start of the transaction,
19571970
// which is costlier than the current approach that does not lock the Collection entry.
19581971

1972+
// Create context with transaction if provided
1973+
if tx != nil {
1974+
ctx = dbcore.CtxWithTransaction(ctx, tx)
1975+
}
1976+
19591977
// register files to Segment metadata
1960-
err = tc.metaDomain.SegmentDb(txCtx).RegisterFilePaths(flushCollectionCompaction.FlushSegmentCompactions)
1978+
err := tc.metaDomain.SegmentDb(ctx).RegisterFilePaths(flushCollectionCompaction.FlushSegmentCompactions)
19611979
if err != nil {
19621980
return err
19631981
}
19641982
// update tenant last compaction time
19651983
// TODO: add a system configuration to disable
19661984
// since this might cause resource contention if one tenant has a lot of collection compactions at the same time
19671985
lastCompactionTime := time.Now().Unix()
1968-
err = tc.metaDomain.TenantDb(txCtx).UpdateTenantLastCompactionTime(flushCollectionCompaction.TenantID, lastCompactionTime)
1986+
err = tc.metaDomain.TenantDb(ctx).UpdateTenantLastCompactionTime(flushCollectionCompaction.TenantID, lastCompactionTime)
19691987
if err != nil {
19701988
return err
19711989
}
@@ -1978,7 +1996,7 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
19781996
// if the Collection entry is updated by another Tx.
19791997

19801998
// Update collection log position and version
1981-
rowsAffected, err := tc.metaDomain.CollectionDb(txCtx).UpdateLogPositionAndVersionInfo(
1999+
rowsAffected, err := tc.metaDomain.CollectionDb(ctx).UpdateLogPositionAndVersionInfo(
19822000
flushCollectionCompaction.ID.String(),
19832001
flushCollectionCompaction.LogPosition,
19842002
flushCollectionCompaction.CurrentCollectionVersion,
@@ -2009,9 +2027,20 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
20092027
flushCollectionInfo.TenantLastCompactionTime = lastCompactionTime
20102028
flushCollectionInfo.CollectionVersion = flushCollectionCompaction.CurrentCollectionVersion + 1
20112029

2012-
// return nil will commit the transaction
2030+
// Success
20132031
return nil
2014-
}) // End of transaction
2032+
}
2033+
2034+
// Check if a transaction was provided - if so, use it directly instead of creating nested transaction
2035+
if tx != nil {
2036+
// Use provided transaction directly - no nested transaction
2037+
txErr = executeOperations(ctx, tx)
2038+
} else {
2039+
// Create new transaction
2040+
txErr = tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
2041+
return executeOperations(txCtx, nil)
2042+
})
2043+
}
20152044

20162045
if txErr == nil {
20172046
// CAS operation succeeded.

0 commit comments

Comments
 (0)