Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);

try (IScope s = tracer.withSpan(span)) {
return SpannerRetryHelper.runTxWithRetriesOnAborted(
() -> new CommitResponse(spanner.getRpc().commit(request, getOptions())));
return spanner
.getTransactionRetryHelper()
.runTxWithRetriesOnAborted(
() -> new CommitResponse(spanner.getRpc().commit(request, getOptions())));
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ private static String nextDatabaseClientId(DatabaseId databaseId) {

private final DatabaseAdminClient dbAdminClient;
private final InstanceAdminClient instanceClient;
private final TransactionRetryHelper transactionRetryHelper;

/**
* Exception class used to track the stack trace at the point when a Spanner instance is closed.
Expand Down Expand Up @@ -145,6 +146,8 @@ static final class ClosedException extends RuntimeException {
this.dbAdminClient = new DatabaseAdminClientImpl(options.getProjectId(), gapicRpc);
this.instanceClient =
new InstanceAdminClientImpl(options.getProjectId(), gapicRpc, dbAdminClient);
this.transactionRetryHelper =
new TransactionRetryHelper(options.getDefaultTransactionRetrySettings());
logSpannerOptions(options);
}

Expand Down Expand Up @@ -200,6 +203,10 @@ SpannerRpc getRpc() {
return gapicRpc;
}

TransactionRetryHelper getTransactionRetryHelper() {
return transactionRetryHelper;
}

/** Returns the default setting for prefetchChunks of this {@link SpannerImpl} instance. */
int getDefaultPrefetchChunks() {
return getOptions().getPrefetchChunks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.v1.SpannerSettings;
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand All @@ -70,6 +71,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.SpannerGrpc;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
Expand Down Expand Up @@ -204,6 +206,25 @@ public static GcpChannelPoolOptions createDefaultDynamicChannelPoolOptions() {
.build();
}

/**
* Use the same {@link RetrySettings} for retrying an aborted transaction as for retrying a {@link
* RollbackRequest}. The {@link RollbackRequest} automatically uses the default retry settings
* defined for the {@link SpannerStub}. By referencing these settings, the retry settings for
* retrying aborted transactions will also automatically be updated if the default retry settings
* are updated.
*
* <p>A read/write transaction should not time out while retrying. The total timeout of the retry
* settings is therefore set to 24 hours and there is no max attempts value.
*
* <p>These default {@link RetrySettings} are only used if no retry information is returned by the
* {@link AbortedException}.
*/
public static final RetrySettings DEFAULT_TRANSACTION_RETRY_SETTINGS =
SpannerStubSettings.newBuilder().rollbackSettings().getRetrySettings().toBuilder()
.setTotalTimeoutDuration(Duration.ofHours(24L))
.setMaxAttempts(0)
.build();

private final TransportChannelProvider channelProvider;
private final ChannelEndpointCacheFactory channelEndpointCacheFactory;

Expand Down Expand Up @@ -264,6 +285,7 @@ public static GcpChannelPoolOptions createDefaultDynamicChannelPoolOptions() {
private final boolean enableEndToEndTracing;
private final String monitoringHost;
private final TransactionOptions defaultTransactionOptions;
private final RetrySettings defaultTransactionRetrySettings;
private final RequestOptions.ClientContext clientContext;

enum TracingFramework {
Expand Down Expand Up @@ -934,6 +956,7 @@ protected SpannerOptions(Builder builder) {
enableEndToEndTracing = builder.enableEndToEndTracing;
monitoringHost = builder.monitoringHost;
defaultTransactionOptions = builder.defaultTransactionOptions;
defaultTransactionRetrySettings = builder.defaultTransactionRetrySettings;
clientContext = builder.clientContext;
}

Expand Down Expand Up @@ -1196,6 +1219,8 @@ public static class Builder
private String experimentalHost = null;
private boolean usePlainText = false;
private TransactionOptions defaultTransactionOptions = TransactionOptions.getDefaultInstance();
private RetrySettings defaultTransactionRetrySettings =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This line will cause a compilation error. The static field DEFAULT_TRANSACTION_RETRY_SETTINGS was moved from SpannerRetryHelper (now TransactionRetryHelper) to SpannerOptions. You should reference it directly from SpannerOptions.

Suggested change
private RetrySettings defaultTransactionRetrySettings =
private RetrySettings defaultTransactionRetrySettings = DEFAULT_TRANSACTION_RETRY_SETTINGS;

TransactionRetryHelper.DEFAULT_TRANSACTION_RETRY_SETTINGS;
private RequestOptions.ClientContext clientContext;

private static String createCustomClientLibToken(String token) {
Expand Down Expand Up @@ -1302,6 +1327,7 @@ protected Builder() {
this.enableEndToEndTracing = options.enableEndToEndTracing;
this.monitoringHost = options.monitoringHost;
this.defaultTransactionOptions = options.defaultTransactionOptions;
this.defaultTransactionRetrySettings = options.defaultTransactionRetrySettings;
this.clientContext = options.clientContext;
}

Expand Down Expand Up @@ -2055,6 +2081,18 @@ public Builder setDefaultTransactionOptions(
return this;
}

/**
* Sets the default {@link RetrySettings} for all read/write transactions that are executed
* using this client. These settings are used when the client automatically retries an aborted
* read/write transaction. The default is to retry for up to 24 hours without a limit for the
* maximum number of attempts.
*/
public Builder setDefaultTransactionRetrySettings(RetrySettings retrySettings) {
Preconditions.checkNotNull(retrySettings, "RetrySettings cannot be null");
this.defaultTransactionRetrySettings = retrySettings;
return this;
}

/** Sets the default {@link RequestOptions.ClientContext} for all requests. */
public Builder setDefaultClientContext(RequestOptions.ClientContext clientContext) {
this.clientContext = clientContext;
Expand Down Expand Up @@ -2481,6 +2519,10 @@ public TransactionOptions getDefaultTransactionOptions() {
return defaultTransactionOptions;
}

public RetrySettings getDefaultTransactionRetrySettings() {
return this.defaultTransactionRetrySettings;
}

@BetaApi
public boolean isUseVirtualThreads() {
return useVirtualThreads;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@
import com.google.cloud.RetryHelper;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.spanner.ErrorHandler.DefaultErrorHandler;
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.spanner.v1.RollbackRequest;
import io.grpc.Context;
import java.time.Duration;
import java.util.concurrent.Callable;
Expand All @@ -39,50 +36,35 @@
* that uses specific settings to only retry on aborted transactions, without a timeout and without
* a cap on the number of retries.
*/
class SpannerRetryHelper {
class TransactionRetryHelper {
private final RetrySettings retrySettings;

/**
* Use the same {@link RetrySettings} for retrying an aborted transaction as for retrying a {@link
* RollbackRequest}. The {@link RollbackRequest} automatically uses the default retry settings
* defined for the {@link SpannerStub}. By referencing these settings, the retry settings for
* retrying aborted transactions will also automatically be updated if the default retry settings
* are updated.
*
* <p>A read/write transaction should not timeout while retrying. The total timeout of the retry
* settings is therefore set to 24 hours and there is no max attempts value.
*
* <p>These default {@link RetrySettings} are only used if no retry information is returned by the
* {@link AbortedException}.
*/
@VisibleForTesting
static final RetrySettings txRetrySettings =
SpannerStubSettings.newBuilder().rollbackSettings().getRetrySettings().toBuilder()
.setTotalTimeoutDuration(Duration.ofHours(24L))
.setMaxAttempts(0)
.build();
TransactionRetryHelper(RetrySettings retrySettings) {
this.retrySettings = retrySettings;
}

/** Executes the {@link Callable} and retries if it fails with an {@link AbortedException}. */
static <T> T runTxWithRetriesOnAborted(Callable<T> callable) {
<T> T runTxWithRetriesOnAborted(Callable<T> callable) {
return runTxWithRetriesOnAborted(callable, DefaultErrorHandler.INSTANCE);
}

static <T> T runTxWithRetriesOnAborted(Callable<T> callable, ErrorHandler errorHandler) {
<T> T runTxWithRetriesOnAborted(Callable<T> callable, ErrorHandler errorHandler) {
return runTxWithRetriesOnAborted(
callable, errorHandler, txRetrySettings, NanoClock.getDefaultClock());
callable, errorHandler, this.retrySettings, NanoClock.getDefaultClock());
}

/**
* Executes the {@link Callable} and retries if it fails with an {@link AbortedException} using
* the specific {@link RetrySettings}.
*/
@VisibleForTesting
static <T> T runTxWithRetriesOnAborted(
<T> T runTxWithRetriesOnAborted(
Callable<T> callable, RetrySettings retrySettings, ApiClock clock) {
return runTxWithRetriesOnAborted(callable, DefaultErrorHandler.INSTANCE, retrySettings, clock);
}

@VisibleForTesting
static <T> T runTxWithRetriesOnAborted(
<T> T runTxWithRetriesOnAborted(
Callable<T> callable,
ErrorHandler errorHandler,
RetrySettings retrySettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1357,7 +1357,10 @@ private <T> T runInternal(final TransactionCallable<T> txCallable) {
throw e;
}
};
return SpannerRetryHelper.runTxWithRetriesOnAborted(retryCallable, session.getErrorHandler());
return session
.getSpanner()
.getTransactionRetryHelper()
.runTxWithRetriesOnAborted(retryCallable, session.getErrorHandler());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class SpannerRetryHelperTest {
public class TransactionRetryHelperTest {
private static class FakeClock implements ApiClock {
private long currentTime;

Expand All @@ -58,6 +58,9 @@ public long millisTime() {
}
}

private final TransactionRetryHelper retryHelper =
new TransactionRetryHelper(SpannerOptions.DEFAULT_TRANSACTION_RETRY_SETTINGS);

@Test
public void testRetryDoesNotTimeoutAfterTenMinutes() {
final FakeClock clock = new FakeClock();
Expand All @@ -72,8 +75,9 @@ public void testRetryDoesNotTimeoutAfterTenMinutes() {
};
assertEquals(
2,
SpannerRetryHelper.runTxWithRetriesOnAborted(
callable, SpannerRetryHelper.txRetrySettings, clock)
retryHelper
.runTxWithRetriesOnAborted(
callable, SpannerOptions.DEFAULT_TRANSACTION_RETRY_SETTINGS, clock)
.intValue());
}

Expand All @@ -93,8 +97,8 @@ public void testRetryDoesFailAfterMoreThanOneDay() {
assertThrows(
SpannerException.class,
() ->
SpannerRetryHelper.runTxWithRetriesOnAborted(
callable, SpannerRetryHelper.txRetrySettings, clock));
retryHelper.runTxWithRetriesOnAborted(
callable, SpannerOptions.DEFAULT_TRANSACTION_RETRY_SETTINGS, clock));
assertEquals(ErrorCode.ABORTED, e.getErrorCode());
assertEquals(1, attempts.get());
}
Expand All @@ -118,8 +122,7 @@ public void testCancelledContext() {
SpannerException e =
assertThrows(
SpannerException.class,
() ->
withCancellation.run(() -> SpannerRetryHelper.runTxWithRetriesOnAborted(callable)));
() -> withCancellation.run(() -> retryHelper.runTxWithRetriesOnAborted(callable)));
assertEquals(ErrorCode.CANCELLED, e.getErrorCode());
}

Expand All @@ -135,14 +138,14 @@ public void testTimedOutContext() {
SpannerException e =
assertThrows(
SpannerException.class,
() -> withDeadline.run(() -> SpannerRetryHelper.runTxWithRetriesOnAborted(callable)));
() -> withDeadline.run(() -> retryHelper.runTxWithRetriesOnAborted(callable)));
assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode());
}

@Test
public void noException() {
Callable<Integer> callable = () -> 1 + 1;
assertThat(SpannerRetryHelper.runTxWithRetriesOnAborted(callable)).isEqualTo(2);
assertThat(retryHelper.runTxWithRetriesOnAborted(callable)).isEqualTo(2);
}

@Test(expected = IllegalStateException.class)
Expand All @@ -151,7 +154,7 @@ public void propagateUncheckedException() {
() -> {
throw new IllegalStateException("test");
};
SpannerRetryHelper.runTxWithRetriesOnAborted(callable);
retryHelper.runTxWithRetriesOnAborted(callable);
}

@Test
Expand All @@ -164,7 +167,7 @@ public void retryOnAborted() {
}
return 1 + 1;
};
assertThat(SpannerRetryHelper.runTxWithRetriesOnAborted(callable)).isEqualTo(2);
assertThat(retryHelper.runTxWithRetriesOnAborted(callable)).isEqualTo(2);
}

@Test
Expand All @@ -177,7 +180,7 @@ public void retryMultipleTimesOnAborted() {
}
return 1 + 1;
};
assertThat(SpannerRetryHelper.runTxWithRetriesOnAborted(callable)).isEqualTo(2);
assertThat(retryHelper.runTxWithRetriesOnAborted(callable)).isEqualTo(2);
}

@Test(expected = IllegalStateException.class)
Expand All @@ -190,7 +193,7 @@ public void retryOnAbortedAndThenPropagateUnchecked() {
}
throw new IllegalStateException("test");
};
SpannerRetryHelper.runTxWithRetriesOnAborted(callable);
retryHelper.runTxWithRetriesOnAborted(callable);
}

@Test
Expand Down Expand Up @@ -238,7 +241,7 @@ public void testExceptionWithRetryInfo() {
// The following call should take at least 100ms, as that is the retry delay specified in the
// retry info of the exception.
Stopwatch watch = Stopwatch.createStarted();
assertThat(SpannerRetryHelper.runTxWithRetriesOnAborted(callable)).isEqualTo(2);
assertThat(retryHelper.runTxWithRetriesOnAborted(callable)).isEqualTo(2);
long elapsed = watch.elapsed(TimeUnit.MILLISECONDS);
// Allow 1ms difference as that should be the accuracy of the sleep method.
assertThat(elapsed).isAtLeast(RETRY_DELAY_MILLIS - 1);
Expand Down
Loading