
Commit ba699f0

Add undo recovery support for aggregation tables
This PR introduces the undo recovery mechanism for the Flink sink writer to handle failure recovery scenarios with aggregation merge engine tables.

Key components:
- ByteArrayWrapper: Utility class for using byte arrays as map keys
- UndoComputer: Computes undo operations by comparing checkpoint state with current log records, supporting both full row and partial update modes
- UndoRecoveryExecutor: Executes undo operations using UpsertWriter
- UndoRecoveryCoordinator: Coordinates the recovery process across buckets, managing log scanning, undo computation, and execution
- BucketRecoveryContext: Holds per-bucket recovery state

The undo recovery works by:
1. Scanning log records from the checkpoint offset to the current end offset
2. Computing inverse operations for uncommitted records
3. Executing undo operations to restore table state

For partial update mode, INSERT records require full row deletion (not partial column deletion), which is handled by using a separate delete writer with null target columns.
1 parent 27ac618 commit ba699f0
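The three recovery steps above center on computing an inverse operation for each uncommitted changelog record. The UndoComputer added by this commit is not among the files shown below, so the following is only a minimal sketch of that idea, assuming a simplified ChangeType enum and a plain Object row image; the real implementation also compares checkpoint state with the log and distinguishes full-row from partial-update mode.

// Minimal sketch only (not the UndoComputer from this commit): maps an uncommitted
// changelog record to its inverse. ChangeType and the Object row image are simplified
// placeholders for the real Fluss row and change types.
enum ChangeType { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class UndoOperationSketch {
    final ChangeType type;
    final Object row;

    UndoOperationSketch(ChangeType type, Object row) {
        this.type = type;
        this.row = row;
    }

    /** Returns the inverse of one uncommitted record, or null if none is needed. */
    static UndoOperationSketch invert(ChangeType type, Object row) {
        switch (type) {
            case INSERT:
                // Undo an uncommitted insert by deleting the row. In partial update mode
                // this must be a full-row delete (null target columns), as the commit
                // message notes, so no partially written columns survive.
                return new UndoOperationSketch(ChangeType.DELETE, row);
            case UPDATE_BEFORE:
                // Restore the pre-update state by re-upserting the before image.
                return new UndoOperationSketch(ChangeType.INSERT, row);
            case UPDATE_AFTER:
                // The after image needs no undo of its own; the paired UPDATE_BEFORE
                // record carries the state to restore.
                return null;
            case DELETE:
                // Undo an uncommitted delete by re-inserting the deleted row.
                return new UndoOperationSketch(ChangeType.INSERT, row);
            default:
                throw new IllegalArgumentException("Unknown change type: " + type);
        }
    }
}

Per-key deduplication of such operations is what the processedKeys set in BucketRecoveryContext (shown further below) is for, keyed by the ByteArrayWrapper added in this commit.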

11 files changed: +2822 -0 lines

ByteArrayWrapper.java
Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.utils;

import java.util.Arrays;

import static org.apache.fluss.utils.Preconditions.checkNotNull;

/**
 * A wrapper for byte[] that provides proper equals() and hashCode() implementations for use as Map
 * keys.
 *
 * <p>The hashCode is pre-computed at construction time for better performance when used in
 * hash-based collections.
 */
public final class ByteArrayWrapper {

    private final byte[] data;
    private final int hashCode;

    public ByteArrayWrapper(byte[] data) {
        this.data = checkNotNull(data, "data cannot be null");
        this.hashCode = Arrays.hashCode(data);
    }

    public byte[] getData() {
        return data;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ByteArrayWrapper)) {
            return false;
        }
        return Arrays.equals(data, ((ByteArrayWrapper) o).data);
    }

    @Override
    public int hashCode() {
        return hashCode;
    }

    @Override
    public String toString() {
        return "ByteArrayWrapper{length=" + data.length + "}";
    }
}
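As background on why a wrapper is needed at all: Java arrays inherit identity-based equals() and hashCode() from Object, so two byte[] instances with the same content behave as different HashMap keys. The following self-contained demonstration is not part of the commit; it only contrasts raw byte[] keys with the wrapper above.

import org.apache.fluss.utils.ByteArrayWrapper;

import java.util.HashMap;
import java.util.Map;

class ByteArrayKeyDemo {
    public static void main(String[] args) {
        // Raw byte[] keys hash and compare by identity, so an equal-content lookup misses.
        Map<byte[], String> raw = new HashMap<>();
        raw.put(new byte[] {1, 2, 3}, "v");
        System.out.println(raw.get(new byte[] {1, 2, 3})); // prints: null

        // ByteArrayWrapper compares by content, so the same lookup succeeds.
        Map<ByteArrayWrapper, String> wrapped = new HashMap<>();
        wrapped.put(new ByteArrayWrapper(new byte[] {1, 2, 3}), "v");
        System.out.println(wrapped.get(new ByteArrayWrapper(new byte[] {1, 2, 3}))); // prints: v
    }
}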
ByteArrayWrapperTest.java
Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.utils;

import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link ByteArrayWrapper}. */
class ByteArrayWrapperTest {

    @Test
    void testEqualsAndHashCode() {
        byte[] data1 = new byte[] {1, 2, 3};
        byte[] data2 = new byte[] {1, 2, 3};
        byte[] data3 = new byte[] {1, 2, 4};

        ByteArrayWrapper wrapper1 = new ByteArrayWrapper(data1);
        ByteArrayWrapper wrapper2 = new ByteArrayWrapper(data2);
        ByteArrayWrapper wrapper3 = new ByteArrayWrapper(data3);

        // Same content should be equal
        assertThat(wrapper1).isEqualTo(wrapper2);
        assertThat(wrapper1.hashCode()).isEqualTo(wrapper2.hashCode());

        // Different content should not be equal
        assertThat(wrapper1).isNotEqualTo(wrapper3);
    }

    @Test
    void testNullDataThrowsException() {
        assertThatThrownBy(() -> new ByteArrayWrapper(null))
                .isInstanceOf(NullPointerException.class)
                .hasMessageContaining("data cannot be null");
    }

    @Test
    void testAsMapKey() {
        byte[] key1 = new byte[] {1, 2, 3};
        byte[] key2 = new byte[] {1, 2, 3}; // Same content, different array
        byte[] key3 = new byte[] {4, 5, 6};

        Map<ByteArrayWrapper, String> map = new HashMap<>();
        map.put(new ByteArrayWrapper(key1), "value1");

        // Should find with same content
        assertThat(map.get(new ByteArrayWrapper(key2))).isEqualTo("value1");

        // Should not find with different content
        assertThat(map.get(new ByteArrayWrapper(key3))).isNull();

        // Should overwrite with same key
        map.put(new ByteArrayWrapper(key2), "value2");
        assertThat(map).hasSize(1);
        assertThat(map.get(new ByteArrayWrapper(key1))).isEqualTo("value2");
    }

    @Test
    void testGetData() {
        byte[] data = new byte[] {1, 2, 3};
        ByteArrayWrapper wrapper = new ByteArrayWrapper(data);

        assertThat(wrapper.getData()).isSameAs(data);
    }

    @Test
    void testEmptyArray() {
        ByteArrayWrapper wrapper1 = new ByteArrayWrapper(new byte[0]);
        ByteArrayWrapper wrapper2 = new ByteArrayWrapper(new byte[0]);

        assertThat(wrapper1).isEqualTo(wrapper2);
        assertThat(wrapper1.hashCode()).isEqualTo(wrapper2.hashCode());
    }

    @Test
    void testToString() {
        ByteArrayWrapper wrapper = new ByteArrayWrapper(new byte[] {1, 2, 3});
        assertThat(wrapper.toString()).contains("length=3");
    }
}
BucketRecoveryContext.java
Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.flink.sink.writer.undo;

import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.utils.ByteArrayWrapper;

import java.util.HashSet;
import java.util.Set;

/**
 * Encapsulates the recovery state for a single bucket.
 *
 * <p>This class tracks:
 *
 * <ul>
 *   <li>The bucket being recovered
 *   <li>The checkpoint offset (start point for reading changelog)
 *   <li>The log end offset (end point for reading changelog)
 *   <li>Processed primary keys for deduplication (streaming execution)
 *   <li>Progress tracking during changelog scanning
 * </ul>
 */
public class BucketRecoveryContext {

    private final TableBucket bucket;
    private final long checkpointOffset;
    private long logEndOffset;

    private final Set<ByteArrayWrapper> processedKeys;
    private long lastProcessedOffset;
    private int totalRecordsProcessed;

    public BucketRecoveryContext(TableBucket bucket, long checkpointOffset) {
        this.bucket = bucket;
        this.checkpointOffset = checkpointOffset;
        this.logEndOffset = -1;
        this.processedKeys = new HashSet<>();
        this.lastProcessedOffset = checkpointOffset;
        this.totalRecordsProcessed = 0;
    }

    public TableBucket getBucket() {
        return bucket;
    }

    public long getCheckpointOffset() {
        return checkpointOffset;
    }

    public long getLogEndOffset() {
        return logEndOffset;
    }

    public void setLogEndOffset(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    public Set<ByteArrayWrapper> getProcessedKeys() {
        return processedKeys;
    }

    /**
     * Checks if this bucket needs recovery.
     *
     * @return true if checkpoint offset is less than log end offset
     */
    public boolean needsRecovery() {
        return checkpointOffset < logEndOffset;
    }

    /**
     * Checks if changelog scanning is complete for this bucket.
     *
     * <p>Complete means either:
     *
     * <ul>
     *   <li>No recovery is needed (checkpointOffset >= logEndOffset), or
     *   <li>We have processed all expected records (totalRecordsProcessed >= logEndOffset -
     *       checkpointOffset)
     * </ul>
     *
     * <p>This implementation assumes that changelog offsets are contiguous (no gaps). The number
     * of records to process equals logEndOffset - checkpointOffset.
     *
     * <p><b>Edge case:</b> If checkpointOffset == logEndOffset - 1 (only one record to process)
     * and that record is skipped (e.g., UPDATE_AFTER), totalRecordsProcessed will be incremented
     * but processedKeys may be empty. This is correct behavior - the record was processed (read
     * and evaluated), it just didn't require an undo operation.
     *
     * @return true if changelog scanning is complete
     */
    public boolean isComplete() {
        // If no recovery is needed, we're already complete
        if (!needsRecovery()) {
            return true;
        }
        // Number of records to process = logEndOffset - checkpointOffset
        // (offsets are contiguous, no gaps)
        long expectedRecords = logEndOffset - checkpointOffset;
        return totalRecordsProcessed >= expectedRecords;
    }

    /**
     * Records that a changelog record has been processed.
     *
     * @param offset the offset of the processed record
     */
    public void recordProcessed(long offset) {
        lastProcessedOffset = offset;
        totalRecordsProcessed++;
    }

    public int getTotalRecordsProcessed() {
        return totalRecordsProcessed;
    }

    public long getLastProcessedOffset() {
        return lastProcessedOffset;
    }

    @Override
    public String toString() {
        return "BucketRecoveryContext{"
                + "bucket="
                + bucket
                + ", checkpointOffset="
                + checkpointOffset
                + ", logEndOffset="
                + logEndOffset
                + ", processedKeys="
                + processedKeys.size()
                + ", complete="
                + isComplete()
                + '}';
    }
}
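To show how this state object is meant to be driven, here is a minimal sketch of a per-bucket scan loop. The method calls on BucketRecoveryContext match the file above, but the surrounding coordinator (UndoRecoveryCoordinator) is not part of this excerpt, so the loop body and the way offsets are obtained are placeholders.

import org.apache.fluss.flink.sink.writer.undo.BucketRecoveryContext;
import org.apache.fluss.metadata.TableBucket;

class RecoveryLoopSketch {

    void recover(TableBucket bucket, long checkpointOffset, long logEndOffset) {
        BucketRecoveryContext ctx = new BucketRecoveryContext(bucket, checkpointOffset);
        ctx.setLogEndOffset(logEndOffset);

        if (!ctx.needsRecovery()) {
            return; // checkpointOffset >= logEndOffset: nothing uncommitted to undo
        }

        // Offsets are contiguous, so exactly (logEndOffset - checkpointOffset) records
        // are read; every record counts as processed even when it needs no undo.
        long offset = checkpointOffset;
        while (!ctx.isComplete()) {
            // Compute and collect the undo operation for the record at 'offset' here
            // (placeholder for the UndoComputer/UndoRecoveryExecutor interaction).
            ctx.recordProcessed(offset);
            offset++;
        }
    }
}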
