Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions google-cloud-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@
<testing.version>0.43.1-alpha-SNAPSHOT</testing.version><!-- {x-version-update:google-cloud-testing:current} -->

<api-common.version>1.5.0</api-common.version>
<gax.version>1.23.0</gax.version>
<gax-grpc.version>1.23.0</gax-grpc.version>
<gax-httpjson.version>0.40.0</gax-httpjson.version>
<gax.version>1.24.0</gax.version>
<gax-grpc.version>1.24.0</gax-grpc.version>
<gax-httpjson.version>0.41.0</gax-httpjson.version>
<generated-proto-beta.version>0.8.0</generated-proto-beta.version>
<generated-proto-ga.version>1.7.0</generated-proto-ga.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ Session createSession(final DatabaseId db) throws SpannerException {
new Callable<com.google.spanner.v1.Session>() {
@Override
public com.google.spanner.v1.Session call() throws Exception {
return rawGrpcRpc.createSession(
return gapicRpc.createSession(
db.getName(), getOptions().getSessionLabels(), options);
}
});
Expand Down Expand Up @@ -806,7 +806,7 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
new Callable<CommitResponse>() {
@Override
public CommitResponse call() throws Exception {
return rawGrpcRpc.commit(request, options);
return gapicRpc.commit(request, options);

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

}
});
Timestamp t = Timestamp.fromProto(response.getCommitTimestamp());
Expand Down Expand Up @@ -872,7 +872,7 @@ public void close() {
new Callable<Void>() {
@Override
public Void call() throws Exception {
rawGrpcRpc.deleteSession(name, options);
gapicRpc.deleteSession(name, options);
return null;
}
});
Expand All @@ -898,7 +898,7 @@ ByteString beginTransaction() {
new Callable<Transaction>() {
@Override
public Transaction call() throws Exception {
return rawGrpcRpc.beginTransaction(request, options);
return gapicRpc.beginTransaction(request, options);
}
});
if (txn.getId().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
Expand Down Expand Up @@ -68,6 +69,8 @@
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.DeleteSessionRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartitionQueryRequest;
import com.google.spanner.v1.PartitionReadRequest;
Expand Down Expand Up @@ -171,9 +174,10 @@ public Paginated<InstanceConfig> listInstanceConfigs(int pageSize, @Nullable Str
}
ListInstanceConfigsRequest request = requestBuilder.build();

// TODO: put projectName in metadata
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName));
ListInstanceConfigsResponse response =
get(instanceStub.listInstanceConfigsCallable().futureCall(request));
get(instanceStub.listInstanceConfigsCallable().futureCall(request, context));
return new Paginated<>(response.getInstanceConfigsList(), response.getNextPageToken());
}

Expand All @@ -182,8 +186,9 @@ public InstanceConfig getInstanceConfig(String instanceConfigName) throws Spanne
GetInstanceConfigRequest request =
GetInstanceConfigRequest.newBuilder().setName(instanceConfigName).build();

// TODO: put projectName in metadata
return get(instanceStub.getInstanceConfigCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName));
return get(instanceStub.getInstanceConfigCallable().futureCall(request, context));
}

@Override
Expand All @@ -199,9 +204,10 @@ public Paginated<Instance> listInstances(
}
ListInstancesRequest request = requestBuilder.build();

// TODO: put projectName in metadata
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName));
ListInstancesResponse response =
get(instanceStub.listInstancesCallable().futureCall(request));
get(instanceStub.listInstancesCallable().futureCall(request, context));
return new Paginated<>(response.getInstancesList(), response.getNextPageToken());
}

Expand All @@ -214,34 +220,40 @@ public Operation createInstance(String parent, String instanceId, Instance insta
.setInstanceId(instanceId)
.setInstance(instance)
.build();
// TODO: put parent in metadata
return get(instanceStub.createInstanceCallable().futureCall(request));

GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(parent, projectName));
return get(instanceStub.createInstanceCallable().futureCall(request, context));
}

