Skip to content

Commit f539045

Browse files
committed
WIP: inject headers to grpcCallContext before making calls
1 parent 75d1d0b commit f539045

File tree

5 files changed

+114
-49
lines changed

5 files changed

+114
-49
lines changed

google-cloud-bom/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,9 @@
170170
<testing.version>0.43.1-alpha-SNAPSHOT</testing.version><!-- {x-version-update:google-cloud-testing:current} -->
171171

172172
<api-common.version>1.5.0</api-common.version>
173-
<gax.version>1.23.0</gax.version>
174-
<gax-grpc.version>1.23.0</gax-grpc.version>
175-
<gax-httpjson.version>0.40.0</gax-httpjson.version>
173+
<gax.version>1.24.0</gax.version>
174+
<gax-grpc.version>1.24.0</gax-grpc.version>
175+
<gax-httpjson.version>0.41.0</gax-httpjson.version>
176176
<generated-proto-beta.version>0.8.0</generated-proto-beta.version>
177177
<generated-proto-ga.version>1.7.0</generated-proto-ga.version>
178178
</properties>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ Session createSession(final DatabaseId db) throws SpannerException {
266266
new Callable<com.google.spanner.v1.Session>() {
267267
@Override
268268
public com.google.spanner.v1.Session call() throws Exception {
269-
return rawGrpcRpc.createSession(
269+
return gapicRpc.createSession(
270270
db.getName(), getOptions().getSessionLabels(), options);
271271
}
272272
});
@@ -806,7 +806,7 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
806806
new Callable<CommitResponse>() {
807807
@Override
808808
public CommitResponse call() throws Exception {
809-
return rawGrpcRpc.commit(request, options);
809+
return gapicRpc.commit(request, options);
810810
}
811811
});
812812
Timestamp t = Timestamp.fromProto(response.getCommitTimestamp());
@@ -872,7 +872,7 @@ public void close() {
872872
new Callable<Void>() {
873873
@Override
874874
public Void call() throws Exception {
875-
rawGrpcRpc.deleteSession(name, options);
875+
gapicRpc.deleteSession(name, options);
876876
return null;
877877
}
878878
});
@@ -898,7 +898,7 @@ ByteString beginTransaction() {
898898
new Callable<Transaction>() {
899899
@Override
900900
public Transaction call() throws Exception {
901-
return rawGrpcRpc.beginTransaction(request, options);
901+
return gapicRpc.beginTransaction(request, options);
902902
}
903903
});
904904
if (txn.getId().isEmpty()) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 91 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.api.gax.core.CredentialsProvider;
2222
import com.google.api.gax.core.GaxProperties;
2323
import com.google.api.gax.grpc.GaxGrpcProperties;
24+
import com.google.api.gax.grpc.GrpcCallContext;
2425
import com.google.api.gax.grpc.GrpcTransportChannel;
2526
import com.google.api.gax.rpc.ApiClientHeaderProvider;
2627
import com.google.api.gax.rpc.FixedTransportChannelProvider;
@@ -68,6 +69,8 @@
6869
import com.google.spanner.v1.BeginTransactionRequest;
6970
import com.google.spanner.v1.CommitRequest;
7071
import com.google.spanner.v1.CommitResponse;
72+
import com.google.spanner.v1.CreateSessionRequest;
73+
import com.google.spanner.v1.DeleteSessionRequest;
7174
import com.google.spanner.v1.ExecuteSqlRequest;
7275
import com.google.spanner.v1.PartitionQueryRequest;
7376
import com.google.spanner.v1.PartitionReadRequest;
@@ -171,9 +174,10 @@ public Paginated<InstanceConfig> listInstanceConfigs(int pageSize, @Nullable Str
171174
}
172175
ListInstanceConfigsRequest request = requestBuilder.build();
173176

174-
// TODO: put projectName in metadata
177+
GrpcCallContext context = GrpcCallContext.createDefault()
178+
.withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName));
175179
ListInstanceConfigsResponse response =
176-
get(instanceStub.listInstanceConfigsCallable().futureCall(request));
180+
get(instanceStub.listInstanceConfigsCallable().futureCall(request, context));
177181
return new Paginated<>(response.getInstanceConfigsList(), response.getNextPageToken());
178182
}
179183

@@ -182,8 +186,9 @@ public InstanceConfig getInstanceConfig(String instanceConfigName) throws Spanne
182186
GetInstanceConfigRequest request =
183187
GetInstanceConfigRequest.newBuilder().setName(instanceConfigName).build();
184188

