Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,88 @@ public ApiFuture<Row> readRowAsync(String tableId, ByteString rowKey, @Nullable
if (filter != null) {
query = query.filter(filter);
}
return readRowsCallable().first().futureCall(query);
return readRowCallable().futureCall(query);
}

/**
* Reads a single row. The returned callable object allows for customization of api invocation.
*
* <p>Sample code:
*
* <pre>{@code
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
* String tableId = "[TABLE]";
*
* Query query = Query.create(tableId)
* .rowKey("[KEY]")
* .filter(FILTERS.qualifier().regex("[COLUMN PREFIX].*"));
*
* // Synchronous invocation
* try {
* Row row = bigtableDataClient.readRowCallable().call(query);
* if (row == null) {
* System.out.println("Row not found");
* }
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Asynchronous invocation
* ApiFuture<Row> rowFuture = bigtableDataClient.readRowCallable().futureCall(query);
*
* ApiFutures.addCallback(rowFuture, new ApiFutureCallback<Row>() {
* public void onFailure(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onSuccess(Row row) {
* if (row == null) {
* System.out.println("Row not found");
* }
* }
* }, MoreExecutors.directExecutor());
* }
* }</pre>
*
* @see UnaryCallable For call styles.
* @see Query For query options.
* @see com.google.cloud.bigtable.data.v2.models.Filters For the filter building DSL.
*/
public UnaryCallable<Query, Row> readRowCallable() {
return stub.readRowCallable();
}

/**
* Reads a single row. This callable allows for customization of the logical representation of a
* row. It's meant for advanced use cases.
*
* <p>Sample code:
*
* <pre>{@code
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
* String tableId = "[TABLE]";
*
* Query query = Query.create(tableId)
* .rowKey("[KEY]")
* .filter(FILTERS.qualifier().regex("[COLUMN PREFIX].*"));
*
* // Synchronous invocation
* CustomRow row = bigtableDataClient.readRowCallable(new CustomRowAdapter()).call(query));
* // Do something with row
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
* @see Query For query options.
* @see com.google.cloud.bigtable.data.v2.models.Filters For the filter building DSL.
*/
public <RowT> UnaryCallable<Query, RowT> readRowCallable(RowAdapter<RowT> rowAdapter) {
return stub.createReadRowCallable(rowAdapter);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.internal.RowSetUtil;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -264,4 +266,34 @@ private static ByteString wrapKey(String key) {
}
return ByteString.copyFromUtf8(key);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Query query = (Query) o;
return Objects.equal(tableId, query.tableId)
&& Objects.equal(builder.build(), query.builder.build());
}

@Override
public int hashCode() {
return Objects.hashCode(tableId, builder.build());
}

@Override
public String toString() {
ReadRowsRequest request = builder.build();

return MoreObjects.toStringHelper(this)
.add("tableId", tableId)
.add("keys", request.getRows().getRowKeysList())
.add("ranges", request.getRows().getRowRangesList())
.add("filter", request.getFilter())
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final RequestContext requestContext;

private final ServerStreamingCallable<Query, Row> readRowsCallable;
private final UnaryCallable<Query, Row> readRowCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
Expand Down Expand Up @@ -151,6 +152,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
RequestContext.create(settings.getInstanceName(), settings.getAppProfileId());

readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
sampleRowKeysCallable = createSampleRowKeysCallable();
mutateRowCallable = createMutateRowCallable();
bulkMutateRowsCallable = createBulkMutateRowsCallable();
Expand All @@ -162,7 +164,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
// <editor-fold desc="Callable creators">

/**
* Creates a callable chain to handle ReadRows RPCs. The chain will:
* Creates a callable chain to handle streaming ReadRows RPCs. The chain will:
*
* <ul>
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
Expand All @@ -176,6 +178,48 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
*/
public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
RowAdapter<RowT> rowAdapter) {
return createReadRowsCallable(settings.readRowsSettings(), rowAdapter);
}

/**
* Creates a callable chain to handle point ReadRows RPCs. The chain will:
*
* <ul>
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
* dispatch the RPC.
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured in by the {@code rowAdapter} parameter.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
*/
public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
return createReadRowsCallable(
ServerStreamingCallSettings.<Query, Row>newBuilder()
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
.setRetrySettings(settings.readRowSettings().getRetrySettings())
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
.build(),
rowAdapter)
.first();
}

