Skip to content

Commit 5feb9ea

Browse files
authored
HDDS-11453. OmSnapshotPurge should be in a different ozone manager double buffer batch (apache#7188)
1 parent 703c4d5 commit 5feb9ea

2 files changed

Lines changed: 72 additions & 23 deletions

File tree

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
4747
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
4848
import org.apache.hadoop.ozone.om.response.OMClientResponse;
49+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
4950
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
5051
import org.apache.hadoop.util.Daemon;
5152
import org.apache.hadoop.util.Time;
@@ -426,28 +427,30 @@ private String addToBatch(Queue<Entry> buffer, BatchOperation batchOperation) {
426427
* in RocksDB callback flush. If multiple operations are flushed in one
427428
* specific batch, we are not sure at the flush of which specific operation
428429
* the callback is coming.
429-
* There could be a possibility of race condition that is exposed to rocksDB
430-
* behaviour for the batch.
430+
* PurgeSnapshot is also considered a barrier, since a purgeSnapshot transaction on a standalone basis is an
431+
* idempotent operation. Once the directory gets deleted, the previous transactions that have been performed on the
432+
* snapshotted rocksdb would start failing on replay since those transactions have not been committed but the
433+
* directory could have been partially or fully deleted. This could also lead to inconsistencies in the DB
434+
* reads from the purged rocksdb if operations are not performed carefully.
435+
* There could be a possibility of race condition that is exposed to rocksDB behaviour for the batch.
431436
* Hence, we treat each createSnapshot and purgeSnapshot request as a separate batch flush.
432437
* <p>
433438
* e.g. requestBuffer = [request1, request2, snapshotRequest1,
434439
* request3, snapshotRequest2, request4]
435440
* response = [[request1, request2], [snapshotRequest1], [request3],
436441
* [snapshotRequest2], [request4]]
437442
*/
438-
private List<Queue<Entry>> splitReadyBufferAtCreateSnapshot() {
443+
private synchronized List<Queue<Entry>> splitReadyBufferAtCreateSnapshot() {
439444
final List<Queue<Entry>> response = new ArrayList<>();
440-
441445
OMResponse previousOmResponse = null;
442446
for (final Entry entry : readyBuffer) {
443447
OMResponse omResponse = entry.getResponse().getOMResponse();
444448
// New queue gets created in three conditions:
445449
// 1. It is the first element in the response,
446-
// 2. Current request is createSnapshot request.
447-
// 3. Previous request was createSnapshot request.
448-
if (response.isEmpty() || omResponse.hasCreateSnapshotResponse()
449-
|| (previousOmResponse != null &&
450-
previousOmResponse.hasCreateSnapshotResponse())) {
450+
// 2. Current request is createSnapshot/purgeSnapshot request.
451+
// 3. Previous request was createSnapshot/purgeSnapshot request.
452+
if (response.isEmpty() || isStandaloneBatchCmdTypes(omResponse)
453+
|| isStandaloneBatchCmdTypes(previousOmResponse)) {
451454
response.add(new LinkedList<>());
452455
}
453456

@@ -458,6 +461,15 @@ private List<Queue<Entry>> splitReadyBufferAtCreateSnapshot() {
458461
return response;
459462
}
460463

464+
/**
 * Tells whether the given response belongs to a command type that must be flushed in its own batch.
 * The batch-splitting loop calls this for both the current and the previous response to decide
 * whether a new queue has to be started.
 *
 * @param response the OM response to inspect; may be {@code null}
 * @return {@code true} if the command type is {@code SnapshotPurge} or {@code CreateSnapshot};
 *         {@code false} for any other type or when {@code response} is {@code null}
 */
private static boolean isStandaloneBatchCmdTypes(OMResponse response) {
465+
// The caller passes null as the "previous" response on its first iteration; null never forces a split.
if (response == null) {
466+
return false;
467+
}
468+
final OzoneManagerProtocolProtos.Type type = response.getCmdType();
469+
return type == OzoneManagerProtocolProtos.Type.SnapshotPurge
470+
|| type == OzoneManagerProtocolProtos.Type.CreateSnapshot;
471+
}
472+
461473
private void addCleanupEntry(Entry entry, Map<String, List<Long>> cleanupEpochs) {
462474
Class<? extends OMClientResponse> responseClass =
463475
entry.getResponse().getClass();
@@ -612,7 +624,7 @@ int getCurrentBufferSize() {
612624
return currentBuffer.size();
613625
}
614626

615-
int getReadyBufferSize() {
627+
synchronized int getReadyBufferSize() {
616628
return readyBuffer.size();
617629
}
618630

hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@
4444
import org.apache.hadoop.ozone.om.response.bucket.OMBucketCreateResponse;
4545
import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
4646
import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotCreateResponse;
47+
import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse;
4748
import org.apache.hadoop.ozone.om.s3.S3SecretCacheProvider;
4849
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
49-
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateSnapshotResponse;
5050
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
5151
import org.apache.hadoop.security.UserGroupInformation;
5252
import org.apache.hadoop.security.authentication.util.KerberosName;
@@ -81,12 +81,12 @@ class TestOzoneManagerDoubleBuffer {
8181
private OzoneManagerDoubleBuffer doubleBuffer;
8282
private OzoneManager ozoneManager;
8383
private S3SecretLockedManager secretManager;
84-
private final CreateSnapshotResponse snapshotResponse1 = mock(CreateSnapshotResponse.class);
85-
private final CreateSnapshotResponse snapshotResponse2 = mock(CreateSnapshotResponse.class);
8684
private final OMResponse omKeyResponse = mock(OMResponse.class);
8785
private final OMResponse omBucketResponse = mock(OMResponse.class);
8886
private final OMResponse omSnapshotResponse1 = mock(OMResponse.class);
8987
private final OMResponse omSnapshotResponse2 = mock(OMResponse.class);
88+
private final OMResponse omSnapshotPurgeResponseProto1 = mock(OMResponse.class);
89+
private final OMResponse omSnapshotPurgeResponseProto2 = mock(OMResponse.class);
9090
private static OMClientResponse omKeyCreateResponse =
9191
mock(OMKeyCreateResponse.class);
9292
private static OMClientResponse omBucketCreateResponse =
@@ -95,6 +95,9 @@ class TestOzoneManagerDoubleBuffer {
9595
mock(OMSnapshotCreateResponse.class);
9696
private static OMClientResponse omSnapshotCreateResponse2 =
9797
mock(OMSnapshotCreateResponse.class);
98+
private static OMClientResponse omSnapshotPurgeResponse1 = mock(OMSnapshotPurgeResponse.class);
99+
private static OMClientResponse omSnapshotPurgeResponse2 = mock(OMSnapshotPurgeResponse.class);
100+
98101
@TempDir
99102
private File tempDir;
100103
private OzoneManagerDoubleBuffer.FlushNotifier flushNotifier;
@@ -143,26 +146,33 @@ public void setup() throws IOException {
143146
doNothing().when(omBucketCreateResponse).checkAndUpdateDB(any(), any());
144147
doNothing().when(omSnapshotCreateResponse1).checkAndUpdateDB(any(), any());
145148
doNothing().when(omSnapshotCreateResponse2).checkAndUpdateDB(any(), any());
149+
doNothing().when(omSnapshotPurgeResponse1).checkAndUpdateDB(any(), any());
150+
doNothing().when(omSnapshotPurgeResponse2).checkAndUpdateDB(any(), any());
146151

147152
when(omKeyResponse.getTraceID()).thenReturn("keyTraceId");
148153
when(omBucketResponse.getTraceID()).thenReturn("bucketTraceId");
149154
when(omSnapshotResponse1.getTraceID()).thenReturn("snapshotTraceId-1");
150155
when(omSnapshotResponse2.getTraceID()).thenReturn("snapshotTraceId-2");
151-
when(omSnapshotResponse1.hasCreateSnapshotResponse())
152-
.thenReturn(true);
153-
when(omSnapshotResponse2.hasCreateSnapshotResponse())
154-
.thenReturn(true);
155-
when(omSnapshotResponse1.getCreateSnapshotResponse())
156-
.thenReturn(snapshotResponse1);
157-
when(omSnapshotResponse2.getCreateSnapshotResponse())
158-
.thenReturn(snapshotResponse2);
156+
when(omSnapshotPurgeResponseProto1.getTraceID()).thenReturn("snapshotPurgeTraceId-1");
157+
when(omSnapshotPurgeResponseProto2.getTraceID()).thenReturn("snapshotPurgeTraceId-2");
158+
159+
when(omKeyResponse.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateKey);
160+
when(omBucketResponse.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateBucket);
161+
when(omSnapshotPurgeResponseProto1.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.SnapshotPurge);
162+
when(omSnapshotPurgeResponseProto2.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.SnapshotPurge);
163+
when(omSnapshotResponse1.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.SnapshotPurge);
164+
when(omSnapshotResponse2.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.SnapshotPurge);
159165

160166
when(omKeyCreateResponse.getOMResponse()).thenReturn(omKeyResponse);
161167
when(omBucketCreateResponse.getOMResponse()).thenReturn(omBucketResponse);
162168
when(omSnapshotCreateResponse1.getOMResponse())
163169
.thenReturn(omSnapshotResponse1);
164170
when(omSnapshotCreateResponse2.getOMResponse())
165171
.thenReturn(omSnapshotResponse2);
172+
when(omSnapshotPurgeResponse1.getOMResponse())
173+
.thenReturn(omSnapshotPurgeResponseProto1);
174+
when(omSnapshotPurgeResponse2.getOMResponse())
175+
.thenReturn(omSnapshotPurgeResponseProto2);
166176
}
167177

168178
@AfterEach
@@ -194,8 +204,35 @@ private static Stream<Arguments> doubleBufferFlushCases() {
194204
omSnapshotCreateResponse1,
195205
omSnapshotCreateResponse2,
196206
omBucketCreateResponse),
197-
4L, 4L, 14L, 16L, 1L, 1.142F)
198-
);
207+
4L, 4L, 14L, 16L, 1L, 1.142F),
208+
Arguments.of(Arrays.asList(omSnapshotPurgeResponse1,
209+
omSnapshotPurgeResponse2),
210+
2L, 2L, 16L, 18L, 1L, 1.125F),
211+
Arguments.of(Arrays.asList(omKeyCreateResponse,
212+
omBucketCreateResponse,
213+
omSnapshotPurgeResponse1,
214+
omSnapshotPurgeResponse2),
215+
3L, 4L, 19L, 22L, 2L, 1.157F),
216+
Arguments.of(Arrays.asList(omKeyCreateResponse,
217+
omSnapshotPurgeResponse1,
218+
omBucketCreateResponse,
219+
omSnapshotPurgeResponse2),
220+
4L, 4L, 23L, 26L, 1L, 1.1300F),
221+
Arguments.of(Arrays.asList(omKeyCreateResponse,
222+
omSnapshotPurgeResponse1,
223+
omSnapshotPurgeResponse2,
224+
omBucketCreateResponse),
225+
4L, 4L, 27L, 30L, 1L, 1.111F),
226+
Arguments.of(Arrays.asList(omKeyCreateResponse,
227+
omBucketCreateResponse,
228+
omSnapshotPurgeResponse1,
229+
omSnapshotCreateResponse1,
230+
omSnapshotPurgeResponse2,
231+
omBucketCreateResponse,
232+
omSnapshotCreateResponse2),
233+
6L, 7L, 33L, 37L, 2L, 1.121F)
234+
235+
);
199236
}
200237

201238
/**

0 commit comments

Comments
 (0)