185-
// TODO: put projectName in metadata
186-
return get(instanceStub.getInstanceConfigCallable().futureCall(request));
189+
GrpcCallContext context = GrpcCallContext.createDefault()
190+
.withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName));
191+
return get(instanceStub.getInstanceConfigCallable().futureCall(request, context));
187192
}
188193

189194
@Override
@@ -199,9 +204,10 @@ public Paginated<Instance> listInstances(
199204
}
200205
ListInstancesRequest request = requestBuilder.build();
201206

202-
// TODO: put projectName in metadata
207+
GrpcCallContext context = GrpcCallContext.createDefault()
208+
.withExtraHeaders(metadataProvider.newExtraHeaders(projectName, projectName));
203209
ListInstancesResponse response =
204-
get(instanceStub.listInstancesCallable().futureCall(request));
210+
get(instanceStub.listInstancesCallable().futureCall(request, context));
205211
return new Paginated<>(response.getInstancesList(), response.getNextPageToken());
206212
}
207213

@@ -214,34 +220,40 @@ public Operation createInstance(String parent, String instanceId, Instance insta
214220
.setInstanceId(instanceId)
215221
.setInstance(instance)
216222
.build();
217-
// TODO: put parent in metadata
218-
return get(instanceStub.createInstanceCallable().futureCall(request));
223+
224+
GrpcCallContext context = GrpcCallContext.createDefault()
225+
.withExtraHeaders(metadataProvider.newExtraHeaders(parent, projectName));
226+
return get(instanceStub.createInstanceCallable().futureCall(request, context));
219227
}
220228

221229
@Override
222230
public Operation updateInstance(Instance instance, FieldMask fieldMask) throws SpannerException {
223231
UpdateInstanceRequest request =
224232
UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build();
225-
// TODO: put instance.getName() in metadata
226-
return get(instanceStub.updateInstanceCallable().futureCall(request));
233+
234+
GrpcCallContext context = GrpcCallContext.createDefault()
235+
.withExtraHeaders(metadataProvider.newExtraHeaders(instance.getName(), projectName));
236+
return get(instanceStub.updateInstanceCallable().futureCall(request, context));
227237
}
228238

229239
@Override
230240
public Instance getInstance(String instanceName) throws SpannerException {
231241
GetInstanceRequest request =
232242
GetInstanceRequest.newBuilder().setName(instanceName).build();
233243

234-
// TODO: put instanceName in metadata
235-
return get(instanceStub.getInstanceCallable().futureCall(request));
244+
GrpcCallContext context = GrpcCallContext.createDefault()
245+
.withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName));
246+
return get(instanceStub.getInstanceCallable().futureCall(request, context));
236247
}
237248

238249
@Override
239250
public void deleteInstance(String instanceName) throws SpannerException {
240251
DeleteInstanceRequest request =
241252
DeleteInstanceRequest.newBuilder().setName(instanceName).build();
242253

243-
// TODO: put instanceName in metadata
244-
get(instanceStub.deleteInstanceCallable().futureCall(request));
254+
GrpcCallContext context = GrpcCallContext.createDefault()
255+
.withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName));
256+
get(instanceStub.deleteInstanceCallable().futureCall(request, context));
245257
}
246258

247259
@Override
@@ -254,8 +266,10 @@ public Paginated<Database> listDatabases(
254266
}
255267
ListDatabasesRequest request = requestBuilder.build();
256268

257-
// TODO: put instanceName in metadata
258-
ListDatabasesResponse response = get(databaseStub.listDatabasesCallable().futureCall(request));
269+
GrpcCallContext context = GrpcCallContext.createDefault()
270+
.withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName));
271+
ListDatabasesResponse response = get(databaseStub.listDatabasesCallable()
272+
.futureCall(request, context));
259273
return new Paginated<>(response.getDatabasesList(), response.getNextPageToken());
260274
}
261275

@@ -268,8 +282,9 @@ public Operation createDatabase(String instanceName, String createDatabaseStatem
268282
.setCreateStatement(createDatabaseStatement)
269283
.addAllExtraStatements(additionalStatements)
270284
.build();
271-
// TODO: put instanceName in metadata
272-
return get(databaseStub.createDatabaseCallable().futureCall(request));
285+
GrpcCallContext context = GrpcCallContext.createDefault()
286+
.withExtraHeaders(metadataProvider.newExtraHeaders(instanceName, projectName));
287+
return get(databaseStub.createDatabaseCallable().futureCall(request, context));
273288
}
274289

