34 commits (the diff below shows changes from 1 commit):
bf5f311  Extract Cursor class and update javadoc style (blambov, Mar 13, 2025)
e7f4f00  Change transformations to implement Cursor (blambov, Mar 14, 2025)
09a8b56  Adds the ability to verify cursors' behaviour for debugging (blambov, Mar 14, 2025)
635feaa  Put direction argument first in forEach/process (blambov, Mar 18, 2025)
7e57c12  Extract BaseTrie (blambov, Mar 18, 2025)
31ea5db  Add concrete type to BaseTrie (blambov, Mar 18, 2025)
a70a2d2  Add CursorWalkable interface to BaseTrie and move implementations there (blambov, Mar 18, 2025)
125b325  Run trie tests with verification by default (blambov, Apr 4, 2025)
3275662  Fix prefixed and singleton tailCursor (blambov, Jun 2, 2025)
0579577  Implement TrieSet and change slices to use intersection (blambov, Mar 19, 2025)
4735cde  Extract InMemoryBaseTrie unchanged in preparation for other trie types (blambov, Apr 29, 2025)
871f926  Add deletion support for InMemoryTrie (blambov, Apr 29, 2025)
e9b8ce1  Add RangeTrie (blambov, Mar 24, 2025)
ce9d384  Implement RangeTrie.applyTo, InMemoryTrie.delete and InMemoryTrie.app… (blambov, May 5, 2025)
ecd1f10  Add DeletionAwareTrie (blambov, May 16, 2025)
8069242  Add "Stage2" versions of trie memtable and partition classes (blambov, Jul 15, 2025)
83394bb  TrieMemtable Stage 3 (blambov, Jul 18, 2025)
45bc6b1  Implement, test and benchmark stopIssuingTombstones (blambov, Sep 4, 2025)
855f587  Add trie slicing support for SAI uses (blambov, Sep 25, 2025)
b0519d6  Switch row deletions to point tombstones (blambov, Sep 26, 2025)
33b461e  Generalize forEachValue/Entry (blambov, Oct 2, 2025)
d59d5ec  Switch MemtableAverageRowSize to use trie directly and expand test (blambov, Oct 2, 2025)
21b0d30  Remove TrieSetIntersectionCursor and implement union and intersection… (blambov, Nov 5, 2025)
3177f9c  Move TrieSetNegatedCursor into TrieSetCursor (blambov, Nov 5, 2025)
9b0c620  Review changes (blambov, Nov 6, 2025)
993194d  Add graph for in-memory trie deletions (blambov, Nov 6, 2025)
d6902ca  Review changes (blambov, Nov 6, 2025)
64b5291  Test points in range tries (blambov, Nov 6, 2025)
94803f4  Review changes (blambov, Nov 6, 2025)
bbf34bd  Include head in applyToSelected calls (blambov, Nov 6, 2025)
b072af1  Add deletion-aware tails test and fix problems (blambov, Nov 10, 2025)
058afcb  Change deletion-aware collection merge to make independently tracking… (blambov, Nov 10, 2025)
239cb36  Review changes (blambov, Nov 11, 2025)
e825fde  Sonarcloud and idea warnings (blambov, Nov 11, 2025)
219 changes: 108 additions & 111 deletions src/java/org/apache/cassandra/db/memtable/TrieMemtable.java

Large diffs are not rendered by default.

@@ -143,7 +143,7 @@ public class TrieMemtableStage1 extends AbstractAllocatorMemtable
TrieMemtableStage1(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner)
{
super(commitLogLowerBound, metadataRef, owner);
this.boundaries = owner.localRangeSplits(TrieMemtable.SHARD_COUNT);
this.boundaries = owner.localRangeSplits(TrieMemtable.shardCount());
this.metrics = TrieMemtableMetricsView.getOrCreate(metadataRef.keyspace, metadataRef.name);
this.shards = generatePartitionShards(boundaries.shardCount(), metadataRef, metrics);
this.mergedTrie = makeMergedTrie(shards);
69 changes: 4 additions & 65 deletions src/java/org/apache/cassandra/db/memtable/TrieMemtableStage2.java
@@ -52,6 +52,7 @@
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.tries.Direction;
import org.apache.cassandra.db.tries.InMemoryBaseTrie;
import org.apache.cassandra.db.tries.InMemoryTrie;
import org.apache.cassandra.db.tries.Trie;
import org.apache.cassandra.db.tries.TrieEntriesWalker;
@@ -67,8 +68,6 @@
import org.apache.cassandra.metrics.TrieMemtableMetricsView;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
@@ -120,15 +119,13 @@ public class TrieMemtableStage2 extends AbstractAllocatorMemtable
default:
throw new AssertionError();
}

MBeanWrapper.instance.registerMBean(new TrieMemtableConfig(), TRIE_MEMTABLE_CONFIG_OBJECT_NAME, MBeanWrapper.OnException.LOG);
}

/**
* Force copy checker (see InMemoryTrie.ApplyState) ensuring all modifications apply atomically and consistently to
* the whole partition.
*/
public static final Predicate<InMemoryTrie.NodeFeatures<Object>> FORCE_COPY_PARTITION_BOUNDARY = features -> isPartitionBoundary(features.content());
public static final Predicate<InMemoryBaseTrie.NodeFeatures<Object>> FORCE_COPY_PARTITION_BOUNDARY = features -> isPartitionBoundary(features.content());

public static final Predicate<Object> IS_PARTITION_BOUNDARY = TrieMemtableStage2::isPartitionBoundary;

@@ -167,19 +164,11 @@ public class TrieMemtableStage2 extends AbstractAllocatorMemtable
*/
private volatile MemtableAverageRowSize estimatedAverageRowSize;

public static volatile int SHARD_COUNT = CassandraRelevantProperties.TRIE_MEMTABLE_SHARD_COUNT.getInt(autoShardCount());
public static volatile boolean SHARD_LOCK_FAIRNESS = CassandraRelevantProperties.TRIE_MEMTABLE_SHARD_LOCK_FAIRNESS.getBoolean();

private static int autoShardCount()
{
return 4 * FBUtilities.getAvailableProcessors();
}

// only to be used by init(), to setup the very first memtable for the cfs
TrieMemtableStage2(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner)
{
super(commitLogLowerBound, metadataRef, owner);
this.boundaries = owner.localRangeSplits(SHARD_COUNT);
this.boundaries = owner.localRangeSplits(TrieMemtable.shardCount());
this.metrics = TrieMemtableMetricsView.getOrCreate(metadataRef.keyspace, metadataRef.name);
this.shards = generatePartitionShards(boundaries.shardCount(), metadataRef, metrics, owner.readOrdering());
this.mergedTrie = makeMergedTrie(shards);
@@ -610,7 +599,7 @@ public static class MemtableShard
private volatile int partitionCount = 0;

@Unmetered
private ReentrantLock writeLock = new ReentrantLock(SHARD_LOCK_FAIRNESS);
private ReentrantLock writeLock = new ReentrantLock(TrieMemtable.shardLockFairness());

// Content map for the given shard. This is implemented as a memtable trie which uses the prefix-free
// byte-comparable ByteSource representations of the keys to address the partitions.
@@ -891,54 +880,4 @@ void releaseReferencesUnsafe()
for (MemtableShard shard : shards)
shard.data.releaseReferencesUnsafe();
}

@VisibleForTesting
public static class TrieMemtableConfig implements TrieMemtableConfigMXBean
{
@Override
public void setShardCount(String shardCount)
{
if ("auto".equalsIgnoreCase(shardCount))
{
SHARD_COUNT = autoShardCount();
CassandraRelevantProperties.TRIE_MEMTABLE_SHARD_COUNT.setInt(SHARD_COUNT);
}
else
{
try
{
SHARD_COUNT = Integer.valueOf(shardCount);
CassandraRelevantProperties.TRIE_MEMTABLE_SHARD_COUNT.setInt(SHARD_COUNT);
}
catch (NumberFormatException ex)
{
logger.warn("Unable to parse {} as valid value for shard count; leaving it as {}",
shardCount, SHARD_COUNT);
return;
}
}
logger.info("Requested setting shard count to {}; set to: {}", shardCount, SHARD_COUNT);
}

@Override
public String getShardCount()
{
return "" + SHARD_COUNT;
}

@Override
public void setLockFairness(String fairness)
{
SHARD_LOCK_FAIRNESS = Boolean.parseBoolean(fairness);
CassandraRelevantProperties.TRIE_MEMTABLE_SHARD_LOCK_FAIRNESS.setBoolean(SHARD_LOCK_FAIRNESS);
logger.info("Requested setting shard lock fairness to {}; set to: {}", fairness, SHARD_LOCK_FAIRNESS);
}

@Override
public String getLockFairness()
{
return "" + SHARD_LOCK_FAIRNESS;
}
}

}
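
The shard-count and lock-fairness statics (and the TrieMemtableConfig MBean plumbing) removed from TrieMemtableStage2 above are replaced by calls to TrieMemtable.shardCount() and TrieMemtable.shardLockFairness(); the TrieMemtable.java diff itself is not rendered in this view. A minimal sketch of what those accessors could look like, assuming they keep the semantics of the removed fields (the method bodies below are an assumption, not patch content):

// Sketch only: assumed shape of the shared accessors on TrieMemtable, inferred
// from the fields removed from TrieMemtableStage2 above.
public static int shardCount()
{
    // Same default as the removed autoShardCount(): 4 * available processors.
    return CassandraRelevantProperties.TRIE_MEMTABLE_SHARD_COUNT.getInt(4 * FBUtilities.getAvailableProcessors());
}

public static boolean shardLockFairness()
{
    return CassandraRelevantProperties.TRIE_MEMTABLE_SHARD_LOCK_FAIRNESS.getBoolean();
}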
@@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db.partitions;

import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringPrefix;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.RangeTombstoneBoundaryMarker;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.WrappingUnfilteredRowIterator;

/// An [UnfilteredRowIterator] that recombines sequences of range tombstones for the same key into deleted rows.
///
/// The objective of this class is to reverse the transformation made by [TriePartitionUpdate] that implements row
/// deletions as pairs of tombstones around the row (which are then placed in the deletion branch of the trie).
/// This transformation is valid, but a lot of tests rely on row deletions being represented by empty deleted rows.
/// For the time being we thus do the reverse transformation on conversion from trie to unfiltered iterator.
class RecombiningUnfilteredRowIterator extends WrappingUnfilteredRowIterator
{
Unfiltered bufferedOne;
Unfiltered bufferedTwo;
Unfiltered next;
boolean nextPrepared;

protected RecombiningUnfilteredRowIterator(UnfilteredRowIterator wrapped)
{
super(wrapped);
bufferedOne = null;
nextPrepared = false;
}

@Override
public boolean hasNext()
{
return computeNext() != null;
}

@Override
public Unfiltered next()
{
Unfiltered item = computeNext();
nextPrepared = false;
return item;
}

private Unfiltered computeNext()
{
if (nextPrepared)
return next;

// If we have two buffered entries, report the first one directly (there's no need to process it as we already
// know it is a row) and shift the second one to the first position.
if (bufferedTwo != null)
{
Unfiltered unfiltered2 = bufferedTwo;
bufferedTwo = null;
return setNextAndBufferedAndReturn(bufferedOne, unfiltered2);
}

// If we have a buffered entry, use it for the following processing, otherwise get one from the source.
Unfiltered unfiltered1;
if (bufferedOne != null)
{
unfiltered1 = bufferedOne;
bufferedOne = null;
}
else
{
if (!wrapped.hasNext())
return setNextAndReturn(null);

unfiltered1 = wrapped.next();
}

// The pattern we are looking for is
// open_inclusive(clustering, del) + row(clustering) + close_inclusive(clustering, del)
// where the row is optional

if (unfiltered1.isRow())
return setNextAndReturn(unfiltered1);

RangeTombstoneMarker marker1 = (RangeTombstoneMarker) unfiltered1;
boolean reversed = isReverseOrder();
int clusteringSize = metadata().comparator.size();
// The first marker must be open, inclusive, and a fully specified clustering.
if (!marker1.isOpen(reversed)
|| !marker1.openIsInclusive(reversed)
|| marker1.clustering().size() != clusteringSize
|| (clusteringSize > 0 && marker1.clustering().get(clusteringSize - 1) == null))
return setNextAndReturn(marker1);

if (!wrapped.hasNext())
return setNextAndReturn(marker1);

Unfiltered unfiltered2 = wrapped.next();
final DeletionTime deletionTime = marker1.openDeletionTime(reversed);
if (unfiltered2.isRangeTombstoneMarker())
{
RangeTombstoneMarker marker2 = (RangeTombstoneMarker) unfiltered2;
assert marker2.isClose(reversed);
assert marker2.closeDeletionTime(reversed).equals(deletionTime);
if (!marker2.closeIsInclusive(reversed) || !clusteringPositionsEqual(marker1, marker2))
return setNextAndBufferedAndReturn(marker1, marker2);

// The recombination applies. We have to transform the open side of marker1 and the close side
// of marker2 into an empty row with deletion time.
return processOtherSidesAndReturn(BTreeRow.emptyDeletedRow(clusteringPositionOf(marker1), Row.Deletion.regular(deletionTime)),
reversed, marker1, marker2, deletionTime);
}

BTreeRow row2 = (BTreeRow) unfiltered2;

if (!clusteringPositionsEqual(marker1, row2))
return setNextAndBufferedAndReturn(marker1, row2);

if (!wrapped.hasNext())
return setNextAndBufferedAndReturn(marker1, row2);

Unfiltered unfiltered3 = wrapped.next();
if (unfiltered3.isRow())
return setNextAndBufferedAndReturn(marker1, row2, unfiltered3);

RangeTombstoneMarker marker3 = (RangeTombstoneMarker) unfiltered3;
assert marker3.isClose(reversed);
assert marker3.closeDeletionTime(reversed).equals(deletionTime);
if (!marker3.closeIsInclusive(reversed) || !clusteringPositionsEqual(marker1, marker3))
return setNextAndBufferedAndReturn(marker1, row2, marker3);

// The recombination applies. We have to transform the open side of marker1 and the close side
// of marker3 into a deletion time for row2.
return processOtherSidesAndReturn(BTreeRow.create(row2.clustering(), row2.primaryKeyLivenessInfo(), Row.Deletion.regular(deletionTime), row2.getBTree()),
reversed, marker1, marker3, deletionTime);
}

private Unfiltered processOtherSidesAndReturn(Row row,
boolean reversed,
RangeTombstoneMarker markerLeft,
RangeTombstoneMarker markerRight,
DeletionTime deletionTime)
{
// Check if any of the markers is a boundary, and if so, report the other side.
if (!markerLeft.isClose(reversed))
{
if (!markerRight.isOpen(reversed))
return setNextAndReturn(row);

return setNextAndBufferedAndReturn(row,
((RangeTombstoneBoundaryMarker) markerRight).createCorrespondingOpenMarker(reversed));
}

if (!markerRight.isOpen(reversed))
return setNextAndBufferedAndReturn(((RangeTombstoneBoundaryMarker) markerLeft).createCorrespondingCloseMarker(reversed),
row);

// We have surviving markers on both sides.
final DeletionTime closeDeletionTime = markerLeft.closeDeletionTime(reversed);
if (markerRight.openDeletionTime(reversed).equals(closeDeletionTime) && !closeDeletionTime.supersedes(deletionTime))
{
// The row interrupts a covering deletion; we can still drop both markers and report a deleted row.
return setNextAndReturn(row);
}

return setNextAndBufferedAndReturn(((RangeTombstoneBoundaryMarker) markerLeft).createCorrespondingCloseMarker(reversed),
row,
((RangeTombstoneBoundaryMarker) markerRight).createCorrespondingOpenMarker(reversed));
}

private Unfiltered setNextAndReturn(Unfiltered next)
{
this.next = next;
this.nextPrepared = true;
return next;
}

private Unfiltered setNextAndBufferedAndReturn(Unfiltered next, Unfiltered bufferedOne)
{
this.bufferedOne = bufferedOne;
return setNextAndReturn(next);
}

private Unfiltered setNextAndBufferedAndReturn(Unfiltered next, Row bufferedOne, Unfiltered bufferedTwo)
{
this.bufferedTwo = bufferedTwo;
return setNextAndBufferedAndReturn(next, bufferedOne);
}

static boolean clusteringPositionsEqual(Unfiltered l, Unfiltered r)
{
return clusteringPositionsEqual(l.clustering(), r.clustering());
}

static <L, R> boolean clusteringPositionsEqual(ClusteringPrefix<L> cl, ClusteringPrefix<R> cr)
{
if (cl.size() != cr.size())
return false;
for (int i = cl.size() - 1; i >= 0; --i)
if (cl.accessor().compare(cl.get(i), cr.get(i), cr.accessor()) != 0)
return false;
return true;
}

static Clustering<?> clusteringPositionOf(Unfiltered unfiltered)
{
return clusteringPositionOf(unfiltered.clustering());
}

static <V> Clustering<V> clusteringPositionOf(ClusteringPrefix<V> prefix)
{
return prefix.accessor().factory().clustering(prefix.getRawValues());
}
}
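
For orientation, a hypothetical usage sketch (not part of this diff; the helper name and the way the source iterator is obtained are assumptions): the class is package-private, so a caller in org.apache.cassandra.db.partitions would wrap the trie-backed iterator and see row deletions reported as empty deleted rows rather than as inclusive open/close marker pairs.

// Illustrative sketch only: wrap a trie-backed partition iterator and observe
// that point-tombstone pairs come back as rows carrying a deletion.
static void printDeletedRows(UnfilteredRowIterator trieBacked)
{
    try (UnfilteredRowIterator recombined = new RecombiningUnfilteredRowIterator(trieBacked))
    {
        while (recombined.hasNext())
        {
            Unfiltered unfiltered = recombined.next();
            if (unfiltered.isRow() && !((Row) unfiltered).deletion().isLive())
                System.out.println("deleted row at " + unfiltered.clustering());
        }
    }
}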