Skip to content

Commit 41247e1

Browse files
committed
WIP: Point row deletions
1 parent 2c0ece3 commit 41247e1

File tree

7 files changed

+246
-17
lines changed

7 files changed

+246
-17
lines changed

src/java/org/apache/cassandra/db/partitions/TrieBackedPartition.java

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -318,16 +318,17 @@ protected static void putInTrie(ClusteringComparator comparator, InMemoryDeletio
318318
Clustering<?> clustering = row.clustering();
319319
DeletionTime deletionTime = row.deletion().time();
320320

321+
ByteComparable comparableClustering = comparator.asByteComparable(clustering);
321322
if (!deletionTime.isLive())
322323
{
323324
putDeletionInTrie(trie,
324-
comparator.asByteComparable(clustering.asStartBound()),
325-
comparator.asByteComparable(clustering.asEndBound()),
325+
comparableClustering,
326+
comparableClustering,
326327
deletionTime);
327328
}
328329
if (!row.isEmptyAfterDeletion())
329330
{
330-
trie.apply(DeletionAwareTrie.<Object, TrieTombstoneMarker>singleton(comparator.asByteComparable(clustering),
331+
trie.apply(DeletionAwareTrie.<Object, TrieTombstoneMarker>singleton(comparableClustering,
331332
BYTE_COMPARABLE_VERSION,
332333
rowToData(row)),
333334
noConflictInData(),
@@ -499,6 +500,19 @@ public UnfilteredRowIterator unfilteredIterator()
499500
return unfilteredIterator(ColumnFilter.selection(columns()), Slices.ALL, false);
500501
}
501502

503+
public static Object combineDataAndDeletion(Object data, TrieTombstoneMarker deletion)
504+
{
505+
if (data == null || data instanceof PartitionMarker)
506+
return deletion;
507+
508+
if (deletion == null || !deletion.hasPointData())
509+
return data;
510+
511+
// This is a row combined with a point deletion.
512+
RowData rowData = (RowData) data;
513+
return rowData.toRow(Clustering.EMPTY, deletion.deletionTime());
514+
}
515+
502516
/// Implementation of [UnfilteredRowIterator] for this partition.
503517
///
504518
/// Currently, this implementation is pretty involved because it has to revert the transformations done to row and
@@ -521,7 +535,8 @@ protected UnfilteredIterator(ColumnFilter selection, DeletionAwareTrie<Object, T
521535

522536
private UnfilteredIterator(ColumnFilter selection, DeletionAwareTrie<Object, TrieTombstoneMarker> trie, boolean reversed, DeletionTime partitionLevelDeletion)
523537
{
524-
super(trie.mergedTrieSwitchable((x, y) -> x instanceof RowData ? x : y), Direction.fromBoolean(reversed));
538+
super(trie.mergedTrieSwitchable(TrieBackedPartition::combineDataAndDeletion),
539+
Direction.fromBoolean(reversed));
525540
this.trie = trie;
526541
this.selection = selection;
527542
this.reversed = reversed;
@@ -533,11 +548,23 @@ protected Unfiltered mapContent(Object content, byte[] bytes, int byteLength)
533548
{
534549
if (content instanceof RowData)
535550
return toRow((RowData) content,
536-
metadata.comparator.clusteringFromByteComparable(ByteBufferAccessor.instance,
537-
ByteComparable.preencoded(BYTE_COMPARABLE_VERSION,
538-
bytes, 0, byteLength),
539-
BYTE_COMPARABLE_VERSION)) // deletion is given as range tombstone
540-
.filter(selection, metadata());
551+
getClustering(bytes, byteLength)) // deletion is given as range tombstone
552+
.filter(selection, metadata());
553+
if (content instanceof Row)
554+
{
555+
BTreeRow row = (BTreeRow) content;
556+
return BTreeRow.create(getClustering(bytes, byteLength),
557+
row.primaryKeyLivenessInfo(),
558+
row.deletion(),
559+
row.getBTree(),
560+
row.getMinLocalDeletionTime())
561+
.filter(selection, metadata());
562+
}
563+
564+
TrieTombstoneMarker marker = (TrieTombstoneMarker) content;
565+
if (marker.hasPointData())
566+
return BTreeRow.emptyDeletedRow(getClustering(bytes, byteLength),
567+
Row.Deletion.regular(marker.deletionTime()));
541568
else
542569
return ((TrieTombstoneMarker) content).toRangeTombstoneMarker(
543570
ByteComparable.preencoded(BYTE_COMPARABLE_VERSION, bytes, 0, byteLength),
@@ -546,6 +573,14 @@ protected Unfiltered mapContent(Object content, byte[] bytes, int byteLength)
546573
partitionLevelDeletion);
547574
}
548575

576+
private Clustering<?> getClustering(byte[] bytes, int byteLength)
577+
{
578+
return metadata.comparator.clusteringFromByteComparable(ByteBufferAccessor.instance,
579+
ByteComparable.preencoded(BYTE_COMPARABLE_VERSION,
580+
bytes, 0, byteLength),
581+
BYTE_COMPARABLE_VERSION);
582+
}
583+
549584
@Override
550585
public DeletionTime partitionLevelDeletion()
551586
{
@@ -612,7 +647,7 @@ public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, ByteComp
612647
return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey, staticRow(), partitionLevelDeletion(), reversed);
613648

614649
DeletionAwareTrie<Object, TrieTombstoneMarker> slicedTrie = trie.intersect(TrieSet.ranges(BYTE_COMPARABLE_VERSION, bounds));
615-
return new RecombiningUnfilteredRowIterator(new UnfilteredIterator(selection, slicedTrie, reversed));
650+
return new UnfilteredIterator(selection, slicedTrie, reversed);
616651
}
617652

618653
public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)

src/java/org/apache/cassandra/db/partitions/TriePartitionUpdate.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -440,15 +440,16 @@ public void add(Row row)
440440
Clustering<?> clustering = row.clustering();
441441
DeletionTime deletionTime = row.deletion().time();
442442

443+
ByteComparable comparableClustering = metadata.comparator.asByteComparable(clustering);
443444
if (!deletionTime.isLive())
444445
{
445-
putDeletionInTrie(metadata.comparator.asByteComparable(clustering.asStartBound()),
446-
metadata.comparator.asByteComparable(clustering.asEndBound()),
446+
putDeletionInTrie(comparableClustering,
447+
comparableClustering,
447448
deletionTime);
448449
}
449450
if (!row.isEmptyAfterDeletion())
450451
{
451-
trie.apply(DeletionAwareTrie.<Row, TrieTombstoneMarker>singleton(metadata.comparator.asByteComparable(clustering),
452+
trie.apply(DeletionAwareTrie.<Row, TrieTombstoneMarker>singleton(comparableClustering,
452453
BYTE_COMPARABLE_VERSION,
453454
row),
454455
this::merge,

src/java/org/apache/cassandra/db/rows/TrieTombstoneMarker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ RangeTombstoneMarker toRangeTombstoneMarker(ByteComparable clusteringPrefixAsByt
5353
/// equal) which is not stored or reported.
5454
TrieTombstoneMarker mergeWith(TrieTombstoneMarker existing);
5555

56+
boolean hasPointData();
57+
5658
static TrieTombstoneMarker covering(DeletionTime deletionTime)
5759
{
5860
return TrieTombstoneMarkerImpl.covering(deletionTime);

src/java/org/apache/cassandra/db/rows/TrieTombstoneMarkerImpl.java

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,19 @@ public Covering rightDeletion()
9797
return this;
9898
}
9999

100+
/// This marker type never carries point-deletion data.
@Override
101+
public boolean hasPointData()
102+
{
103+
return false;
104+
}
105+
100106
@Override
101107
public TrieTombstoneMarker mergeWith(TrieTombstoneMarker other)
102108
{
103109
if (other instanceof Boundary)
104110
return other.mergeWith(this);
111+
if (other instanceof Point)
112+
return other.mergeWith(this);
105113

106114
return combine(this, (Covering) other);
107115
}
@@ -136,6 +144,12 @@ public TrieTombstoneMarker asBoundary(Direction direction)
136144
return direction.isForward() ? new Boundary(null, this) : new Boundary(this, null);
137145
}
138146

147+
/// Wraps this deletion in a [Point] marker: this instance becomes the point deletion and the
/// new marker has no covering deletion (`null`).
@Override
148+
public TrieTombstoneMarker asPoint()
149+
{
150+
return new Point(this, null);
151+
}
152+
139153
@Override
140154
public DeletionTime deletionTime()
141155
{
@@ -168,6 +182,12 @@ public DeletionTime deletionTime()
168182
: leftDeletion;
169183
}
170184

185+
/// This marker type never carries point-deletion data.
@Override
186+
public boolean hasPointData()
187+
{
188+
return false;
189+
}
190+
171191
@Override
172192
public RangeTombstoneMarker toRangeTombstoneMarker(ByteComparable clusteringPrefixAsByteComparable,
173193
ByteComparable.Version byteComparableVersion,
@@ -204,6 +224,7 @@ public TrieTombstoneMarker mergeWith(TrieTombstoneMarker existing)
204224
if (existing == null)
205225
return this;
206226

227+
assert !existing.hasPointData() : "Boundary cannot be merged with point deletion";
207228
TrieTombstoneMarkerImpl other = (TrieTombstoneMarkerImpl) existing;
208229
Covering otherLeft = other.leftDeletion();
209230
Covering newLeft = combine(leftDeletion, otherLeft);
@@ -273,4 +294,126 @@ public String toString()
273294
return (leftDeletion != null ? leftDeletion : "LIVE") + " -> " + (rightDeletion != null ? rightDeletion : "LIVE");
274295
}
275296
}
297+
298+
static class Point implements TrieTombstoneMarkerImpl
299+
{
300+
final @Nullable Covering coveringDeletion;
301+
final Covering pointDeletion;
302+
303+
public Point(Covering pointDeletion, @Nullable Covering coveringDeletion)
304+
{
305+
this.coveringDeletion = coveringDeletion;
306+
this.pointDeletion = pointDeletion;
307+
}
308+
309+
@Override
310+
public Covering leftDeletion()
311+
{
312+
return coveringDeletion;
313+
}
314+
315+
@Override
316+
public Covering rightDeletion()
317+
{
318+
return coveringDeletion;
319+
}
320+
321+
@Override
322+
public DeletionTime deletionTime()
323+
{
324+
return pointDeletion;
325+
}
326+
327+
@Override
328+
public RangeTombstoneMarker toRangeTombstoneMarker(ByteComparable clusteringPrefixAsByteComparable,
329+
ByteComparable.Version byteComparableVersion,
330+
ClusteringComparator comparator,
331+
DeletionTime deletionToOmit)
332+
{
333+
return null;
334+
}
335+
336+
@Override
337+
public TrieTombstoneMarker mergeWith(TrieTombstoneMarker existing)
338+
{
339+
if (existing == null)
340+
return this;
341+
342+
if (existing instanceof Covering)
343+
{
344+
Covering existingCovering = (Covering) existing;
345+
if (!pointDeletion.supersedes(existingCovering))
346+
{
347+
if (coveringDeletion == null || !coveringDeletion.supersedes(existingCovering))
348+
return null;
349+
else
350+
return coveringDeletion;
351+
}
352+
353+
Covering newCovering = combine(coveringDeletion, existingCovering);
354+
if (newCovering == coveringDeletion)
355+
return this;
356+
else
357+
return new Point(pointDeletion, newCovering);
358+
}
359+
else if (existing instanceof Point)
360+
{
361+
Point existingPoint = (Point) existing;
362+
Covering newCovering = combine(coveringDeletion, existingPoint.coveringDeletion);
363+
Covering newPoint = combine(pointDeletion, existingPoint.pointDeletion);
364+
if (newCovering == coveringDeletion && newPoint == pointDeletion)
365+
return this;
366+
if (newCovering == existingPoint.coveringDeletion && newPoint == existingPoint.pointDeletion)
367+
return existingPoint;
368+
369+
return new Point(newPoint, newCovering);
370+
}
371+
else
372+
throw new AssertionError("Boundaries cannot be positioned on row clusterings.");
373+
}
374+
375+
@Override
376+
public boolean hasPointData()
377+
{
378+
return true;
379+
}
380+
381+
@Override
382+
public TrieTombstoneMarker withUpdatedTimestamp(long l)
383+
{
384+
if (coveringDeletion != null)
385+
return new Covering(l, coveringDeletion.localDeletionTime()); // subsumed by range deletion
386+
return new Point(new Covering(l, pointDeletion.localDeletionTime()), null);
387+
}
388+
389+
@Override
390+
public boolean isBoundary()
391+
{
392+
return true;
393+
}
394+
395+
@Override
396+
public TrieTombstoneMarker precedingState(Direction direction)
397+
{
398+
return coveringDeletion;
399+
}
400+
401+
@Override
402+
public TrieTombstoneMarker restrict(boolean applicableBefore, boolean applicableAfter)
403+
{
404+
throw new AssertionError("Cannot have a row clustering as slice bound.");
405+
}
406+
407+
@Override
408+
public TrieTombstoneMarker asBoundary(Direction direction)
409+
{
410+
throw new AssertionError("Cannot have a row clustering as slice bound.");
411+
}
412+
413+
@Override
414+
public String toString()
415+
{
416+
return pointDeletion + (coveringDeletion != null ? "(under " + (coveringDeletion != null ? coveringDeletion : "LIVE") + ")" : "");
417+
}
418+
}
276419
}

src/java/org/apache/cassandra/db/tries/RangeState.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,10 @@ public interface RangeState<S extends RangeState<S>>
5757

5858
/// Assuming this is a covering state, promote it to a boundary active in the specified direction.
5959
S asBoundary(Direction direction);
60+
61+
/// Assuming this is a covering state, convert it to a point deletion.
///
/// The default implementation returns `null`; state types that support point deletions
/// override this method.
62+
default S asPoint()
63+
{
64+
return null;
65+
}
6066
}

src/java/org/apache/cassandra/db/tries/TrieSetCursor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ S applyToCoveringState(S srcState, Direction direction)
145145
switch (this)
146146
{
147147
case POINT:
148-
return null;
148+
return srcState.asPoint();
149149
case COVERED:
150150
return srcState;
151151
case START:

0 commit comments

Comments
 (0)