275290
@Override
@@ -281,17 +296,19 @@ public Operation updateDatabaseDdl(String databaseName, Iterable<String> updateD
281296
.addAllStatements(updateDatabaseStatements)
282297
.setOperationId(MoreObjects.firstNonNull(updateId, ""))
283298
.build();
284-
// TODO: put databaseName in metadata
285-
return get(databaseStub.updateDatabaseDdlCallable().futureCall(request));
299+
GrpcCallContext context = GrpcCallContext.createDefault()
300+
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
301+
return get(databaseStub.updateDatabaseDdlCallable().futureCall(request, context));
286302
}
287303

288304
@Override
289305
public void dropDatabase(String databaseName) throws SpannerException {
290306
DropDatabaseRequest request =
291307
DropDatabaseRequest.newBuilder().setDatabase(databaseName).build();
292308

293-
// TODO: put databaseName in metadata
294-
get(databaseStub.dropDatabaseCallable().futureCall(request));
309+
GrpcCallContext context = GrpcCallContext.createDefault()
310+
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
311+
get(databaseStub.dropDatabaseCallable().futureCall(request, context));
295312
}
296313

297314
@Override
@@ -301,43 +318,65 @@ public Database getDatabase(String databaseName) throws SpannerException {
301318
.setName(databaseName)
302319
.build();
303320

304-
// TODO: put databaseName in metadata
305-
return get(databaseStub.getDatabaseCallable().futureCall(request));
321+
GrpcCallContext context = GrpcCallContext.createDefault()
322+
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
323+
return get(databaseStub.getDatabaseCallable().futureCall(request, context));
306324
}
307325

308326
@Override
309327
public List<String> getDatabaseDdl(String databaseName) throws SpannerException {
310328
GetDatabaseDdlRequest request =
311329
GetDatabaseDdlRequest.newBuilder().setDatabase(databaseName).build();
312330

313-
// TODO: put databaseName in metadata
314-
return get(databaseStub.getDatabaseDdlCallable().futureCall(request))
331+
GrpcCallContext context = GrpcCallContext.createDefault()
332+
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
333+
return get(databaseStub.getDatabaseDdlCallable().futureCall(request, context))
315334
.getStatementsList();
316335
}
317336

318337
@Override
319338
public Operation getOperation(String name) throws SpannerException {
320339
GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build();
321-
// TODO: put name in metadata
322-
return get(databaseStub.getOperationsStub().getOperationCallable().futureCall(request));
340+
GrpcCallContext context = GrpcCallContext.createDefault()
341+
.withExtraHeaders(metadataProvider.newExtraHeaders(name, projectName));
342+
return get(databaseStub.getOperationsStub().getOperationCallable()
343+
.futureCall(request, context));
323344
}
324345

325346
@Override
326347
public Session createSession(String databaseName, @Nullable Map<String, String> labels,
327348
@Nullable Map<Option, ?> options) throws SpannerException {
328-
throw new UnsupportedOperationException("Not implemented yet.");
349+
CreateSessionRequest.Builder requestBuilder =
350+
CreateSessionRequest.newBuilder().setDatabase(databaseName);
351+
if (labels != null && !labels.isEmpty()) {
352+
Session.Builder session = Session.newBuilder().putAllLabels(labels);
353+
requestBuilder.setSession(session);
354+
}
355+
CreateSessionRequest request = requestBuilder.build();
356+
GrpcCallContext context = GrpcCallContext.createDefault()
357+
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
358+
.withExtraHeaders(metadataProvider.newExtraHeaders(databaseName, projectName));
359+
return get(stub.createSessionCallable().futureCall(request, context));
329360
}
330361

331362
@Override
332363
public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
333364
throws SpannerException {
334-
throw new UnsupportedOperationException("Not implemented yet.");
365+
DeleteSessionRequest request =
366+
DeleteSessionRequest.newBuilder().setName(sessionName).build();
367+
GrpcCallContext context = GrpcCallContext.createDefault()
368+
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
369+
.withExtraHeaders(metadataProvider.newExtraHeaders(sessionName, projectName));
370+
get(stub.deleteSessionCallable().futureCall(request, context));
335371
}
336372

337373
@Override
338374
public StreamingCall read(
339375
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
340-
throw new UnsupportedOperationException("Not implemented yet.");
376+
GrpcCallContext context = GrpcCallContext.createDefault()
377+
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
378+
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
379+
throw new UnsupportedOperationException("not implemented yet");
341380
}
342381

