Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,8 @@ public DynamicUpdateHandler getHandler() {

void sleep(Duration duration);

void sleep(Duration duration, TimerOptions options);

boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition);

void await(String reason, Supplier<Boolean> unblockCondition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public void sleep(Duration duration) {
next.sleep(duration);
}

@Override
public void sleep(Duration duration, TimerOptions options) {
next.sleep(duration, options);
}

@Override
public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
return next.await(timeout, reason, unblockCondition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,11 @@ public void sleep(Duration duration) {
newTimer(duration).get();
}

@Override
public void sleep(Duration duration, TimerOptions options) {
newTimer(duration, options).get();
}

@Override
public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
Promise<Void> timer = newTimer(timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,11 @@ public static void sleep(Duration duration) {
getWorkflowOutboundInterceptor().sleep(duration);
}

public static void sleep(Duration duration, TimerOptions options) {
assertNotReadOnly("sleep");
getWorkflowOutboundInterceptor().sleep(duration, options);
}

public static boolean isWorkflowThread() {
return WorkflowThreadMarker.isWorkflowThread();
}
Expand Down
10 changes: 10 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -571,11 +571,21 @@ public static void sleep(Duration duration) {
WorkflowInternal.sleep(duration);
}

/** Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. */
public static void sleep(Duration duration, TimerOptions options) {
WorkflowInternal.sleep(duration, options);
}

/** Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. */
public static void sleep(long millis) {
WorkflowInternal.sleep(Duration.ofMillis(millis));
}

/** Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. */
public static void sleep(long millis, TimerOptions options) {
WorkflowInternal.sleep(Duration.ofMillis(millis), options);
}

/**
* Block current thread until unblockCondition is evaluated to true.
*
Expand Down
99 changes: 99 additions & 0 deletions temporal-sdk/src/test/java/io/temporal/workflow/SleepTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.temporal.workflow;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.testUtils.HistoryUtils;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.testing.internal.TracingWorkerInterceptor;
import io.temporal.workflow.shared.TestWorkflows.TestTraceWorkflow;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.Rule;
import org.junit.Test;

public class SleepTest {

static final String SLEEP_SUMMARY = "sleep1";

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestSleepWorkflowImpl.class).build();

@Test
public void testSleep() {
WorkflowOptions options;
if (testWorkflowRule.isUseExternalService()) {
options = SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue());
} else {
options =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
.setWorkflowRunTimeout(Duration.ofDays(1))
.build();
}
TestTraceWorkflow client =
testWorkflowRule.getWorkflowClient().newWorkflowStub(TestTraceWorkflow.class, options);
String result = client.execute();
assertEquals("testSleep", result);

// Validate that the timer summary was actually set in the history
WorkflowExecution exec = WorkflowStub.fromTyped(client).getExecution();
WorkflowExecutionHistory history =
testWorkflowRule.getWorkflowClient().fetchHistory(exec.getWorkflowId());
List<HistoryEvent> timerStartedEvents =
history.getEvents().stream()
.filter(HistoryEvent::hasTimerStartedEventAttributes)
.collect(Collectors.toList());
assertEquals(1, timerStartedEvents.size());
HistoryUtils.assertEventMetadata(timerStartedEvents.get(0), SLEEP_SUMMARY, null);

if (testWorkflowRule.isUseExternalService()) {
testWorkflowRule
.getInterceptor(TracingWorkerInterceptor.class)
.setExpected(
"interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP,
"registerQuery getTrace",
"newThread workflow-method",
"currentTimeMillis",
"sleep PT0.7S",
"currentTimeMillis");
} else {
testWorkflowRule
.getInterceptor(TracingWorkerInterceptor.class)
.setExpected(
"interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP,
"registerQuery getTrace",
"newThread workflow-method",
"currentTimeMillis",
"sleep PT11M40S",
"currentTimeMillis");
}
}

public static class TestSleepWorkflowImpl implements TestTraceWorkflow {

@Override
public String execute() {
boolean useExternalService = SDKTestWorkflowRule.useExternalService;
Duration timeout1 = useExternalService ? Duration.ofMillis(700) : Duration.ofSeconds(700);
long time = Workflow.currentTimeMillis();
Workflow.sleep(timeout1, TimerOptions.newBuilder().setSummary(SLEEP_SUMMARY).build());
long slept = Workflow.currentTimeMillis() - time;
// Also checks that rounding up to a second works.
assertTrue(slept + "<" + timeout1.toMillis(), slept >= timeout1.toMillis());
return "testSleep";
}

@Override
public List<String> getTrace() {
throw new UnsupportedOperationException("not implemented");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ public void sleep(Duration duration) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public void sleep(Duration duration, TimerOptions options) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
throw new UnsupportedOperationException("not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ public void sleep(Duration duration) {
next.sleep(duration);
}

@Override
public void sleep(Duration duration, TimerOptions options) {
if (!WorkflowUnsafe.isReplaying()) {
trace.add("sleep " + duration);
}
next.sleep(duration, options);
}

@Override
public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
if (!WorkflowUnsafe.isReplaying()) {
Expand Down
Loading