Skip to content

Commit 42d0a6e

Browse files
Fix locking race between duplicate PartitionRecords w matching uid (#27055)
1 parent 1c12f92 commit 42d0a6e

File tree

2 files changed

+61
-2
lines changed

2 files changed

+61
-2
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,15 @@ public boolean lockAndRecordPartition(PartitionRecord partitionRecord) {
702702
tableId, convertPartitionToStreamPartitionRowKey(partitionRecord.getPartition()))
703703
.condition(matchAnyString)
704704
.otherwise(mutation);
705-
return !dataClient.checkAndMutateRow(rowMutation);
705+
706+
boolean lockAcquired = !dataClient.checkAndMutateRow(rowMutation);
707+
if (lockAcquired) {
708+
return true;
709+
} else {
710+
// If the lock is already held we need to check if it was acquired by a duplicate
711+
// work item with the same uuid since we last checked doHoldLock above.
712+
return doHoldLock(partitionRecord.getPartition(), partitionRecord.getUuid());
713+
}
706714
}
707715

708716
/**

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.junit.Assert.assertNull;
2626
import static org.junit.Assert.assertTrue;
2727
import static org.junit.Assert.fail;
28+
import static org.mockito.Mockito.when;
2829

2930
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
3031
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
@@ -57,6 +58,8 @@
5758
import org.junit.Test;
5859
import org.junit.runner.RunWith;
5960
import org.junit.runners.JUnit4;
61+
import org.mockito.Mockito;
62+
import org.mockito.stubbing.Answer;
6063
import org.slf4j.Logger;
6164
import org.slf4j.LoggerFactory;
6265

@@ -141,7 +144,7 @@ public void testNewPartitionConversionWithWithIllegalUtf8()
141144
}
142145

143146
@Test
144-
public void testLockPartitionRace() throws InterruptedException {
147+
public void testLockPartitionRaceUniqueIds() throws InterruptedException {
145148
ByteStringRange partition = ByteStringRange.create("", "");
146149
ByteString rowKey = metadataTableDao.convertPartitionToStreamPartitionRowKey(partition);
147150
// Class to try to lock the partition in a separate thread.
@@ -223,6 +226,54 @@ public void run() {
223226
dataClient.mutateRow(rowMutation);
224227
}
225228

229+
@Test
230+
public void testLockPartitionRaceDuplicateIds() throws InterruptedException {
231+
ByteStringRange partition = ByteStringRange.create("", "");
232+
String uid = "a";
233+
MetadataTableDao spy = Mockito.spy(metadataTableDao);
234+
// First call we sleep for ten seconds to ensure the duplicate acquires the
235+
// lock before we return.
236+
when(spy.doHoldLock(partition, uid))
237+
.then(
238+
(Answer<Boolean>)
239+
invocation -> {
240+
Thread.sleep(10000);
241+
return false;
242+
})
243+
.thenCallRealMethod()
244+
.thenCallRealMethod();
245+
246+
class LockPartition implements Runnable {
247+
final PartitionRecord partitionRecord =
248+
new PartitionRecord(
249+
partition,
250+
Collections.emptyList(),
251+
uid,
252+
Instant.now(),
253+
Collections.emptyList(),
254+
Instant.now().plus(Duration.standardMinutes(10)));
255+
boolean locked = false;
256+
257+
@Override
258+
public void run() {
259+
locked = spy.lockAndRecordPartition(partitionRecord);
260+
}
261+
}
262+
263+
LockPartition dup1 = new LockPartition();
264+
Thread dup1Thread = new Thread(dup1);
265+
LockPartition dup2 = new LockPartition();
266+
Thread dup2Thread = new Thread(dup2);
267+
268+
dup1Thread.start();
269+
dup2Thread.start();
270+
dup1Thread.join();
271+
dup2Thread.join();
272+
273+
assertTrue(dup2.locked);
274+
assertTrue(dup1.locked);
275+
}
276+
226277
@Test
227278
public void testReadStreamPartitionsWithWatermark() throws InvalidProtocolBufferException {
228279
ByteStringRange lockedPartition = ByteStringRange.create("", "a");

0 commit comments

Comments
 (0)