Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:

- name: Start containerized server and dependencies
env:
TEMPORAL_CLI_VERSION: 1.4.1-cloud-v1-29-0-139-2.0
TEMPORAL_CLI_VERSION: 1.6.1-server-1.31.0-151.0
run: |
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
tar -xzf temporal_cli.tar.gz
Expand All @@ -94,9 +94,8 @@ jobs:
--search-attribute CustomBoolField=Bool \
--dynamic-config-value system.enableActivityEagerExecution=true \
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
--dynamic-config-value component.nexusoperations.recordCancelRequestCompletionEvents=true \
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
--dynamic-config-value history.enableRequestIdRefLinks=true &
--dynamic-config-value history.enableRequestIdRefLinks=true \
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' &
sleep 10s

- name: Run unit tests
Expand Down Expand Up @@ -186,4 +185,4 @@ jobs:
name: Build native test server
uses: ./.github/workflows/build-native-image.yml
with:
ref: ${{ github.event.pull_request.head.sha }}
ref: ${{ github.event.pull_request.head.sha }}
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,10 @@ public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
input.getHeaders().forEach((k, v) -> attributes.putNexusHeader(k.toLowerCase(), v));
attributes.setScheduleToCloseTimeout(
ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToCloseTimeout()));
attributes.setScheduleToStartTimeout(
ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToStartTimeout()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: ProtobufTimeUtils.toProtoDuration will return a zero duration here not a null duration, sometimes the server treats a zero duration different then a empty duration so we need to be careful that is not the case here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Server treats zero and null the same here.

attributes.setStartToCloseTimeout(
ProtobufTimeUtils.toProtoDuration(input.getOptions().getStartToCloseTimeout()));

@Nullable
UserMetadata userMetadata =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public static NexusOperationOptions getDefaultInstance() {

public static final class Builder {
private Duration scheduleToCloseTimeout;
private Duration scheduleToStartTimeout;
private Duration startToCloseTimeout;
private NexusOperationCancellationType cancellationType;
private String summary;

Expand All @@ -46,6 +48,45 @@ public NexusOperationOptions.Builder setScheduleToCloseTimeout(
return this;
}

/**
* Sets the schedule to start timeout for the Nexus operation.
*
* <p>Maximum time to wait for the operation to be started (or completed if synchronous) by the
* handler. If the operation is not started within this timeout, it will fail with
* TIMEOUT_TYPE_SCHEDULE_TO_START.
*
* <p>Requires Temporal Server 1.31.0 or later.
*
* @param scheduleToStartTimeout the schedule to start timeout for the Nexus operation
* @return this
*/
@Experimental
public NexusOperationOptions.Builder setScheduleToStartTimeout(
Duration scheduleToStartTimeout) {
this.scheduleToStartTimeout = scheduleToStartTimeout;
return this;
}

/**
* Sets the start to close timeout for the Nexus operation.
*
* <p>Maximum time to wait for an asynchronous operation to complete after it has been started.
* If the operation does not complete within this timeout after starting, it will fail with
* TIMEOUT_TYPE_START_TO_CLOSE.
*
* <p>Only applies to asynchronous operations. Synchronous operations ignore this timeout.
*
* <p>Requires Temporal Server 1.31.0 or later.
*
* @param startToCloseTimeout the start to close timeout for the Nexus operation
* @return this
*/
@Experimental
public NexusOperationOptions.Builder setStartToCloseTimeout(Duration startToCloseTimeout) {
this.startToCloseTimeout = startToCloseTimeout;
return this;
}

/**
* Sets the cancellation type for the Nexus operation. Defaults to WAIT_COMPLETED.
*
Expand Down Expand Up @@ -78,12 +119,19 @@ private Builder(NexusOperationOptions options) {
return;
}
this.scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
this.scheduleToStartTimeout = options.getScheduleToStartTimeout();
this.startToCloseTimeout = options.getStartToCloseTimeout();
this.cancellationType = options.getCancellationType();
this.summary = options.getSummary();
}

public NexusOperationOptions build() {
return new NexusOperationOptions(scheduleToCloseTimeout, cancellationType, summary);
return new NexusOperationOptions(
scheduleToCloseTimeout,
scheduleToStartTimeout,
startToCloseTimeout,
cancellationType,
summary);
}

public NexusOperationOptions.Builder mergeNexusOperationOptions(
Expand All @@ -95,6 +143,14 @@ public NexusOperationOptions.Builder mergeNexusOperationOptions(
(override.scheduleToCloseTimeout == null)
? this.scheduleToCloseTimeout
: override.scheduleToCloseTimeout;
this.scheduleToStartTimeout =
(override.scheduleToStartTimeout == null)
? this.scheduleToStartTimeout
: override.scheduleToStartTimeout;
this.startToCloseTimeout =
(override.startToCloseTimeout == null)
? this.startToCloseTimeout
: override.startToCloseTimeout;
this.cancellationType =
(override.cancellationType == null) ? this.cancellationType : override.cancellationType;
this.summary = (override.summary == null) ? this.summary : override.summary;
Expand All @@ -104,9 +160,13 @@ public NexusOperationOptions.Builder mergeNexusOperationOptions(

private NexusOperationOptions(
Duration scheduleToCloseTimeout,
Duration scheduleToStartTimeout,
Duration startToCloseTimeout,
NexusOperationCancellationType cancellationType,
String summary) {
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
this.scheduleToStartTimeout = scheduleToStartTimeout;
this.startToCloseTimeout = startToCloseTimeout;
this.cancellationType = cancellationType;
this.summary = summary;
}
Expand All @@ -116,13 +176,25 @@ public NexusOperationOptions.Builder toBuilder() {
}

private final Duration scheduleToCloseTimeout;
private final Duration scheduleToStartTimeout;
private final Duration startToCloseTimeout;
private final NexusOperationCancellationType cancellationType;
private final String summary;

public Duration getScheduleToCloseTimeout() {
return scheduleToCloseTimeout;
}

@Experimental
public Duration getScheduleToStartTimeout() {
return scheduleToStartTimeout;
}

@Experimental
public Duration getStartToCloseTimeout() {
return startToCloseTimeout;
}

public NexusOperationCancellationType getCancellationType() {
return cancellationType;
}
Expand All @@ -138,20 +210,31 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
NexusOperationOptions that = (NexusOperationOptions) o;
return Objects.equals(scheduleToCloseTimeout, that.scheduleToCloseTimeout)
&& Objects.equals(scheduleToStartTimeout, that.scheduleToStartTimeout)
&& Objects.equals(startToCloseTimeout, that.startToCloseTimeout)
&& Objects.equals(cancellationType, that.cancellationType)
&& Objects.equals(summary, that.summary);
}

@Override
public int hashCode() {
return Objects.hash(scheduleToCloseTimeout, cancellationType, summary);
return Objects.hash(
scheduleToCloseTimeout,
scheduleToStartTimeout,
startToCloseTimeout,
cancellationType,
summary);
}

@Override
public String toString() {
return "NexusOperationOptions{"
+ "scheduleToCloseTimeout="
+ scheduleToCloseTimeout
+ ", scheduleToStartTimeout="
+ scheduleToStartTimeout
+ ", startToCloseTimeout="
+ startToCloseTimeout
+ ", cancellationType="
+ cancellationType
+ ", summary='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.client.*;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.TimeoutFailure;
import io.temporal.failure.TerminatedFailure;
import io.temporal.internal.replay.ReplayWorkflowTaskHandler;
import io.temporal.internal.retryer.GrpcMessageTooLargeException;
import io.temporal.internal.worker.PollerOptions;
Expand Down Expand Up @@ -71,7 +71,7 @@ public void activityStartTooLarge() {

WorkflowFailedException e =
assertThrows(WorkflowFailedException.class, () -> workflow.execute(""));
assertTrue(e.getCause() instanceof TimeoutFailure);
assertTrue(e.getCause() instanceof TerminatedFailure);

String workflowId = WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId();
assertTrue(
Expand All @@ -83,7 +83,7 @@ public void activityStartTooLarge() {
workflowId, EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED);
assertEquals(1, events.size());
assertEquals(
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE,
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND,
events.get(0).getWorkflowTaskFailedEventAttributes().getCause());
}

Expand All @@ -97,14 +97,14 @@ public void workflowFailureTooLarge() {
WorkflowFailedException e =
assertThrows(WorkflowFailedException.class, () -> workflow.execute(""));

assertTrue(e.getCause() instanceof TimeoutFailure);
assertTrue(e.getCause() instanceof TerminatedFailure);
String workflowId = WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId();
List<HistoryEvent> events =
failureWorkflowRule.getHistoryEvents(
workflowId, EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED);
assertEquals(1, events.size());
assertEquals(
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE,
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND,
events.get(0).getWorkflowTaskFailedEventAttributes().getCause());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.TimeoutFailure;
import io.temporal.testing.internal.ExternalServiceTestConfigurator;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerOptions;
Expand Down Expand Up @@ -294,12 +293,7 @@ public void scheduleToStartTimeout(boolean local) throws InterruptedException {

MatcherAssert.assertThat(
activityFailure.getMessage(), CoreMatchers.containsString("Activity task timed out"));
if (ExternalServiceTestConfigurator.isUseExternalService() && !local) {
// https://github.com/temporalio/temporal/issues/3667
assertEquals(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, activityFailure.getRetryState());
} else {
assertEquals(RetryState.RETRY_STATE_TIMEOUT, activityFailure.getRetryState());
}
assertEquals(RetryState.RETRY_STATE_TIMEOUT, activityFailure.getRetryState());

assertTrue(activityFailure.getCause() instanceof TimeoutFailure);
assertEquals(
Expand Down Expand Up @@ -567,12 +561,7 @@ public void scheduleToCloseTimeout_timingOutActivity(boolean local) {
assertTrue(e.getCause() instanceof ActivityFailure);
ActivityFailure activityFailure = (ActivityFailure) e.getCause();

if (ExternalServiceTestConfigurator.isUseExternalService() && !local) {
// https://github.com/temporalio/temporal/issues/3667
assertEquals(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, activityFailure.getRetryState());
} else {
assertEquals(RetryState.RETRY_STATE_TIMEOUT, activityFailure.getRetryState());
}
assertEquals(RetryState.RETRY_STATE_TIMEOUT, activityFailure.getRetryState());

MatcherAssert.assertThat(
activityFailure.getMessage(), CoreMatchers.containsString("Activity task timed out"));
Expand Down Expand Up @@ -618,12 +607,7 @@ public void scheduleToCloseTimeout_failing_timingOutActivity(boolean local) {
assertTrue(e.getCause() instanceof ActivityFailure);
ActivityFailure activityFailure = (ActivityFailure) e.getCause();

if (ExternalServiceTestConfigurator.isUseExternalService() && !local) {
// https://github.com/temporalio/temporal/issues/3667
assertEquals(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, activityFailure.getRetryState());
} else {
assertEquals(RetryState.RETRY_STATE_TIMEOUT, activityFailure.getRetryState());
}
assertEquals(RetryState.RETRY_STATE_TIMEOUT, activityFailure.getRetryState());

MatcherAssert.assertThat(
activityFailure.getMessage(), CoreMatchers.containsString("Activity task timed out"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public void cancelAsyncOperationAbandon() {
// Cancel before command is sent
runCancelBeforeSentTest(NexusOperationCancellationType.ABANDON);

// For ABANDON, the handler workflow may start even in the cancel-before-sent case,
// which can set opStarted and handlerFinished. Clear them before the after-start test.
opStarted.clearSignal();
handlerFinished.clearSignal();

// Cancel after operation is started
WorkflowStub stub =
testWorkflowRule.newUntypedWorkflowStubTimeoutOptions("TestNexusOperationCancellationType");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class HeaderTest {
public void testOperationHeaders() {
TestWorkflow workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow.class);
Map<String, String> headers = workflowStub.execute(testWorkflowRule.getTaskQueue());
// Operation-timeout is set because the schedule-to-close timeout is capped by workflow run
// timeout, which is set by
// default for tests.
Assert.assertTrue(headers.containsKey("operation-timeout"));
Assert.assertTrue(headers.containsKey("request-timeout"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static class TestNexus implements TestUpdatedWorkflow {
public String execute(boolean fail) {
NexusOperationOptions options =
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(1))
.setScheduleToCloseTimeout(Duration.ofSeconds(3))
.build();
NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder().setOperationOptions(options).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,8 @@ public void failWhenUpdateNamesDoNotMatch() {
}
}

@SuppressWarnings(
"deprecation") // Backwards compatibility for WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
@Test
public void failServerSideWhenStartIsInvalid() {
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
Expand Down
Loading
Loading