diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java new file mode 100644 index 0000000000..5cd24018f9 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusStartWorkflowHelper.java @@ -0,0 +1,81 @@ +package io.temporal.internal.nexus; + +import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink; +import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink; + +import io.nexusrpc.handler.HandlerException; +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.api.common.v1.Link; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.internal.client.NexusStartWorkflowRequest; +import io.temporal.internal.client.NexusStartWorkflowResponse; +import java.net.URISyntaxException; +import java.util.function.Function; + +/** + * Shared helper for starting a workflow from a Nexus operation and attaching workflow links to the + * operation context. Used by both {@code WorkflowRunOperationImpl} and {@code + * TemporalNexusClientImpl}. + */ +public class NexusStartWorkflowHelper { + + /** + * Starts a workflow via the provided invoker function, attaches workflow links to the operation + * context, and returns the response. + * + * @param ctx the operation context (links will be attached as a side-effect) + * @param details the operation start details containing requestId, callback, links + * @param invoker function that starts the workflow given a {@link NexusStartWorkflowRequest} + * @return the {@link NexusStartWorkflowResponse} containing the operation token and workflow + * execution + */ + public static NexusStartWorkflowResponse startWorkflowAndAttachLinks( + OperationContext ctx, + OperationStartDetails details, + Function invoker) { + InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get(); + + NexusStartWorkflowRequest nexusRequest = + new NexusStartWorkflowRequest( + details.getRequestId(), + details.getCallbackUrl(), + details.getCallbackHeaders(), + nexusCtx.getTaskQueue(), + details.getLinks()); + + NexusStartWorkflowResponse response = invoker.apply(nexusRequest); + WorkflowExecution workflowExec = response.getWorkflowExecution(); + + // If the start workflow response returned a link use it, otherwise + // create the link information about the new workflow and return to the caller. + Link.WorkflowEvent workflowEventLink = + nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent() + ? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent() + : null; + if (workflowEventLink == null) { + workflowEventLink = + Link.WorkflowEvent.newBuilder() + .setNamespace(nexusCtx.getNamespace()) + .setWorkflowId(workflowExec.getWorkflowId()) + .setRunId(workflowExec.getRunId()) + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) + .build(); + } + io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink); + if (nexusLink != null) { + try { + ctx.addLinks(nexusProtoLinkToLink(nexusLink)); + } catch (URISyntaxException e) { + throw new HandlerException(HandlerException.ErrorType.INTERNAL, "failed to parse URI", e); + } + } + + return response; + } + + private NexusStartWorkflowHelper() {} +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/WorkflowRunOperationToken.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java similarity index 82% rename from temporal-sdk/src/main/java/io/temporal/internal/nexus/WorkflowRunOperationToken.java rename to temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java index 2c8d1acb87..4bd5635e93 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/WorkflowRunOperationToken.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java @@ -3,7 +3,8 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; -public class WorkflowRunOperationToken { +/** Deserialized representation of a Nexus operation token. */ +public class OperationToken { @JsonProperty("v") @JsonInclude(JsonInclude.Include.NON_NULL) private final Integer version; @@ -17,7 +18,7 @@ public class WorkflowRunOperationToken { @JsonProperty("wid") private final String workflowId; - public WorkflowRunOperationToken( + public OperationToken( @JsonProperty("t") Integer type, @JsonProperty("ns") String namespace, @JsonProperty("wid") String workflowId, @@ -28,8 +29,8 @@ public WorkflowRunOperationToken( this.version = version; } - public WorkflowRunOperationToken(String namespace, String workflowId) { - this.type = OperationTokenType.WORKFLOW_RUN; + public OperationToken(OperationTokenType type, String namespace, String workflowId) { + this.type = type; this.namespace = namespace; this.workflowId = workflowId; this.version = null; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java index 1f4869bdc4..737a84aad4 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java @@ -15,33 +15,45 @@ public class OperationTokenUtil { private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding(); /** - * Load a workflow run operation token from an operation token. + * Load and validate an operation token without asserting the token type. Use this for cancel + * dispatch where the token type determines the cancel behavior. * - * @throws IllegalArgumentException if the operation token is invalid + * @throws IllegalArgumentException if the operation token is malformed or has invalid structure */ - public static WorkflowRunOperationToken loadWorkflowRunOperationToken(String operationToken) { - WorkflowRunOperationToken token; + public static OperationToken loadOperationToken(String operationToken) { + OperationToken token; try { - JavaType reference = mapper.getTypeFactory().constructType(WorkflowRunOperationToken.class); + JavaType reference = mapper.getTypeFactory().constructType(OperationToken.class); token = mapper.readValue(decoder.decode(operationToken), reference); } catch (Exception e) { throw new IllegalArgumentException("Failed to parse operation token: " + e.getMessage()); } - if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) { - throw new IllegalArgumentException( - "Invalid workflow run token: incorrect operation token type: " + token.getType()); - } if (token.getVersion() != null && token.getVersion() != 0) { - throw new IllegalArgumentException("Invalid workflow run token: unexpected version field"); + throw new IllegalArgumentException("Invalid operation token: unexpected version field"); } if (Strings.isNullOrEmpty(token.getWorkflowId())) { - throw new IllegalArgumentException("Invalid workflow run token: missing workflow ID (wid)"); + throw new IllegalArgumentException("Invalid operation token: missing workflow ID (wid)"); + } + return token; + } + + /** + * Load a workflow run operation token, asserting that the token type is {@link + * OperationTokenType#WORKFLOW_RUN}. + * + * @throws IllegalArgumentException if the operation token is invalid or not a workflow run token + */ + public static OperationToken loadWorkflowRunOperationToken(String operationToken) { + OperationToken token = loadOperationToken(operationToken); + if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) { + throw new IllegalArgumentException( + "Invalid workflow run token: incorrect operation token type: " + token.getType()); } return token; } /** - * Attempt to extract the workflow Id from an operation token. + * Extract the workflow ID from a workflow run operation token. * * @throws IllegalArgumentException if the operation token is invalid */ @@ -52,7 +64,9 @@ public static String loadWorkflowIdFromOperationToken(String operationToken) { /** Generate a workflow run operation token from a workflow ID and namespace. */ public static String generateWorkflowRunOperationToken(String workflowId, String namespace) throws JsonProcessingException { - String json = ow.writeValueAsString(new WorkflowRunOperationToken(namespace, workflowId)); + String json = + ow.writeValueAsString( + new OperationToken(OperationTokenType.WORKFLOW_RUN, namespace, workflowId)); return encoder.encodeToString(json.getBytes()); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java new file mode 100644 index 0000000000..627e769033 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClient.java @@ -0,0 +1,401 @@ +package io.temporal.nexus; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.Experimental; +import io.temporal.workflow.Functions; + +/** + * Nexus-aware client wrapping {@link WorkflowClient}. Provides methods for interacting with + * Temporal from within a Nexus operation handler. + * + *

Obtained via the {@link TemporalOperationHandler.StartHandler} parameter. + * + *

Example usage to start a workflow from an operation handler: + * + *

{@code
+ * @OperationImpl
+ * public OperationHandler startTransfer() {
+ *   return TemporalOperationHandler.create((context, client, input) -> {
+ *     return client.startWorkflow(
+ *         TransferWorkflow.class,
+ *         TransferWorkflow::transfer, input.getFromAccount(), input.getToAccount(),
+ *         WorkflowOptions.newBuilder()
+ *             .setWorkflowId("transfer-" + input.getTransferId())
+ *             .build());
+ *   });
+ * }
+ * }
+ * + *

For synchronous operations, use {@link #getWorkflowClient()} directly and return a {@link + * TemporalOperationResult#sync} result. For example, to send a signal: + * + *

{@code
+ * @OperationImpl
+ * public OperationHandler cancelOrder() {
+ *   return TemporalOperationHandler.create((context, client, input) -> {
+ *     client.getWorkflowClient()
+ *         .newUntypedWorkflowStub("order-" + input.getOrderId())
+ *         .signal("requestCancellation", input);
+ *     return TemporalOperationResult.sync(null);
+ *   });
+ * }
+ * }
+ */ +@Experimental +public interface TemporalNexusClient { + + /** Returns the underlying {@link WorkflowClient} for advanced use cases. */ + WorkflowClient getWorkflowClient(); + + /** + * Starts a zero-argument workflow that returns a value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::run, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, Functions.Func1 workflowMethod, WorkflowOptions options); + + /** + * Starts a one-argument workflow that returns a value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::processOrder, input, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func2 workflowMethod, + A1 arg1, + WorkflowOptions options); + + /** + * Starts a two-argument workflow that returns a value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::run, arg1, arg2, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func3 workflowMethod, + A1 arg1, + A2 arg2, + WorkflowOptions options); + + /** + * Starts a three-argument workflow that returns a value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::run, arg1, arg2, arg3, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the type of the third workflow argument + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func4 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + WorkflowOptions options); + + /** + * Starts a four-argument workflow that returns a value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::run, arg1, arg2, arg3, arg4, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param arg4 fourth workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the type of the third workflow argument + * @param the type of the fourth workflow argument + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func5 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + WorkflowOptions options); + + /** + * Starts a five-argument workflow that returns a value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::run, arg1, arg2, arg3, arg4, arg5, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param arg4 fourth workflow argument + * @param arg5 fifth workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the type of the third workflow argument + * @param the type of the fourth workflow argument + * @param the type of the fifth workflow argument + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func6 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + WorkflowOptions options); + + /** + * Starts a zero-argument workflow with no return value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::execute, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, Functions.Proc1 workflowMethod, WorkflowOptions options); + + /** + * Starts a one-argument workflow with no return value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::execute, input, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc2 workflowMethod, + A1 arg1, + WorkflowOptions options); + + /** + * Starts a two-argument workflow with no return value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::execute, arg1, arg2, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc3 workflowMethod, + A1 arg1, + A2 arg2, + WorkflowOptions options); + + /** + * Starts a three-argument workflow with no return value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::execute, arg1, arg2, arg3, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the type of the third workflow argument + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc4 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + WorkflowOptions options); + + /** + * Starts a four-argument workflow with no return value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::execute, arg1, arg2, arg3, arg4, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param arg4 fourth workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the type of the third workflow argument + * @param the type of the fourth workflow argument + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc5 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + WorkflowOptions options); + + /** + * Starts a five-argument workflow with no return value. + * + *

Example: + * + *

{@code
+   * client.startWorkflow(MyWorkflow.class, MyWorkflow::execute, arg1, arg2, arg3, arg4, arg5, options)
+   * }
+ * + * @param workflowClass the workflow interface class + * @param workflowMethod unbound method reference to the workflow method + * @param arg1 first workflow argument + * @param arg2 second workflow argument + * @param arg3 third workflow argument + * @param arg4 fourth workflow argument + * @param arg5 fifth workflow argument + * @param options workflow start options (must include workflowId) + * @param the workflow interface type + * @param the type of the first workflow argument + * @param the type of the second workflow argument + * @param the type of the third workflow argument + * @param the type of the fourth workflow argument + * @param the type of the fifth workflow argument + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc6 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + WorkflowOptions options); + + /** + * Starts a workflow using an untyped workflow type name. + * + *

Example: + * + *

{@code
+   * client.startWorkflow("MyWorkflow", String.class, options, input)
+   * }
+ * + * @param workflowType the workflow type name string + * @param resultClass the expected result class + * @param options workflow start options (must include workflowId) + * @param args workflow arguments + * @param the workflow return type + * @return an async {@link TemporalOperationResult} with the workflow-run operation token + */ + TemporalOperationResult startWorkflow( + String workflowType, Class resultClass, WorkflowOptions options, Object... args); +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java new file mode 100644 index 0000000000..c65df759d5 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java @@ -0,0 +1,218 @@ +package io.temporal.nexus; + +import io.nexusrpc.handler.HandlerException; +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.common.Experimental; +import io.temporal.internal.client.NexusStartWorkflowResponse; +import io.temporal.internal.nexus.NexusStartWorkflowHelper; +import io.temporal.workflow.Functions; +import java.util.Objects; + +/** Package-private implementation of {@link TemporalNexusClient}. */ +@Experimental +final class TemporalNexusClientImpl implements TemporalNexusClient { + + private final WorkflowClient client; + private final OperationContext operationContext; + private final OperationStartDetails operationStartDetails; + private boolean asyncOperationStarted; + + TemporalNexusClientImpl( + WorkflowClient client, + OperationContext operationContext, + OperationStartDetails operationStartDetails) { + this.client = Objects.requireNonNull(client); + this.operationContext = Objects.requireNonNull(operationContext); + this.operationStartDetails = Objects.requireNonNull(operationStartDetails); + } + + @Override + public WorkflowClient getWorkflowClient() { + return client; + } + + // ---------- Returning (Func) overloads ---------- + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, Functions.Func1 workflowMethod, WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn(WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func2 workflowMethod, + A1 arg1, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub, arg1))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func3 workflowMethod, + A1 arg1, + A2 arg2, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub, arg1, arg2))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func4 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub, arg1, arg2, arg3))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func5 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod( + () -> workflowMethod.apply(stub, arg1, arg2, arg3, arg4))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Func6 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod( + () -> workflowMethod.apply(stub, arg1, arg2, arg3, arg4, arg5))); + } + + // ---------- Void (Proc) overloads ---------- + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, Functions.Proc1 workflowMethod, WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn(WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc2 workflowMethod, + A1 arg1, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub, arg1))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc3 workflowMethod, + A1 arg1, + A2 arg2, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub, arg1, arg2))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc4 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod(() -> workflowMethod.apply(stub, arg1, arg2, arg3))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc5 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod( + () -> workflowMethod.apply(stub, arg1, arg2, arg3, arg4))); + } + + @Override + public TemporalOperationResult startWorkflow( + Class workflowClass, + Functions.Proc6 workflowMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + WorkflowOptions options) { + T stub = client.newWorkflowStub(workflowClass, options); + return invokeAndReturn( + WorkflowHandle.fromWorkflowMethod( + () -> workflowMethod.apply(stub, arg1, arg2, arg3, arg4, arg5))); + } + + // ---------- Untyped ---------- + + @Override + public TemporalOperationResult startWorkflow( + String workflowType, Class resultClass, WorkflowOptions options, Object... args) { + WorkflowStub stub = client.newUntypedWorkflowStub(workflowType, options); + WorkflowHandle handle = WorkflowHandle.fromWorkflowStub(stub, resultClass, args); + return invokeAndReturn(handle); + } + + private TemporalOperationResult invokeAndReturn(WorkflowHandle handle) { + if (asyncOperationStarted) { + throw new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, + new IllegalStateException( + "Only one async operation can be started per operation handler invocation. " + + "Use getWorkflowClient() for additional workflow interactions.")); + } + NexusStartWorkflowResponse response = + NexusStartWorkflowHelper.startWorkflowAndAttachLinks( + operationContext, + operationStartDetails, + request -> handle.getInvoker().invoke(request)); + // Set after successful start so that if startWorkflowAndAttachLinks throws, + // the handler can retry without being blocked by the guard. + asyncOperationStarted = true; + return TemporalOperationResult.async(response.getOperationToken()); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationCancelContext.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationCancelContext.java new file mode 100644 index 0000000000..afee2a8a8e --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationCancelContext.java @@ -0,0 +1,48 @@ +package io.temporal.nexus; + +import io.nexusrpc.handler.OperationCancelDetails; +import io.nexusrpc.handler.OperationContext; +import io.temporal.common.Experimental; +import java.util.Objects; + +/** + * Context for a Nexus cancel operation request, passed to {@link + * TemporalOperationHandler#cancelWorkflowRun}. + */ +@Experimental +public final class TemporalOperationCancelContext { + + private final OperationContext operationContext; + private final OperationCancelDetails operationCancelDetails; + + TemporalOperationCancelContext( + OperationContext operationContext, OperationCancelDetails operationCancelDetails) { + this.operationContext = Objects.requireNonNull(operationContext); + this.operationCancelDetails = Objects.requireNonNull(operationCancelDetails); + } + + /** Returns the service name for this operation. */ + public String getService() { + return operationContext.getService(); + } + + /** Returns the operation name. */ + public String getOperation() { + return operationContext.getOperation(); + } + + /** Returns the operation token identifying the operation to cancel. */ + public String getOperationToken() { + return operationCancelDetails.getOperationToken(); + } + + /** Returns the underlying {@link OperationContext} for advanced use cases. */ + public OperationContext getOperationContext() { + return operationContext; + } + + /** Returns the underlying {@link OperationCancelDetails} for advanced use cases. */ + public OperationCancelDetails getOperationCancelDetails() { + return operationCancelDetails; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java new file mode 100644 index 0000000000..b4b484cb00 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java @@ -0,0 +1,126 @@ +package io.temporal.nexus; + +import io.nexusrpc.handler.*; +import io.nexusrpc.handler.OperationHandler; +import io.temporal.client.WorkflowClient; +import io.temporal.common.Experimental; +import io.temporal.internal.nexus.CurrentNexusOperationContext; +import io.temporal.internal.nexus.InternalNexusOperationContext; +import io.temporal.internal.nexus.OperationToken; +import io.temporal.internal.nexus.OperationTokenType; +import io.temporal.internal.nexus.OperationTokenUtil; + +/** + * Generic Nexus operation handler backed by Temporal. Implements {@link OperationHandler} and + * provides a composable way to map Temporal operations (start workflow, etc.) to Nexus operations. + * + *

Usage example: + * + *

{@code
+ * @OperationImpl
+ * public OperationHandler startTransfer() {
+ *   return TemporalOperationHandler.create((context, client, input) -> {
+ *     return client.startWorkflow(
+ *         TransferWorkflow.class,
+ *         TransferWorkflow::transfer, input.getFromAccount(), input.getToAccount(),
+ *         WorkflowOptions.newBuilder()
+ *             .setWorkflowId("transfer-" + input.getTransferId())
+ *             .build());
+ *   });
+ * }
+ * }
+ * + *

This class supports subclassing to customize cancel behavior. Override {@link + * #cancelWorkflowRun} to change how workflow-run cancellations are handled. The {@link #start} and + * {@link #cancel} methods should not be overridden — they contain the core dispatch logic. + * + * @param the input type + * @param the result type + */ +@Experimental +public class TemporalOperationHandler implements OperationHandler { + + /** + * Handler invoked when a Nexus start operation request is received. + * + * @param the input type + * @param the result type + */ + @FunctionalInterface + public interface StartHandler { + TemporalOperationResult apply( + TemporalOperationStartContext context, TemporalNexusClient client, T input); + } + + private final StartHandler startHandler; + + protected TemporalOperationHandler(StartHandler startHandler) { + this.startHandler = startHandler; + } + + /** + * Creates a {@link TemporalOperationHandler} from a start handler. Subclass and override {@link + * #cancelWorkflowRun} to customize cancel behavior. + * + * @param startHandler the handler to invoke on start operation requests + * @return an operation handler backed by the given start handler + */ + public static TemporalOperationHandler create(StartHandler startHandler) { + return new TemporalOperationHandler<>(startHandler); + } + + @Override + public final OperationStartResult start( + OperationContext ctx, OperationStartDetails details, T input) { + InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get(); + TemporalNexusClient client = + new TemporalNexusClientImpl(nexusCtx.getWorkflowClient(), ctx, details); + + TemporalOperationStartContext startContext = new TemporalOperationStartContext(ctx, details); + TemporalOperationResult result = startHandler.apply(startContext, client, input); + + if (result.isSync()) { + return OperationStartResult.newSyncBuilder(result.getSyncResult()).build(); + } else if (result.isAsync()) { + return OperationStartResult.newAsyncBuilder(result.getAsyncOperationToken()).build(); + } else { + throw new HandlerException( + HandlerException.ErrorType.INTERNAL, + new IllegalStateException("TemporalOperationResult must be either sync or async")); + } + } + + @Override + public final void cancel(OperationContext ctx, OperationCancelDetails details) { + OperationToken token; + try { + token = OperationTokenUtil.loadOperationToken(details.getOperationToken()); + } catch (IllegalArgumentException e) { + throw new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, "failed to parse operation token", e); + } + + TemporalOperationCancelContext cancelContext = new TemporalOperationCancelContext(ctx, details); + if (token.getType() == OperationTokenType.WORKFLOW_RUN) { + cancelWorkflowRun(cancelContext, token.getWorkflowId()); + } else { + throw new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, + new IllegalArgumentException("unsupported operation token type: " + token.getType())); + } + } + + /** + * Called when a cancel request is received for a workflow-run token (type=1). Override to + * customize cancel behavior. + * + *

Default behavior: cancels the underlying workflow. + * + * @param context the cancel context + * @param workflowId the workflow ID extracted from the operation token + */ + protected void cancelWorkflowRun(TemporalOperationCancelContext context, String workflowId) { + WorkflowClient client = CurrentNexusOperationContext.get().getWorkflowClient(); + client.newUntypedWorkflowStub(workflowId).cancel(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationResult.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationResult.java new file mode 100644 index 0000000000..3ff3fa2e60 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationResult.java @@ -0,0 +1,82 @@ +package io.temporal.nexus; + +import com.google.common.base.Strings; +import io.temporal.common.Experimental; +import javax.annotation.Nullable; + +/** + * Unified result type for Temporal-backed Nexus operations. Encapsulates either a synchronous + * result or an async operation token. + * + *

Use {@link #sync(Object)} for operations that complete immediately (e.g., signals). Use {@link + * #async(String)} for operations that return an operation token for async completion (e.g., start + * workflow). + */ +@Experimental +public final class TemporalOperationResult { + + private final boolean isSync; + @Nullable private final R syncResult; + @Nullable private final String asyncOperationToken; + + private TemporalOperationResult( + boolean isSync, @Nullable R syncResult, @Nullable String asyncOperationToken) { + this.isSync = isSync; + this.syncResult = syncResult; + this.asyncOperationToken = asyncOperationToken; + } + + /** + * Creates a synchronous result. + * + * @param value the result value, may be null + * @return a sync result wrapping the given value + */ + public static TemporalOperationResult sync(@Nullable R value) { + return new TemporalOperationResult<>(true, value, null); + } + + /** + * Creates an asynchronous result backed by an operation token. + * + * @param operationToken the operation token identifying the async operation + * @return an async result wrapping the given token + * @throws IllegalArgumentException if operationToken is null or empty + */ + public static TemporalOperationResult async(String operationToken) { + if (Strings.isNullOrEmpty(operationToken)) { + throw new IllegalArgumentException("operationToken must not be null or empty"); + } + return new TemporalOperationResult<>(false, null, operationToken); + } + + /** Returns true if this is a synchronous result. */ + public boolean isSync() { + return isSync; + } + + /** Returns true if this is an asynchronous result backed by an operation token. */ + public boolean isAsync() { + return !isSync; + } + + /** + * Returns the synchronous result value, or null if this is an async result. + * + * @return the sync result value, or null + */ + @Nullable + public R getSyncResult() { + return syncResult; + } + + /** + * Returns the async operation token, or null if this is a sync result. + * + * @return the operation token, or null + */ + @Nullable + public String getAsyncOperationToken() { + return asyncOperationToken; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationStartContext.java b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationStartContext.java new file mode 100644 index 0000000000..814cf75068 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationStartContext.java @@ -0,0 +1,48 @@ +package io.temporal.nexus; + +import io.nexusrpc.handler.OperationContext; +import io.nexusrpc.handler.OperationStartDetails; +import io.temporal.common.Experimental; +import java.util.Objects; + +/** + * Context for a Nexus start operation request, passed to {@link + * TemporalOperationHandler.StartHandler} alongside the {@link TemporalNexusClient} and input. + */ +@Experimental +public final class TemporalOperationStartContext { + + private final OperationContext operationContext; + private final OperationStartDetails operationStartDetails; + + TemporalOperationStartContext( + OperationContext operationContext, OperationStartDetails operationStartDetails) { + this.operationContext = Objects.requireNonNull(operationContext); + this.operationStartDetails = Objects.requireNonNull(operationStartDetails); + } + + /** Returns the service name for this operation. */ + public String getService() { + return operationContext.getService(); + } + + /** Returns the operation name. */ + public String getOperation() { + return operationContext.getOperation(); + } + + /** Returns the request ID for this operation. */ + public String getRequestId() { + return operationStartDetails.getRequestId(); + } + + /** Returns the underlying {@link OperationContext} for advanced use cases. */ + public OperationContext getOperationContext() { + return operationContext; + } + + /** Returns the underlying {@link OperationStartDetails} for advanced use cases. */ + public OperationStartDetails getOperationStartDetails() { + return operationStartDetails; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java index 537235ce53..167cd4061f 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java @@ -1,20 +1,12 @@ package io.temporal.nexus; -import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink; -import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink; - import io.nexusrpc.handler.*; import io.nexusrpc.handler.OperationHandler; -import io.temporal.api.common.v1.Link; -import io.temporal.api.common.v1.WorkflowExecution; -import io.temporal.api.enums.v1.EventType; import io.temporal.client.WorkflowClient; -import io.temporal.internal.client.NexusStartWorkflowRequest; import io.temporal.internal.client.NexusStartWorkflowResponse; import io.temporal.internal.nexus.CurrentNexusOperationContext; -import io.temporal.internal.nexus.InternalNexusOperationContext; +import io.temporal.internal.nexus.NexusStartWorkflowHelper; import io.temporal.internal.nexus.OperationTokenUtil; -import java.net.URISyntaxException; class WorkflowRunOperationImpl implements OperationHandler { private final WorkflowHandleFactory handleFactory; @@ -26,52 +18,13 @@ class WorkflowRunOperationImpl implements OperationHandler { @Override public OperationStartResult start( OperationContext ctx, OperationStartDetails operationStartDetails, T input) { - InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get(); - WorkflowHandle handle = handleFactory.apply(ctx, operationStartDetails, input); - NexusStartWorkflowRequest nexusRequest = - new NexusStartWorkflowRequest( - operationStartDetails.getRequestId(), - operationStartDetails.getCallbackUrl(), - operationStartDetails.getCallbackHeaders(), - nexusCtx.getTaskQueue(), - operationStartDetails.getLinks()); - - NexusStartWorkflowResponse nexusStartWorkflowResponse = - handle.getInvoker().invoke(nexusRequest); - WorkflowExecution workflowExec = nexusStartWorkflowResponse.getWorkflowExecution(); + NexusStartWorkflowResponse response = + NexusStartWorkflowHelper.startWorkflowAndAttachLinks( + ctx, operationStartDetails, request -> handle.getInvoker().invoke(request)); - // If the start workflow response returned a link use it, otherwise - // create the link information about the new workflow and return to the caller. - Link.WorkflowEvent workflowEventLink = - nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent() - ? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent() - : null; - if (workflowEventLink == null) { - workflowEventLink = - Link.WorkflowEvent.newBuilder() - .setNamespace(nexusCtx.getNamespace()) - .setWorkflowId(workflowExec.getWorkflowId()) - .setRunId(workflowExec.getRunId()) - .setEventRef( - Link.WorkflowEvent.EventReference.newBuilder() - .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) - .build(); - } - io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink); - // Attach the link to the operation result. - OperationStartResult.Builder result = - OperationStartResult.newAsyncBuilder(nexusStartWorkflowResponse.getOperationToken()); - if (nexusLink != null) { - try { - ctx.addLinks(nexusProtoLinkToLink(nexusLink)); - } catch (URISyntaxException e) { - // Not expected as the link is constructed by the SDK. - throw new HandlerException(HandlerException.ErrorType.INTERNAL, "failed to parse URI", e); - } - } - return result.build(); + return OperationStartResult.newAsyncBuilder(response.getOperationToken()).build(); } @Override diff --git a/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java b/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java index fbf14d217a..1f22fe8c2e 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java @@ -17,7 +17,8 @@ public class WorkflowRunTokenTest { @Test public void serializeWorkflowRunToken() throws JsonProcessingException { - WorkflowRunOperationToken token = new WorkflowRunOperationToken("namespace", "workflowId"); + OperationToken token = + new OperationToken(OperationTokenType.WORKFLOW_RUN, "namespace", "workflowId"); String json = ow.writeValueAsString(token); final JsonNode node = new ObjectMapper().readTree(json); System.out.println(json); @@ -32,9 +33,8 @@ public void serializeWorkflowRunToken() throws JsonProcessingException { @Test public void deserializeWorkflowRunTokenWithVersion() throws IOException { String json = "{\"t\":1,\"ns\":\"namespace\",\"wid\":\"workflowId\",\"v\":1}"; - JavaType reference = - new ObjectMapper().getTypeFactory().constructType(WorkflowRunOperationToken.class); - WorkflowRunOperationToken token = new ObjectMapper().readValue(json.getBytes(), reference); + JavaType reference = new ObjectMapper().getTypeFactory().constructType(OperationToken.class); + OperationToken token = new ObjectMapper().readValue(json.getBytes(), reference); // Assert that the serialized JSON is as expected Assert.assertEquals(OperationTokenType.WORKFLOW_RUN, token.getType()); Assert.assertEquals(new Integer(1), token.getVersion()); @@ -45,9 +45,8 @@ public void deserializeWorkflowRunTokenWithVersion() throws IOException { @Test public void deserializeWorkflowRunToken() throws IOException { String json = "{\"t\":1,\"ns\":\"namespace\",\"wid\":\"workflowId\"}"; - JavaType reference = - new ObjectMapper().getTypeFactory().constructType(WorkflowRunOperationToken.class); - WorkflowRunOperationToken token = new ObjectMapper().readValue(json.getBytes(), reference); + JavaType reference = new ObjectMapper().getTypeFactory().constructType(OperationToken.class); + OperationToken token = new ObjectMapper().readValue(json.getBytes(), reference); // Assert that the serialized JSON is as expected Assert.assertEquals(OperationTokenType.WORKFLOW_RUN, token.getType()); Assert.assertNull(null, token.getVersion()); @@ -67,8 +66,8 @@ public void failLoadOldWorkflowRunToken() { public void loadWorkflowIdFromOperationToken() { String json = "{\"t\":1,\"ns\":\"namespace\",\"wid\":\"workflowId\"}"; - WorkflowRunOperationToken token = - OperationTokenUtil.loadWorkflowRunOperationToken(encoder.encodeToString(json.getBytes())); + OperationToken token = + OperationTokenUtil.loadOperationToken(encoder.encodeToString(json.getBytes())); Assert.assertEquals("workflowId", token.getWorkflowId()); Assert.assertEquals("namespace", token.getNamespace()); Assert.assertNull(token.getVersion()); @@ -86,8 +85,7 @@ public void loadWorkflowIdFromGoOperationToken() { // across SDKs. String goOperationToken = "eyJ2IjowLCJ0IjoxLCJucyI6Im5zIiwid2lkIjoidyJ9"; - WorkflowRunOperationToken token = - OperationTokenUtil.loadWorkflowRunOperationToken(goOperationToken); + OperationToken token = OperationTokenUtil.loadOperationToken(goOperationToken); Assert.assertEquals("w", token.getWorkflowId()); Assert.assertEquals("ns", token.getNamespace()); Assert.assertEquals(Integer.valueOf(0), token.getVersion()); @@ -101,7 +99,7 @@ public void loadWorkflowIdFromBadOperationToken() { Assert.assertThrows( IllegalArgumentException.class, () -> - OperationTokenUtil.loadWorkflowRunOperationToken( + OperationTokenUtil.loadOperationToken( encoder.encodeToString(badTokenEmptyJson.getBytes()))); // Bad token, missing the "wid" field @@ -109,7 +107,7 @@ public void loadWorkflowIdFromBadOperationToken() { Assert.assertThrows( IllegalArgumentException.class, () -> - OperationTokenUtil.loadWorkflowRunOperationToken( + OperationTokenUtil.loadOperationToken( encoder.encodeToString(badTokenMissingWorkflow.getBytes()))); // Bad token, unknown version @@ -118,15 +116,23 @@ public void loadWorkflowIdFromBadOperationToken() { Assert.assertThrows( IllegalArgumentException.class, () -> - OperationTokenUtil.loadWorkflowRunOperationToken( + OperationTokenUtil.loadOperationToken( encoder.encodeToString(badTokenUnknownVersion.getBytes()))); - // Bad token, unknown version + // Bad token, unknown type (also has bad version, so loadOperationToken rejects on version) String badTokenUnknownType = "{\"t\":4,\"ns\":\"namespace\", \"wid\":\"workflowId\", \"v\":1}"; Assert.assertThrows( IllegalArgumentException.class, () -> - OperationTokenUtil.loadWorkflowRunOperationToken( + OperationTokenUtil.loadOperationToken( encoder.encodeToString(badTokenUnknownType.getBytes()))); + + // Bad token, unknown type with valid version — loadWorkflowRunOperationToken rejects on type + String badTokenWrongType = "{\"t\":4,\"ns\":\"namespace\", \"wid\":\"workflowId\"}"; + Assert.assertThrows( + IllegalArgumentException.class, + () -> + OperationTokenUtil.loadWorkflowRunOperationToken( + encoder.encodeToString(badTokenWrongType.getBytes()))); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java index a0b5f1f9d9..3259c428ea 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java @@ -6,8 +6,8 @@ import io.temporal.client.WorkflowOptions; import io.temporal.failure.ApplicationFailure; import io.temporal.failure.NexusOperationFailure; +import io.temporal.internal.nexus.OperationToken; import io.temporal.internal.nexus.OperationTokenUtil; -import io.temporal.internal.nexus.WorkflowRunOperationToken; import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowRunOperation; import io.temporal.testing.WorkflowReplayer; @@ -74,7 +74,7 @@ public String execute(String input) { "Operation token should be present", asyncExec.getOperationToken().isPresent()); // Result should only be completed if the operation is completed Assert.assertFalse("Result should not be completed", asyncOpHandle.getResult().isCompleted()); - WorkflowRunOperationToken token = + OperationToken token = OperationTokenUtil.loadWorkflowRunOperationToken(asyncExec.getOperationToken().get()); Assert.assertTrue(token.getWorkflowId().startsWith(WORKFLOW_ID_PREFIX)); // Unblock the operation diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerCancelTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerCancelTest.java new file mode 100644 index 0000000000..4773e6c521 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerCancelTest.java @@ -0,0 +1,137 @@ +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.enums.v1.EventType; +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.failure.CanceledFailure; +import io.temporal.internal.Signal; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class GenericHandlerCancelTest extends BaseNexusTest { + + private static final Signal opStarted = new Signal(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestCancelWorkflow.class, WaitForCancelWorkflow.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Override + protected SDKTestWorkflowRule getTestWorkflowRule() { + return testWorkflowRule; + } + + @Override + public void setUp() { + super.setUp(); + opStarted.clearSignal(); + } + + @Test + public void cancelGenericHandlerOperation() { + WorkflowStub stub = testWorkflowRule.newUntypedWorkflowStubTimeoutOptions("TestCancelWorkflow"); + stub.start(false); + try { + opStarted.waitForSignal(); + } catch (Exception e) { + Assert.fail("test timed out waiting for operation to start."); + } + stub.cancel(); + Assert.assertThrows(WorkflowFailedException.class, () -> stub.getResult(Void.class)); + // Verify the nexus operation cancel was dispatched and completed successfully. + // The parent workflow completes as cancelled (CanceledFailure), same as with + // WorkflowRunOperation — the cancel handler correctly cancels the handler workflow. + testWorkflowRule.assertHistoryEvent( + stub.getExecution().getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED); + } + + @WorkflowInterface + public interface TestCancelWorkflowInterface { + @WorkflowMethod(name = "TestCancelWorkflow") + void execute(boolean cancelImmediately); + } + + public static class TestCancelWorkflow implements TestCancelWorkflowInterface { + @Override + public void execute(boolean cancelImmediately) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .setCancellationType(NexusOperationCancellationType.WAIT_COMPLETED) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(getEndpointName()) + .setOperationOptions(options) + .build(); + + TestNexusCancelService serviceStub = + Workflow.newNexusServiceStub(TestNexusCancelService.class, serviceOptions); + + NexusOperationHandle handle = + Workflow.startNexusOperation(serviceStub::operation, "cancel-test"); + handle.getExecution().get(); + opStarted.signal(); + + try { + Workflow.await(() -> false); + } catch (CanceledFailure f) { + Workflow.newDetachedCancellationScope(() -> handle.getResult().get()).run(); + } + } + } + + @WorkflowInterface + public interface WaitForCancelWorkflowInterface { + @WorkflowMethod + Void execute(String input); + } + + public static class WaitForCancelWorkflow implements WaitForCancelWorkflowInterface { + @Override + public Void execute(String input) { + try { + Workflow.await(() -> false); + } catch (CanceledFailure f) { + // workflow was cancelled as expected + } + return null; + } + } + + @Service + public interface TestNexusCancelService { + @Operation + Void operation(String input); + } + + @ServiceImpl(service = TestNexusCancelService.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.create( + (context, client, input) -> + client.startWorkflow( + WaitForCancelWorkflowInterface.class, + WaitForCancelWorkflowInterface::execute, + input, + WorkflowOptions.newBuilder() + .setWorkflowId("generic-cancel-test-" + context.getService()) + .build())); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerDoubleStartTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerDoubleStartTest.java new file mode 100644 index 0000000000..cb1e9bc789 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerDoubleStartTest.java @@ -0,0 +1,106 @@ +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.HandlerException; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowOptions; +import io.temporal.failure.ApplicationFailure; +import io.temporal.failure.NexusOperationFailure; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class GenericHandlerDoubleStartTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + TestNexus.class, TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + private static final String EXPECTED_MESSAGE = + "Only one async operation can be started per operation handler " + + "invocation. Use getWorkflowClient() for additional workflow interactions."; + + @Test + public void doubleStartThrows() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + + WorkflowFailedException e = + Assert.assertThrows( + WorkflowFailedException.class, + () -> workflowStub.execute(testWorkflowRule.getTaskQueue())); + + Assert.assertTrue(e.getCause() instanceof NexusOperationFailure); + NexusOperationFailure nexusFailure = (NexusOperationFailure) e.getCause(); + + Assert.assertTrue(nexusFailure.getCause() instanceof HandlerException); + HandlerException handlerException = (HandlerException) nexusFailure.getCause(); + Assert.assertEquals("handler error: " + EXPECTED_MESSAGE, handlerException.getMessage()); + + Assert.assertTrue(handlerException.getCause() instanceof ApplicationFailure); + ApplicationFailure appFailure = (ApplicationFailure) handlerException.getCause(); + Assert.assertEquals("java.lang.IllegalStateException", appFailure.getType()); + Assert.assertEquals(EXPECTED_MESSAGE, appFailure.getOriginalMessage()); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); + + TestNexusServiceDoubleStart serviceStub = + Workflow.newNexusServiceStub(TestNexusServiceDoubleStart.class, serviceOptions); + return serviceStub.operation("input"); + } + } + + @Service + public interface TestNexusServiceDoubleStart { + @Operation + String operation(String input); + } + + @ServiceImpl(service = TestNexusServiceDoubleStart.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.create( + (context, client, input) -> { + // First start should succeed + client.startWorkflow( + TestMultiArgWorkflowFunctions.Test1ArgWorkflowFunc.class, + TestMultiArgWorkflowFunctions.Test1ArgWorkflowFunc::func1, + input, + WorkflowOptions.newBuilder() + .setWorkflowId("double-start-first-" + context.getService()) + .build()); + // Second start should throw + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test1ArgWorkflowFunc.class, + TestMultiArgWorkflowFunctions.Test1ArgWorkflowFunc::func1, + input, + WorkflowOptions.newBuilder() + .setWorkflowId("double-start-second-" + context.getService()) + .build()); + }); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerSyncResultTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerSyncResultTest.java new file mode 100644 index 0000000000..01eda16b01 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerSyncResultTest.java @@ -0,0 +1,64 @@ +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.nexus.TemporalOperationResult; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class GenericHandlerSyncResultTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestNexus.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void syncResultTest() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("sync-hello", result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); + + TestNexusSyncService serviceStub = + Workflow.newNexusServiceStub(TestNexusSyncService.class, serviceOptions); + return serviceStub.operation("hello"); + } + } + + @Service + public interface TestNexusSyncService { + @Operation + String operation(String input); + } + + @ServiceImpl(service = TestNexusSyncService.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.create( + (context, client, input) -> TemporalOperationResult.sync("sync-" + input)); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java new file mode 100644 index 0000000000..d85e12f5d1 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedProcTest.java @@ -0,0 +1,123 @@ +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class GenericHandlerTypedProcTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + TestNexus.class, TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void typedProcStartWorkflowTest() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("done", result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); + + TestNexusServiceProc serviceStub = + Workflow.newNexusServiceStub(TestNexusServiceProc.class, serviceOptions); + for (int i = 0; i < 6; i++) { + serviceStub.operation(i); + } + return "done"; + } + } + + @Service + public interface TestNexusServiceProc { + @Operation + Void operation(Integer input); + } + + @ServiceImpl(service = TestNexusServiceProc.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.create( + (context, client, input) -> { + String prefix = "generic-handler-test-proc" + input + "-"; + String workflowId = prefix + context.getService() + "-" + context.getOperation(); + WorkflowOptions options = + WorkflowOptions.newBuilder().setWorkflowId(workflowId).build(); + switch (input) { + case 0: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.TestNoArgsWorkflowProc.class, + TestMultiArgWorkflowFunctions.TestNoArgsWorkflowProc::proc, + options); + case 1: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test1ArgWorkflowProc.class, + TestMultiArgWorkflowFunctions.Test1ArgWorkflowProc::proc1, + "input", + options); + case 2: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test2ArgWorkflowProc.class, + TestMultiArgWorkflowFunctions.Test2ArgWorkflowProc::proc2, + "input", + 2, + options); + case 3: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test3ArgWorkflowProc.class, + TestMultiArgWorkflowFunctions.Test3ArgWorkflowProc::proc3, + "input", + 2, + 3, + options); + case 4: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test4ArgWorkflowProc.class, + TestMultiArgWorkflowFunctions.Test4ArgWorkflowProc::proc4, + "input", + 2, + 3, + 4, + options); + case 5: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test5ArgWorkflowProc.class, + TestMultiArgWorkflowFunctions.Test5ArgWorkflowProc::proc5, + "input", + 2, + 3, + 4, + 5, + options); + default: + throw new IllegalArgumentException("unexpected input: " + input); + } + }); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedStartWorkflowTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedStartWorkflowTest.java new file mode 100644 index 0000000000..f9c12b0e64 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerTypedStartWorkflowTest.java @@ -0,0 +1,124 @@ +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class GenericHandlerTypedStartWorkflowTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + TestNexus.class, TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void typedStartWorkflowTests() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("funcinputinput2input23input234input2345", result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); + + TestNexusServiceGeneric serviceStub = + Workflow.newNexusServiceStub(TestNexusServiceGeneric.class, serviceOptions); + StringBuilder result = new StringBuilder(); + for (int i = 0; i < 6; i++) { + result.append(serviceStub.operation(i)); + } + return result.toString(); + } + } + + @Service + public interface TestNexusServiceGeneric { + @Operation + String operation(Integer input); + } + + @ServiceImpl(service = TestNexusServiceGeneric.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.create( + (context, client, input) -> { + String prefix = "generic-handler-test-func" + input + "-"; + String workflowId = prefix + context.getService() + "-" + context.getOperation(); + WorkflowOptions options = + WorkflowOptions.newBuilder().setWorkflowId(workflowId).build(); + switch (input) { + case 0: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.TestNoArgsWorkflowFunc.class, + TestMultiArgWorkflowFunctions.TestNoArgsWorkflowFunc::func, + options); + case 1: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test1ArgWorkflowFunc.class, + TestMultiArgWorkflowFunctions.Test1ArgWorkflowFunc::func1, + "input", + options); + case 2: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test2ArgWorkflowFunc.class, + TestMultiArgWorkflowFunctions.Test2ArgWorkflowFunc::func2, + "input", + 2, + options); + case 3: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test3ArgWorkflowFunc.class, + TestMultiArgWorkflowFunctions.Test3ArgWorkflowFunc::func3, + "input", + 2, + 3, + options); + case 4: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test4ArgWorkflowFunc.class, + TestMultiArgWorkflowFunctions.Test4ArgWorkflowFunc::func4, + "input", + 2, + 3, + 4, + options); + case 5: + return client.startWorkflow( + TestMultiArgWorkflowFunctions.Test5ArgWorkflowFunc.class, + TestMultiArgWorkflowFunctions.Test5ArgWorkflowFunc::func5, + "input", + 2, + 3, + 4, + 5, + options); + default: + throw new IllegalArgumentException("unexpected input: " + input); + } + }); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerUntypedStartWorkflowTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerUntypedStartWorkflowTest.java new file mode 100644 index 0000000000..1f682d4534 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericHandlerUntypedStartWorkflowTest.java @@ -0,0 +1,77 @@ +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.TemporalOperationHandler; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class GenericHandlerUntypedStartWorkflowTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + TestNexus.class, TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void untypedStartWorkflowTest() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("input", result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusOperationOptions options = + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build(); + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder().setOperationOptions(options).build(); + + TestNexusServiceUntyped serviceStub = + Workflow.newNexusServiceStub(TestNexusServiceUntyped.class, serviceOptions); + return serviceStub.operation("input"); + } + } + + @Service + public interface TestNexusServiceUntyped { + @Operation + String operation(String input); + } + + @ServiceImpl(service = TestNexusServiceUntyped.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return TemporalOperationHandler.create( + (context, client, input) -> + client.startWorkflow( + "func1", + String.class, + WorkflowOptions.newBuilder() + .setWorkflowId( + "generic-handler-untyped-" + + context.getService() + + "-" + + context.getOperation()) + .build(), + input)); + } + } +}