Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.protobuf.ByteString;
import io.grpc.Status;
Expand All @@ -48,7 +50,9 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
Expand All @@ -75,8 +79,15 @@ public class StreamWriter implements AutoCloseable {
private static Pattern streamPatternDefaultStream = Pattern.compile(defaultStreamMatching);

// Cache of location info for a given dataset.
private static Map<String, String> projectAndDatasetToLocation = new ConcurrentHashMap<>();
private static long LOCATION_CACHE_EXPIRE_MILLIS = 10 * 60 * 1000; // 10 minutes

private static Cache<String, String> allocateProjectLocationCache() {
return CacheBuilder.newBuilder()
.expireAfterWrite(LOCATION_CACHE_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
.build();
}

private static Cache<String, String> projectAndDatasetToLocation = allocateProjectLocationCache();
/*
* The identifier of stream to write to.
*/
Expand Down Expand Up @@ -287,26 +298,33 @@ private StreamWriter(Builder builder) throws IOException {
if (location == null || location.isEmpty()) {
// Location is not passed in, try to fetch from RPC
String datasetAndProjectName = extractDatasetAndProjectName(builder.streamName);
location =
projectAndDatasetToLocation.computeIfAbsent(
datasetAndProjectName,
(key) -> {
GetWriteStreamRequest writeStreamRequest =
GetWriteStreamRequest.newBuilder()
.setName(this.getStreamName())
.setView(WriteStreamView.BASIC)
.build();

WriteStream writeStream = client.getWriteStream(writeStreamRequest);
TableSchema writeStreamTableSchema = writeStream.getTableSchema();
String fetchedLocation = writeStream.getLocation();
log.info(
String.format(
"Fetched location %s for stream name %s, extracted project and dataset"
+ " name: %s\"",
fetchedLocation, streamName, datasetAndProjectName));
return fetchedLocation;
});
try {
location =
projectAndDatasetToLocation.get(
datasetAndProjectName,
new Callable<String>() {
@Override
public String call() throws Exception {
GetWriteStreamRequest writeStreamRequest =
GetWriteStreamRequest.newBuilder()
.setName(getStreamName())
.setView(WriteStreamView.BASIC)
.build();

WriteStream writeStream = client.getWriteStream(writeStreamRequest);
TableSchema writeStreamTableSchema = writeStream.getTableSchema();
String fetchedLocation = writeStream.getLocation();
log.info(
String.format(
"Fetched location %s for stream name %s, extracted project and dataset"
+ " name: %s\"",
fetchedLocation, streamName, datasetAndProjectName));
return fetchedLocation;
}
});
} catch (ExecutionException e) {
throw new IllegalStateException(e.getCause());
}
if (location.isEmpty()) {
throw new IllegalStateException(
String.format(
Expand Down Expand Up @@ -371,6 +389,12 @@ static boolean isDefaultStream(String streamName) {
return streamMatcher.find();
}

  @VisibleForTesting
  static void recreateProjectLocationCache(long durationExpireMillis) {
    // Test-only hook: swaps in a fresh, empty location cache whose entries
    // expire after durationExpireMillis. The expiry field must be set before
    // allocateProjectLocationCache() reads it. Not thread-safe; callers are
    // expected to restore the production expiry when done.
    LOCATION_CACHE_EXPIRE_MILLIS = durationExpireMillis;
    projectAndDatasetToLocation = allocateProjectLocationCache();
  }

  /** Returns the {@code fullTraceId} associated with this writer. */
  String getFullTraceId() {
    return fullTraceId;
  }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Timestamp;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.opentelemetry.api.common.Attributes;
Expand Down Expand Up @@ -2551,4 +2552,66 @@ public void testGetDefaultStreamName() {
assertEquals(
"projects/projectId/datasets/datasetId/tables/tableId/_default", actualDefaultName);
}

@Test
public void testLocationCacheIsHit() throws Exception {
WriteStream expectedResponse =
WriteStream.newBuilder()
.setName(WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]").toString())
.setCreateTime(Timestamp.newBuilder().build())
.setCommitTime(Timestamp.newBuilder().build())
.setTableSchema(TableSchema.newBuilder().build())
.setLocation("oklahoma")
.build();
testBigQueryWrite.addResponse(expectedResponse);

// first stream will result in call to getWriteStream for location lookup
StreamWriter writer1 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setEnableConnectionPool(true)
.build();

// second stream will hit the location cache
StreamWriter writer2 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setEnableConnectionPool(true)
.build();
assertEquals(1, testBigQueryWrite.getWriteStreamRequests().size());
}

@Test
public void testLocationCacheExpires() throws Exception {
// force cache to expire in 1000 millis
StreamWriter.recreateProjectLocationCache(1000);
WriteStream expectedResponse =
WriteStream.newBuilder()
.setName(WriteStreamName.of("[PROJECT]", "[DATASET]", "[TABLE]", "[STREAM]").toString())
.setCreateTime(Timestamp.newBuilder().build())
.setCommitTime(Timestamp.newBuilder().build())
.setTableSchema(TableSchema.newBuilder().build())
.setLocation("oklahoma")
.build();
testBigQueryWrite.addResponse(expectedResponse);
testBigQueryWrite.addResponse(expectedResponse);

// first stream will result in call to getWriteStream for location lookup
StreamWriter writer1 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setEnableConnectionPool(true)
.build();

// force cache to expire
TimeUnit.SECONDS.sleep(2);

// second stream will result in call to getWriteStream for location lookup
StreamWriter writer2 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setEnableConnectionPool(true)
.build();
assertEquals(2, testBigQueryWrite.getWriteStreamRequests().size());
}
}
Loading