343382
@Override
@@ -349,33 +388,47 @@ public StreamingCall executeQuery(
349388
@Override
350389
public Transaction beginTransaction(
351390
BeginTransactionRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
352-
throw new UnsupportedOperationException("Not implemented yet.");
391+
GrpcCallContext context = GrpcCallContext.createDefault()
392+
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
393+
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
394+
return get(stub.beginTransactionCallable().futureCall(request, context));
353395
}
354396

355397
@Override
356398
public CommitResponse commit(CommitRequest commitRequest, @Nullable Map<Option, ?> options)
357399
throws SpannerException {
358-
throw new UnsupportedOperationException("Not implemented yet.");
400+
GrpcCallContext context = GrpcCallContext.createDefault()
401+
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
402+
.withExtraHeaders(
403+
metadataProvider.newExtraHeaders(commitRequest.getSession(), projectName));
404+
return get(stub.commitCallable().futureCall(commitRequest, context));
359405
}
360406

361407
@Override
362408
public void rollback(RollbackRequest request, @Nullable Map<Option, ?> options)
363409
throws SpannerException {
364-
throw new UnsupportedOperationException("Not implemented yet.");
410+
GrpcCallContext context = GrpcCallContext.createDefault()
411+
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
412+
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
413+
get(stub.rollbackCallable().futureCall(request, context));
365414
}
366415

367416
@Override
368417
public PartitionResponse partitionQuery(
369418
PartitionQueryRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
370-
throw new UnsupportedOperationException("Not implemented yet.");
419+
GrpcCallContext context = GrpcCallContext.createDefault()
420+
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
421+
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
422+
return get(stub.partitionQueryCallable().futureCall(request, context));
371423
}
372424

373425
@Override
374426
public PartitionResponse partitionRead(
375427
PartitionReadRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
376-
// TODO(pongad): Figure out metadata
377-
// TODO(pongad): Figure out channel affinity
378-
return get(stub.partitionReadCallable().futureCall(request));
428+
GrpcCallContext context = GrpcCallContext.createDefault()
429+
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue())
430+
.withExtraHeaders(metadataProvider.newExtraHeaders(request.getSession(), projectName));
431+
return get(stub.partitionReadCallable().futureCall(request, context));
379432
}
380433

381434
/** Gets the result of an async RPC call, handling any exceptions encountered. */

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import com.google.common.collect.ImmutableMap;
1919
import io.grpc.Metadata;
2020
import io.grpc.Metadata.Key;
21+
import java.util.Arrays;
22+
import java.util.List;
2123
import java.util.Map;
2224
import java.util.regex.Matcher;
2325
import java.util.regex.Pattern;
@@ -27,15 +29,15 @@
2729
*/
2830
class SpannerMetadataProvider {
2931
private final Map<Metadata.Key<String>, String> headers;
30-
private final Key<String> resourceHeaderKey;
32+
private final String resourceHeaderKey;
3133

3234
private static final Pattern[] RESOURCE_TOKEN_PATTERNS = {
3335
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"),
3436
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*)(.*)?")
3537
};
3638

3739
private SpannerMetadataProvider(Map<String, String> headers, String resourceHeaderKey) {
38-
this.resourceHeaderKey = Key.of(resourceHeaderKey, Metadata.ASCII_STRING_MARSHALLER);
40+
this.resourceHeaderKey = resourceHeaderKey;
3941
this.headers = constructHeadersAsMetadata(headers);
4042
}
4143

@@ -50,11 +52,21 @@ Metadata newMetadata(String resourceTokenTemplate, String defaultResourceToken)
5052
}
5153

5254
metadata.put(
53-
resourceHeaderKey, getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken));
55+
Key.of(resourceHeaderKey, Metadata.ASCII_STRING_MARSHALLER),
56+
getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken));
5457

5558
return metadata;
5659
}
5760

61+
Map<String, List<String>> newExtraHeaders(String resourceTokenTemplate, String defaultResourceToken) {
62+
return ImmutableMap.<String, List<String>>builder()
63+
.put(
64+
resourceHeaderKey,
65+
Arrays.asList(getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken)))
66+
.build();
67+
68+
}
69+
5870
private Map<Metadata.Key<String>, String> constructHeadersAsMetadata(
5971
Map<String, String> headers) {
6072
ImmutableMap.Builder<Metadata.Key<String>, String> headersAsMetadataBuilder =

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@
138138
<site.installationModule>google-cloud</site.installationModule>
139139
<bom.version>0.43.1-alpha-SNAPSHOT</bom.version><!-- {x-version-update:google-cloud-pom:current} -->
140140
<api-client.version>1.23.0</api-client.version>
141-
<gax.version>1.23.0</gax.version>
141+
<gax.version>1.24.0</gax.version>
142142
<google.auth.version>0.9.0</google.auth.version>
143143
<grpc.version>1.10.1</grpc.version>
144144
<nettyssl.version>2.0.7.Final</nettyssl.version>

0 commit comments

Comments
 (0)