Commit b13cb72

fix: make location cache expire after 10 minutes
1 parent 0749f3f commit b13cb72

File tree

3 files changed: +89 -2 lines changed


google-cloud-bigquerystorage/pom.xml

Lines changed: 9 additions & 0 deletions
@@ -28,6 +28,11 @@
         <artifactId>arrow-memory-core</artifactId>
         <version>17.0.0</version>
       </dependency>
+      <dependency>
+        <groupId>com.github.ben-manes.caffeine</groupId>
+        <artifactId>caffeine</artifactId>
+        <version>3.2.2</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <build>
@@ -175,6 +180,10 @@
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-vector</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>

     <!-- Test dependencies -->
     <dependency>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 17 additions & 2 deletions
@@ -15,6 +15,8 @@
  */
 package com.google.cloud.bigquery.storage.v1;

+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.api.core.ApiFuture;
 import com.google.api.gax.batching.FlowController;
 import com.google.api.gax.core.CredentialsProvider;
@@ -75,8 +77,15 @@ public class StreamWriter implements AutoCloseable {
   private static Pattern streamPatternDefaultStream = Pattern.compile(defaultStreamMatching);

   // Cache of location info for a given dataset.
-  private static Map<String, String> projectAndDatasetToLocation = new ConcurrentHashMap<>();
+  private static long LOCATION_CACHE_EXPIRE_MILLIS = 10 * 60 * 1000; // 10 minutes

+  private static Cache<String, String> allocateProjectLocationCache() {
+    return Caffeine.newBuilder()
+        .expireAfterWrite(LOCATION_CACHE_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
+        .build();
+  }
+
+  private static Cache<String, String> projectAndDatasetToLocation = allocateProjectLocationCache();
   /*
    * The identifier of stream to write to.
    */
@@ -288,7 +297,7 @@ private StreamWriter(Builder builder) throws IOException {
       // Location is not passed in, try to fetch from RPC
       String datasetAndProjectName = extractDatasetAndProjectName(builder.streamName);
       location =
-          projectAndDatasetToLocation.computeIfAbsent(
+          projectAndDatasetToLocation.get(
              datasetAndProjectName,
              (key) -> {
                GetWriteStreamRequest writeStreamRequest =
@@ -371,6 +380,12 @@ static boolean isDefaultStream(String streamName) {
     return streamMatcher.find();
   }

+  @VisibleForTesting
+  static void recreateProjectLocationCache(long durationExpireMillis) {
+    LOCATION_CACHE_EXPIRE_MILLIS = durationExpireMillis;
+    projectAndDatasetToLocation = allocateProjectLocationCache();
+  }
+
   String getFullTraceId() {
     return fullTraceId;
   }
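
Note on the cache swap above: Caffeine's Cache.get(key, mappingFunction) plays the same role as ConcurrentHashMap.computeIfAbsent, but entries written more than LOCATION_CACHE_EXPIRE_MILLIS ago are evicted, so the GetWriteStream location lookup is repeated once the window elapses. The cache is still populated lazily, only when no location was passed in by the builder. A minimal standalone sketch of that behavior (illustrative only, not code from this commit; the key and location values are placeholders):

// Sketch: expire-after-write Caffeine cache, mirroring the lookup pattern in StreamWriter.
// Illustrative only; the cache key and location strings below are placeholders.
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;

public class LocationCacheSketch {
  public static void main(String[] args) throws InterruptedException {
    Cache<String, String> cache =
        Caffeine.newBuilder()
            .expireAfterWrite(100, TimeUnit.MILLISECONDS)
            .build();

    // First lookup runs the mapping function (analogous to the GetWriteStream RPC).
    System.out.println(cache.get("projects/p/datasets/d", key -> "us-central1"));
    // Second lookup inside the window is served from the cache; the mapping function does not run.
    System.out.println(cache.get("projects/p/datasets/d", key -> "loader-should-not-run"));

    // After the expire-after-write window, the next lookup runs the mapping function again.
    Thread.sleep(200);
    System.out.println(cache.get("projects/p/datasets/d", key -> "reloaded"));
  }
}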

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 63 additions & 0 deletions
@@ -56,6 +56,7 @@
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Descriptors.DescriptorValidationException;
 import com.google.protobuf.Int64Value;
+import com.google.protobuf.Timestamp;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 import io.opentelemetry.api.common.Attributes;
@@ -2551,4 +2552,66 @@ public void testGetDefaultStreamName() {
     assertEquals(
         "projects/projectId/datasets/datasetId/tables/tableId/_default", actualDefaultName);
   }
+
+  @Test
+  public void testLocationCacheIsHit() throws Exception {
+    WriteStream expectedResponse =
+        WriteStream.newBuilder()
+            .setName(WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]").toString())
+            .setCreateTime(Timestamp.newBuilder().build())
+            .setCommitTime(Timestamp.newBuilder().build())
+            .setTableSchema(TableSchema.newBuilder().build())
+            .setLocation("oklahoma")
+            .build();
+    testBigQueryWrite.addResponse(expectedResponse);
+
+    // first stream will result in call to getWriteStream for location lookup
+    StreamWriter writer1 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client)
+            .setWriterSchema(createProtoSchema())
+            .setEnableConnectionPool(true)
+            .build();
+
+    // second stream will hit the location cache
+    StreamWriter writer2 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client)
+            .setWriterSchema(createProtoSchema())
+            .setEnableConnectionPool(true)
+            .build();
+    assertEquals(1, testBigQueryWrite.getWriteStreamRequests().size());
+  }
+
+  @Test
+  public void testLocationCacheExpires() throws Exception {
+    // force cache to expire in 1000 millis
+    StreamWriter.recreateProjectLocationCache(1000);
+    WriteStream expectedResponse =
+        WriteStream.newBuilder()
+            .setName(WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]").toString())
+            .setCreateTime(Timestamp.newBuilder().build())
+            .setCommitTime(Timestamp.newBuilder().build())
+            .setTableSchema(TableSchema.newBuilder().build())
+            .setLocation("oklahoma")
+            .build();
+    testBigQueryWrite.addResponse(expectedResponse);
+    testBigQueryWrite.addResponse(expectedResponse);
+
+    // first stream will result in call to getWriteStream for location lookup
+    StreamWriter writer1 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client)
+            .setWriterSchema(createProtoSchema())
+            .setEnableConnectionPool(true)
+            .build();
+
+    // force cache to expire
+    TimeUnit.SECONDS.sleep(2);
+
+    // second stream will result in call to getWriteStream for location lookup
+    StreamWriter writer2 =
+        StreamWriter.newBuilder(TEST_STREAM_1, client)
+            .setWriterSchema(createProtoSchema())
+            .setEnableConnectionPool(true)
+            .build();
+    assertEquals(2, testBigQueryWrite.getWriteStreamRequests().size());
+  }
 }
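
The new tests rely on real time: recreateProjectLocationCache(1000) shortens the window and a 2-second sleep lets the entry expire. If the sleep ever proves flaky, Caffeine also accepts a custom Ticker, which makes expiration deterministic. A hypothetical sketch of that alternative (not part of this commit; class and variable names are made up):

// Hypothetical sketch: driving expire-after-write with a fake Ticker instead of sleeping.
// Not part of this commit; names and values are placeholders.
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Ticker;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class FakeTickerCacheSketch {
  public static void main(String[] args) {
    AtomicLong nanos = new AtomicLong(); // manually advanced clock
    Ticker fakeTicker = nanos::get;      // Ticker.read() returns the current fake time

    Cache<String, String> cache =
        Caffeine.newBuilder()
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .ticker(fakeTicker)
            .build();

    cache.put("projects/p/datasets/d", "us-central1");
    System.out.println(cache.getIfPresent("projects/p/datasets/d")); // us-central1

    // Advance the fake clock past the expiry window; the entry now reads as absent.
    nanos.addAndGet(TimeUnit.MINUTES.toNanos(11));
    System.out.println(cache.getIfPresent("projects/p/datasets/d")); // null
  }
}

Caffeine evaluates expiration lazily on access, so the entry is gone as soon as the fake clock is advanced past the expire-after-write window and the cache is read again.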