@Override
public Operation updateInstance(Instance instance, FieldMask fieldMask) throws SpannerException {
UpdateInstanceRequest request =
UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build();
// TODO: put instance.getName() in metadata
return get(instanceStub.updateInstanceCallable().futureCall(request));

GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(instance.getName(), projectName));
return get(instanceStub.updateInstanceCallable().futureCall(request, context));
}

@Override
public Instance getInstance(String instanceName) throws SpannerException {
GetInstanceRequest request =
GetInstanceRequest.newBuilder().setName(instanceName).build();

// TODO: put instanceName in metadata
return get(instanceStub.getInstanceCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName));
return get(instanceStub.getInstanceCallable().futureCall(request, context));
}

@Override
public void deleteInstance(String instanceName) throws SpannerException {
DeleteInstanceRequest request =
DeleteInstanceRequest.newBuilder().setName(instanceName).build();

// TODO: put instanceName in metadata
get(instanceStub.deleteInstanceCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName));
get(instanceStub.deleteInstanceCallable().futureCall(request, context));
}

@Override
Expand All @@ -254,8 +266,10 @@ public Paginated<Database> listDatabases(
}
ListDatabasesRequest request = requestBuilder.build();

// TODO: put instanceName in metadata
ListDatabasesResponse response = get(databaseStub.listDatabasesCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName));
ListDatabasesResponse response = get(databaseStub.listDatabasesCallable()
.futureCall(request, context));
return new Paginated<>(response.getDatabasesList(), response.getNextPageToken());
}

Expand All @@ -268,8 +282,9 @@ public Operation createDatabase(String instanceName, String createDatabaseStatem
.setCreateStatement(createDatabaseStatement)
.addAllExtraStatements(additionalStatements)
.build();
// TODO: put instanceName in metadata
return get(databaseStub.createDatabaseCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName));
return get(databaseStub.createDatabaseCallable().futureCall(request, context));
}

@Override
Expand All @@ -281,17 +296,19 @@ public Operation updateDatabaseDdl(String databaseName, Iterable<String> updateD
.addAllStatements(updateDatabaseStatements)
.setOperationId(MoreObjects.firstNonNull(updateId, ""))
.build();
// TODO: put databaseName in metadata
return get(databaseStub.updateDatabaseDdlCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
return get(databaseStub.updateDatabaseDdlCallable().futureCall(request, context));
}

@Override
public void dropDatabase(String databaseName) throws SpannerException {
DropDatabaseRequest request =
DropDatabaseRequest.newBuilder().setDatabase(databaseName).build();

// TODO: put databaseName in metadata
get(databaseStub.dropDatabaseCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
get(databaseStub.dropDatabaseCallable().futureCall(request, context));
}

@Override
Expand All @@ -301,43 +318,65 @@ public Database getDatabase(String databaseName) throws SpannerException {
.setName(databaseName)
.build();

// TODO: put databaseName in metadata
return get(databaseStub.getDatabaseCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
return get(databaseStub.getDatabaseCallable().futureCall(request, context));
}

@Override
public List<String> getDatabaseDdl(String databaseName) throws SpannerException {
GetDatabaseDdlRequest request =
GetDatabaseDdlRequest.newBuilder().setDatabase(databaseName).build();

// TODO: put databaseName in metadata
return get(databaseStub.getDatabaseDdlCallable().futureCall(request))
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
return get(databaseStub.getDatabaseDdlCallable().futureCall(request, context))
.getStatementsList();
}

@Override
public Operation getOperation(String name) throws SpannerException {
GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build();
// TODO: put name in metadata
return get(databaseStub.getOperationsStub().getOperationCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withExtraHeaders(metadataProvider.newExtraHeaders(name, projectName));
return get(databaseStub.getOperationsStub().getOperationCallable()
.futureCall(request, context));
}

@Override
public Session createSession(String databaseName, @Nullable Map<String, String> labels,
@Nullable Map<Option, ?> options) throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
CreateSessionRequest.Builder requestBuilder =
CreateSessionRequest.newBuilder().setDatabase(databaseName);
if (labels != null && !labels.isEmpty()) {
Session.Builder session = Session.newBuilder().putAllLabels(labels);
requestBuilder.setSession(session);
}
CreateSessionRequest request = requestBuilder.build();
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
return get(stub.createSessionCallable().futureCall(request, context));
}

@Override
public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
DeleteSessionRequest request =
DeleteSessionRequest.newBuilder().setName(sessionName).build();
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(sessionName, projectName));
get(stub.deleteSessionCallable().futureCall(request, context));
}

@Override
public StreamingCall read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
throw new UnsupportedOperationException("Not implemented yet.");
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
throw new UnsupportedOperationException("not implemented yet");
}