/**
* Creates a callable chain to handle ReadRows RPCs. The chain will:
*
* <ul>
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
* dispatch the RPC.
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured in by the {@code rowAdapter} parameter.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
*/
private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallSettings<Query, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {

ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(stub.readRowsCallable(), rowAdapter);
Expand All @@ -185,9 +229,9 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
.setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
.setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
.setRetrySettings(settings.readRowsSettings().getRetrySettings())
.setIdleTimeout(settings.readRowsSettings().getIdleTimeout())
.setRetryableCodes(readRowsSettings.getRetryableCodes())
.setRetrySettings(readRowsSettings.getRetrySettings())
.setIdleTimeout(readRowsSettings.getIdleTimeout())
.build();

// Retry logic is split into 2 parts to workaround a rare edge case described in
Expand Down Expand Up @@ -356,10 +400,16 @@ private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable(
// </editor-fold>

// <editor-fold desc="Callable accessors">
/** Returns a streaming read rows callable */
public ServerStreamingCallable<Query, Row> readRowsCallable() {
return readRowsCallable;
}

/** Return a point read callable */
public UnaryCallable<Query, Row> readRowCallable() {
return readRowCallable;
}

public UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable() {
return sampleRowKeysCallable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final String appProfileId;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
private final UnaryCallSettings<String, List<KeyOffset>> sampleRowKeysSettings;
private final UnaryCallSettings<RowMutation, Void> mutateRowSettings;
private final BatchingCallSettings<RowMutation, Void> bulkMutateRowsSettings;
Expand All @@ -120,11 +121,22 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS

private EnhancedBigtableStubSettings(Builder builder) {
super(builder);

// Since point reads & streaming reads share the same base callable that converts grpc errors
// into ApiExceptions, they must have the same retry codes.
Preconditions.checkState(
builder
.readRowSettings
.getRetryableCodes()
.equals(builder.readRowsSettings.getRetryableCodes()),
"Single ReadRow retry codes must match ReadRows retry codes");

instanceName = builder.instanceName;
appProfileId = builder.appProfileId;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
readRowSettings = builder.readRowSettings.build();
sampleRowKeysSettings = builder.sampleRowKeysSettings.build();
mutateRowSettings = builder.mutateRowSettings.build();
bulkMutateRowsSettings = builder.bulkMutateRowsSettings.build();
Expand Down Expand Up @@ -163,6 +175,11 @@ public UnaryCallSettings<String, List<KeyOffset>> sampleRowKeysSettings() {
return sampleRowKeysSettings;
}

/** Returns the object with the settings used for point reads via ReadRows. */
public UnaryCallSettings<Query, Row> readRowSettings() {
return readRowSettings;
}

/** Returns the object with the settings used for calls to MutateRow. */
public UnaryCallSettings<RowMutation, Void> mutateRowSettings() {
return mutateRowSettings;
Expand Down Expand Up @@ -200,6 +217,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private String appProfileId;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
private final UnaryCallSettings.Builder<String, List<KeyOffset>> sampleRowKeysSettings;
private final UnaryCallSettings.Builder<RowMutation, Void> mutateRowSettings;
private final BatchingCallSettings.Builder<RowMutation, Void> bulkMutateRowsSettings;
Expand Down Expand Up @@ -234,18 +252,27 @@ private Builder() {

// Per-method settings using baseSettings for defaults.
readRowsSettings = ServerStreamingCallSettings.newBuilder();
/* TODO: copy timeouts, retryCodes & retrySettings from baseSettings.readRows once it exists in GAPIC */
readRowsSettings
.setRetryableCodes(DEFAULT_RETRY_CODES)
.setRetrySettings(
DEFAULT_RETRY_SETTINGS.toBuilder().setTotalTimeout(Duration.ofHours(1)).build())
.setRetryableCodes(baseDefaults.readRowsSettings().getRetryableCodes())
.setRetrySettings(baseDefaults.readRowsSettings().getRetrySettings())
.setIdleTimeout(Duration.ofMinutes(5));

// Point reads should use same defaults as streaming reads, but with a shorter timeout
readRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
readRowSettings
.setRetryableCodes(baseDefaults.readRowsSettings().getRetryableCodes())
.setRetrySettings(
baseDefaults
.readRowsSettings()
.getRetrySettings()
.toBuilder()
.setTotalTimeout(DEFAULT_RETRY_SETTINGS.getTotalTimeout())
.build());

sampleRowKeysSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
/* TODO: copy retryCodes & retrySettings from baseSettings.sampleRowKeysSettings once it exists in GAPIC */
sampleRowKeysSettings
.setRetryableCodes(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE, Code.ABORTED)
.setRetrySettings(DEFAULT_RETRY_SETTINGS);
.setRetryableCodes(baseDefaults.sampleRowKeysSettings().getRetryableCodes())
.setRetrySettings(baseDefaults.sampleRowKeysSettings().getRetrySettings());

mutateRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
copyRetrySettings(baseDefaults.mutateRowSettings(), mutateRowSettings);
Expand Down Expand Up @@ -282,6 +309,7 @@ private Builder(EnhancedBigtableStubSettings settings) {

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
readRowSettings = settings.readRowSettings.toBuilder();
sampleRowKeysSettings = settings.sampleRowKeysSettings.toBuilder();
mutateRowSettings = settings.mutateRowSettings.toBuilder();
bulkMutateRowsSettings = settings.bulkMutateRowsSettings.toBuilder();
Expand Down Expand Up @@ -339,6 +367,11 @@ public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
}

/** Returns the builder for the settings used for point reads using readRow. */
public UnaryCallSettings.Builder<Query, Row> readRowSettings() {
return readRowSettings;
}

/** Returns the builder for the settings used for calls to SampleRowKeysSettings. */
public UnaryCallSettings.Builder<String, List<KeyOffset>> sampleRowKeysSettings() {
return sampleRowKeysSettings;
Expand Down
Loading