-
Notifications
You must be signed in to change notification settings - Fork 137
fix: retry pdml transaction on EOS internal error #360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
8a1d690
d3683b0
88f4bb5
b145058
b8da6c1
3be1346
5edd3eb
1ff9092
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |||
|
|
||||
| import com.google.api.gax.grpc.GrpcStatusCode; | ||||
| import com.google.api.gax.rpc.DeadlineExceededException; | ||||
| import com.google.api.gax.rpc.InternalException; | ||||
| import com.google.api.gax.rpc.ServerStream; | ||||
| import com.google.api.gax.rpc.UnavailableException; | ||||
| import com.google.cloud.spanner.SessionImpl.SessionTransaction; | ||||
|
|
@@ -55,23 +56,6 @@ class PartitionedDMLTransaction implements SessionTransaction { | |||
| this.rpc = rpc; | ||||
| } | ||||
|
|
||||
| private ByteString initTransaction() { | ||||
| final BeginTransactionRequest request = | ||||
| BeginTransactionRequest.newBuilder() | ||||
| .setSession(session.getName()) | ||||
| .setOptions( | ||||
| TransactionOptions.newBuilder() | ||||
| .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())) | ||||
| .build(); | ||||
| Transaction txn = rpc.beginTransaction(request, session.getOptions()); | ||||
| if (txn.getId().isEmpty()) { | ||||
| throw SpannerExceptionFactory.newSpannerException( | ||||
| ErrorCode.INTERNAL, | ||||
| "Failed to init transaction, missing transaction id\n" + session.getName()); | ||||
| } | ||||
| return txn.getId(); | ||||
| } | ||||
|
|
||||
| /** | ||||
| * Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the | ||||
| * transaction was aborted. The update method uses the ExecuteStreamingSql RPC to execute the | ||||
|
|
@@ -127,20 +111,24 @@ long executeStreamingPartitionedUpdate(final Statement statement, final Duration | |||
| } | ||||
| } | ||||
| break; | ||||
| } catch (UnavailableException e) { | ||||
| } catch (UnavailableException | InternalException e) { | ||||
|
||||
| // Retry the stream in the same transaction if the stream breaks with | ||||
| // UnavailableException and we have a resume token. Otherwise, we just retry the | ||||
| // entire transaction. | ||||
| if (!ByteString.EMPTY.equals(resumeToken)) { | ||||
| log.log( | ||||
| Level.FINER, | ||||
| "Retrying PartitionedDml stream using resume token '" | ||||
| + resumeToken.toStringUtf8() | ||||
| + "' because of broken stream", | ||||
| e); | ||||
| if (shouldResumeOrRestartTransaction(e)) { | ||||
| if (!ByteString.EMPTY.equals(resumeToken)) { | ||||
| log.log( | ||||
| Level.FINER, | ||||
| "Retrying PartitionedDml stream using resume token '" | ||||
| + resumeToken.toStringUtf8() | ||||
| + "' because of broken stream", | ||||
| e); | ||||
| } else { | ||||
| throw new com.google.api.gax.rpc.AbortedException( | ||||
| e, GrpcStatusCode.of(Code.ABORTED), true); | ||||
| } | ||||
| } else { | ||||
| throw new com.google.api.gax.rpc.AbortedException( | ||||
| e, GrpcStatusCode.of(Code.ABORTED), true); | ||||
| throw e; | ||||
| } | ||||
| } | ||||
| } | ||||
|
|
@@ -174,4 +162,27 @@ public void invalidate() { | |||
| // No-op method needed to implement SessionTransaction interface. | ||||
| @Override | ||||
| public void setSpan(Span span) {} | ||||
|
|
||||
| private ByteString initTransaction() { | ||||
| final BeginTransactionRequest request = | ||||
| BeginTransactionRequest.newBuilder() | ||||
| .setSession(session.getName()) | ||||
| .setOptions( | ||||
| TransactionOptions.newBuilder() | ||||
| .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())) | ||||
| .build(); | ||||
| Transaction txn = rpc.beginTransaction(request, session.getOptions()); | ||||
| if (txn.getId().isEmpty()) { | ||||
| throw SpannerExceptionFactory.newSpannerException( | ||||
| ErrorCode.INTERNAL, | ||||
| "Failed to init transaction, missing transaction id\n" + session.getName()); | ||||
| } | ||||
| return txn.getId(); | ||||
| } | ||||
|
|
||||
| private boolean shouldResumeOrRestartTransaction(Exception e) { | ||||
| return e instanceof UnavailableException | ||||
| || (e instanceof InternalException | ||||
| && e.getMessage().contains("Received unexpected EOS on DATA frame from server")); | ||||
|
||||
| private static boolean isRetryable(ErrorCode code, @Nullable Throwable cause) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method was just moved down (after the public methods).