@Override
Expand All @@ -349,33 +388,47 @@ public StreamingCall executeQuery(
@Override
public Transaction beginTransaction(
BeginTransactionRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));

This comment was marked as spam.

return get(stub.beginTransactionCallable().futureCall(request, context));
}

@Override
public CommitResponse commit(CommitRequest commitRequest, @Nullable Map<Option, ?> options)
throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(
metadataProvider.newExtraHeaders(commitRequest.getSession(), projectName));
return get(stub.commitCallable().futureCall(commitRequest, context));
}

@Override
public void rollback(RollbackRequest request, @Nullable Map<Option, ?> options)
throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
get(stub.rollbackCallable().futureCall(request, context));
}

@Override
public PartitionResponse partitionQuery(
PartitionQueryRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
throw new UnsupportedOperationException("Not implemented yet.");
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
return get(stub.partitionQueryCallable().futureCall(request, context));
}

@Override
public PartitionResponse partitionRead(
PartitionReadRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
// TODO(pongad): Figure out metadata
// TODO(pongad): Figure out channel affinity
return get(stub.partitionReadCallable().futureCall(request));
GrpcCallContext context = GrpcCallContext.createDefault()
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
return get(stub.partitionReadCallable().futureCall(request, context));
}

/** Gets the result of an async RPC call, handling any exceptions encountered. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.google.common.collect.ImmutableMap;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -27,15 +29,15 @@
*/
class SpannerMetadataProvider {
private final Map<Metadata.Key<String>, String> headers;
private final Key<String> resourceHeaderKey;
private final String resourceHeaderKey;

private static final Pattern[] RESOURCE_TOKEN_PATTERNS = {
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"),
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*)(.*)?")
};

private SpannerMetadataProvider(Map<String, String> headers, String resourceHeaderKey) {
this.resourceHeaderKey = Key.of(resourceHeaderKey, Metadata.ASCII_STRING_MARSHALLER);
this.resourceHeaderKey = resourceHeaderKey;
this.headers = constructHeadersAsMetadata(headers);
}

Expand All @@ -50,11 +52,21 @@ Metadata newMetadata(String resourceTokenTemplate, String defaultResourceToken)
}

metadata.put(
resourceHeaderKey, getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken));
Key.of(resourceHeaderKey, Metadata.ASCII_STRING_MARSHALLER),
getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken));

return metadata;
}

Map<String, List<String>> newExtraHeaders(String resourceTokenTemplate, String defaultResourceToken) {
return ImmutableMap.<String, List<String>>builder()
.put(
resourceHeaderKey,
Arrays.asList(getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken)))
.build();

}

private Map<Metadata.Key<String>, String> constructHeadersAsMetadata(
Map<String, String> headers) {
ImmutableMap.Builder<Metadata.Key<String>, String> headersAsMetadataBuilder =
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
<site.installationModule>google-cloud</site.installationModule>
<bom.version>0.43.1-alpha-SNAPSHOT</bom.version><!-- {x-version-update:google-cloud-pom:current} -->
<api-client.version>1.23.0</api-client.version>
<gax.version>1.23.0</gax.version>
<gax.version>1.24.0</gax.version>
<google.auth.version>0.9.0</google.auth.version>
<grpc.version>1.10.1</grpc.version>
<nettyssl.version>2.0.7.Final</nettyssl.version>
Expand Down