From 22ac4af04a9626ca45d4870a1a482e94214811f4 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Tue, 13 Jan 2026 15:34:21 +0100 Subject: [PATCH 1/5] wip: tracing Signed-off-by: Javier Aliaga --- durabletask-client/pom.xml | 34 ++++++-- .../durabletask/DurableTaskGrpcClient.java | 45 +++++++++-- .../DurableTaskGrpcClientBuilder.java | 13 ++++ .../durabletask/DurableTaskGrpcWorker.java | 78 +++++++++++++++++-- .../dapr/durabletask/TaskActivityContext.java | 6 ++ .../durabletask/TaskActivityExecutor.java | 16 +++- 6 files changed, 172 insertions(+), 20 deletions(-) diff --git a/durabletask-client/pom.xml b/durabletask-client/pom.xml index 175cde3e7e..dee9cbcddf 100644 --- a/durabletask-client/pom.xml +++ b/durabletask-client/pom.xml @@ -78,6 +78,18 @@ testcontainers test + + io.micrometer + micrometer-observation + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-context + @@ -113,24 +125,32 @@ - org.xolstice.maven.plugins + io.github.ascopes protobuf-maven-plugin ${protobuf-maven-plugin.version} - com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} - grpc-java - io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} - ${protobuf.input.directory} + ${protobuf.version} + true + + ${protobuf.input.directory} + + + + io.grpc + protoc-gen-grpc-java + ${grpc.version} + + - compile - compile-custom + generate + org.apache.maven.plugins maven-source-plugin diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java index b0fa24a5e9..b88cd7f9bf 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java @@ -16,6 +16,7 @@ import com.google.protobuf.StringValue; import com.google.protobuf.Timestamp; import io.dapr.durabletask.implementation.protobuf.OrchestratorService; +import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TraceContext; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; import io.grpc.Channel; import io.grpc.ChannelCredentials; @@ -28,6 +29,9 @@ import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyChannelBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; import javax.annotation.Nullable; import java.io.FileInputStream; @@ -41,6 +45,8 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.logging.Logger; /** @@ -57,6 +63,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { private final DataConverter dataConverter; private final ManagedChannel managedSidecarChannel; private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient; + private final Tracer tracer; DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); @@ -130,6 +137,12 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { sidecarGrpcChannel = this.managedSidecarChannel; } + if (builder.tracer != null) { + this.tracer = builder.tracer; + } else { + this.tracer = OpenTelemetry.noop().getTracer("DurableTaskGrpcClient"); + } + this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); } @@ -188,9 +201,28 @@ public String scheduleNewOrchestrationInstance( builder.setScheduledStartTimestamp(ts); } - OrchestratorService.CreateInstanceRequest request = builder.build(); - OrchestratorService.CreateInstanceResponse response = this.sidecarClient.startInstance(request); - return response.getInstanceId(); + AtomicReference response = new AtomicReference<>(); + + this.observe("dapr.workflow.grpc.startInstance", (TraceContext context) -> { + builder.setParentTraceContext(context); + OrchestratorService.CreateInstanceRequest request = builder.build(); + response.set(this.sidecarClient.startInstance(request)); + }); + + return response.get().getInstanceId(); + } + + private void observe(String spanName, Consumer fn) { + Span span = tracer.spanBuilder(spanName).startSpan(); + try { + TraceContext.Builder traceContextBuilder = TraceContext.newBuilder(); + traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId()); + var context = traceContextBuilder.build(); + + fn.accept(context); + } finally { + span.end(); + } } @Override @@ -206,8 +238,11 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload) builder.setInput(StringValue.of(serializedPayload)); } - OrchestratorService.RaiseEventRequest request = builder.build(); - this.sidecarClient.raiseEvent(request); + this.observe("dapr.workflow.grpc.raiseEvent", (TraceContext context) -> { + OrchestratorService.RaiseEventRequest request = builder.build(); + this.sidecarClient.raiseEvent(request); + }); + } @Override diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java index f3ba1cd82a..b934fc9713 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClientBuilder.java @@ -14,6 +14,7 @@ package io.dapr.durabletask; import io.grpc.Channel; +import io.opentelemetry.api.trace.Tracer; /** * Builder class for constructing new {@link DurableTaskClient} objects that communicate with a sidecar process @@ -27,6 +28,7 @@ public final class DurableTaskGrpcClientBuilder { String tlsCertPath; String tlsKeyPath; boolean insecure; + Tracer tracer; /** * Sets the {@link DataConverter} to use for converting serializable data payloads. @@ -57,6 +59,17 @@ public DurableTaskGrpcClientBuilder grpcChannel(Channel channel) { return this; } + /** + * Sets the Tracer object to be used by DurableTaskClient to emit traces. + * + * @param tracer to be used by the DurableTaskClient + * @return this builder object + */ + public DurableTaskGrpcClientBuilder tracer(Tracer tracer) { + this.tracer = tracer; + return this; + } + /** * Sets the gRPC endpoint port to connect to. If not specified, the default Durable Task port number will be used. * diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index f60d0b9e06..41f1fe686e 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -24,10 +24,14 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import org.apache.commons.lang3.StringUtils; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; import java.time.Duration; import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -88,7 +92,11 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; - this.workerPool = builder.executorService != null ? builder.executorService : Executors.newCachedThreadPool(); + + ExecutorService rawExecutor = builder.executorService != null + ? builder.executorService : Executors.newCachedThreadPool(); + this.workerPool = Context.taskWrapping(rawExecutor); + this.isExecutorServiceManaged = builder.executorService == null; } @@ -218,10 +226,16 @@ public void startAndBlock() { }); } else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) { OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest(); - logger.log(Level.FINEST, - String.format("Processing activity request: %s for instance: %s}", + + + logger.log(Level.INFO, + String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s", activityRequest.getName(), - activityRequest.getOrchestrationInstance().getInstanceId())); + activityRequest.getOrchestrationInstance().getInstanceId(), + Context.current())); + + // Extract trace context from the ActivityRequest and set it as current + Context traceContext = extractTraceContext(activityRequest); // TODO: Error handling this.workerPool.submit(() -> { @@ -232,7 +246,8 @@ public void startAndBlock() { activityRequest.getName(), activityRequest.getInput().getValue(), activityRequest.getTaskExecutionId(), - activityRequest.getTaskId()); + activityRequest.getTaskId(), + activityRequest.getParentTraceContext().getTraceParent()); } catch (Throwable e) { failureDetails = TaskFailureDetails.newBuilder() .setErrorType(e.getClass().getName()) @@ -342,4 +357,57 @@ private void shutDownWorkerPool() { private String getSidecarAddress() { return this.sidecarClient.getChannel().authority(); } + + /** + * Extracts trace context from the ActivityRequest's ParentTraceContext field + * and creates an OpenTelemetry Context with the parent span set. + * + * @param activityRequest The activity request containing the parent trace context + * @return A Context with the parent span set, or the current context if no trace context is present + */ + private Context extractTraceContext(OrchestratorService.ActivityRequest activityRequest) { + if (!activityRequest.hasParentTraceContext()) { + logger.log(Level.FINE, "No parent trace context in activity request"); + return Context.current(); + } + + OrchestratorService.TraceContext traceContext = activityRequest.getParentTraceContext(); + String traceParent = traceContext.getTraceParent(); + + if (traceParent.isEmpty()) { + logger.log(Level.FINE, "Empty traceparent in activity request"); + return Context.current(); + } + + logger.log(Level.INFO, + String.format("Extracting trace context from ActivityRequest: traceparent=%s", traceParent)); + + // Use W3CTraceContextPropagator to extract the trace context + Map carrier = new HashMap<>(); + carrier.put("traceparent", traceParent); + if (traceContext.hasTraceState()) { + carrier.put("tracestate", traceContext.getTraceState().getValue()); + } + + TextMapGetter> getter = new TextMapGetter>() { + @Override + public Iterable keys(Map carrier) { + return carrier.keySet(); + } + + @Override + public String get(Map carrier, String key) { + return carrier.get(key); + } + }; + + + Context extractedContext = W3CTraceContextPropagator.getInstance() + .extract(Context.current(), carrier, getter); + + logger.log(Level.INFO, + String.format("Extracted trace context: %s", extractedContext)); + + return extractedContext; + } } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java index 7a0d1ed1ee..08f5ebb832 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityContext.java @@ -47,4 +47,10 @@ public interface TaskActivityContext { * @return the task id of the current task activity */ int getTaskId(); + + /** + * Gets the trace parent id for the current workflow execution. + * @return trace parent id + */ + String getTraceParent(); } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java index a8ef6c67e0..5fecdbef03 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java @@ -30,7 +30,8 @@ public TaskActivityExecutor( this.logger = logger; } - public String execute(String taskName, String input, String taskExecutionId, int taskId) throws Throwable { + public String execute(String taskName, String input, + String taskExecutionId, int taskId, String traceParent) throws Throwable { TaskActivityFactory factory = this.activityFactories.get(taskName); if (factory == null) { throw new IllegalStateException( @@ -43,7 +44,8 @@ public String execute(String taskName, String input, String taskExecutionId, int String.format("The task factory '%s' returned a null TaskActivity object.", taskName)); } - TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionId, taskId); + TaskActivityContextImpl context = new TaskActivityContextImpl( + taskName, input, taskExecutionId, taskId, traceParent); // Unhandled exceptions are allowed to escape Object output = activity.run(context); @@ -59,14 +61,17 @@ private class TaskActivityContextImpl implements TaskActivityContext { private final String rawInput; private final String taskExecutionId; private final int taskId; + private final String traceParent; private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter; - public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionId, int taskId) { + public TaskActivityContextImpl(String activityName, String rawInput, + String taskExecutionId, int taskId, String traceParent) { this.name = activityName; this.rawInput = rawInput; this.taskExecutionId = taskExecutionId; this.taskId = taskId; + this.traceParent = traceParent; } @Override @@ -92,5 +97,10 @@ public String getTaskExecutionId() { public int getTaskId() { return this.taskId; } + + @Override + public String getTraceParent() { + return this.traceParent; + } } } From 5203ce289f3039e9b2eedbb7bd370532276de0db Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Wed, 14 Jan 2026 16:32:30 +0100 Subject: [PATCH 2/5] chore: Propagate traceparent to workflow activity Signed-off-by: Javier Aliaga --- durabletask-client/pom.xml | 22 ++++++------------- .../durabletask/DurableTaskGrpcClient.java | 5 +++-- .../io/dapr/durabletask/util/TraceUtils.java | 5 +++++ .../workflows/WorkflowActivityContext.java | 1 + .../DefaultWorkflowActivityContext.java | 5 +++++ 5 files changed, 21 insertions(+), 17 deletions(-) create mode 100644 durabletask-client/src/main/java/io/dapr/durabletask/util/TraceUtils.java diff --git a/durabletask-client/pom.xml b/durabletask-client/pom.xml index dee9cbcddf..b1e24d9397 100644 --- a/durabletask-client/pom.xml +++ b/durabletask-client/pom.xml @@ -125,32 +125,24 @@ - io.github.ascopes + org.xolstice.maven.plugins protobuf-maven-plugin ${protobuf-maven-plugin.version} - ${protobuf.version} - true - - ${protobuf.input.directory} - - - - io.grpc - protoc-gen-grpc-java - ${grpc.version} - - + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + ${protobuf.input.directory} - generate + compile + compile-custom - org.apache.maven.plugins maven-source-plugin diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java index b88cd7f9bf..f5a74cac10 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java @@ -29,7 +29,7 @@ import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyChannelBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; -import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; @@ -140,7 +140,8 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { if (builder.tracer != null) { this.tracer = builder.tracer; } else { - this.tracer = OpenTelemetry.noop().getTracer("DurableTaskGrpcClient"); + //this.tracer = OpenTelemetry.noop().getTracer("DurableTaskGrpcClient"); + this.tracer = GlobalOpenTelemetry.getTracer("dapr-workflow"); } this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/util/TraceUtils.java b/durabletask-client/src/main/java/io/dapr/durabletask/util/TraceUtils.java new file mode 100644 index 0000000000..68da73c3f2 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/util/TraceUtils.java @@ -0,0 +1,5 @@ +package io.dapr.durabletask.util; + +public class TraceUtils { + +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java index 2391b8f635..229caaf88c 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java @@ -25,4 +25,5 @@ public interface WorkflowActivityContext { T getInput(Class targetType); + String getTraceParent(); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java index cc7cc44e74..b1f08c5af7 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java @@ -98,6 +98,11 @@ public T getInput(Class targetType) { return this.innerContext.getInput(targetType); } + @Override + public String getTraceParent() { + return this.innerContext.getTraceParent(); + } + @Override public String getTaskExecutionId() { return this.innerContext.getTaskExecutionId(); From a5b5ce9f78cf3f579568bac2bfcbcb9dd00fee1e Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Thu, 15 Jan 2026 10:04:56 +0100 Subject: [PATCH 3/5] chore: Refactor durable task and introduce telemetry Signed-off-by: Javier Aliaga --- .../durabletask/DurableTaskGrpcClient.java | 29 +-- .../durabletask/DurableTaskGrpcWorker.java | 114 ++--------- .../io/dapr/durabletask/FailureDetails.java | 8 +- .../durabletask/TaskActivityExecutor.java | 20 +- .../TaskOrchestrationExecutor.java | 18 +- .../durabletask/TaskOrchestratorResult.java | 2 +- .../durabletask/runner/ActivityRunner.java | 182 ++++++++++++++++++ .../durabletask/runner/DurableRunner.java | 66 +++++++ .../runner/OrchestratorRunner.java | 88 +++++++++ .../io/dapr/durabletask/util/TraceUtils.java | 5 - 10 files changed, 396 insertions(+), 136 deletions(-) create mode 100644 durabletask-client/src/main/java/io/dapr/durabletask/runner/ActivityRunner.java create mode 100644 durabletask-client/src/main/java/io/dapr/durabletask/runner/DurableRunner.java create mode 100644 durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java delete mode 100644 durabletask-client/src/main/java/io/dapr/durabletask/util/TraceUtils.java diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java index f5a74cac10..881b9e9586 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcClient.java @@ -16,7 +16,6 @@ import com.google.protobuf.StringValue; import com.google.protobuf.Timestamp; import io.dapr.durabletask.implementation.protobuf.OrchestratorService; -import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TraceContext; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; import io.grpc.Channel; import io.grpc.ChannelCredentials; @@ -30,7 +29,6 @@ import io.grpc.netty.NettyChannelBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import javax.annotation.Nullable; @@ -46,7 +44,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.logging.Logger; /** @@ -204,28 +201,12 @@ public String scheduleNewOrchestrationInstance( AtomicReference response = new AtomicReference<>(); - this.observe("dapr.workflow.grpc.startInstance", (TraceContext context) -> { - builder.setParentTraceContext(context); - OrchestratorService.CreateInstanceRequest request = builder.build(); - response.set(this.sidecarClient.startInstance(request)); - }); + OrchestratorService.CreateInstanceRequest request = builder.build(); + response.set(this.sidecarClient.startInstance(request)); return response.get().getInstanceId(); } - private void observe(String spanName, Consumer fn) { - Span span = tracer.spanBuilder(spanName).startSpan(); - try { - TraceContext.Builder traceContextBuilder = TraceContext.newBuilder(); - traceContextBuilder.setTraceParent(span.getSpanContext().getTraceId()); - var context = traceContextBuilder.build(); - - fn.accept(context); - } finally { - span.end(); - } - } - @Override public void raiseEvent(String instanceId, String eventName, Object eventPayload) { Helpers.throwIfArgumentNull(instanceId, "instanceId"); @@ -239,10 +220,8 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload) builder.setInput(StringValue.of(serializedPayload)); } - this.observe("dapr.workflow.grpc.raiseEvent", (TraceContext context) -> { - OrchestratorService.RaiseEventRequest request = builder.build(); - this.sidecarClient.raiseEvent(request); - }); + OrchestratorService.RaiseEventRequest request = builder.build(); + this.sidecarClient.raiseEvent(request); } diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 41f1fe686e..806e1dfc52 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -13,17 +13,18 @@ package io.dapr.durabletask; -import com.google.protobuf.StringValue; import io.dapr.durabletask.implementation.protobuf.OrchestratorService; -import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails; import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; +import io.dapr.durabletask.runner.ActivityRunner; +import io.dapr.durabletask.runner.OrchestratorRunner; import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import org.apache.commons.lang3.StringUtils; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.TextMapGetter; @@ -57,6 +58,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final Duration maximumTimerInterval; private final ExecutorService workerPool; private final String appId; // App ID for cross-app routing + private final Tracer tracer; private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient; private final boolean isExecutorServiceManaged; @@ -88,6 +90,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { sidecarGrpcChannel = this.managedSidecarChannel; } + this.tracer = GlobalOpenTelemetry.getTracer("dapr-workflow"); + this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval @@ -98,6 +102,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.workerPool = Context.taskWrapping(rawExecutor); this.isExecutorServiceManaged = builder.executorService == null; + + } /** @@ -172,121 +178,25 @@ public void startAndBlock() { while (workItemStream.hasNext()) { OrchestratorService.WorkItem workItem = workItemStream.next(); OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase(); + if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) { OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); - logger.log(Level.FINEST, String.format("Processing orchestrator request for instance: {0}", orchestratorRequest.getInstanceId())); - // TODO: Error handling - this.workerPool.submit(() -> { - TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( - orchestratorRequest.getPastEventsList(), - orchestratorRequest.getNewEventsList()); - - var versionBuilder = OrchestratorService.OrchestrationVersion.newBuilder(); - - if (StringUtils.isNotEmpty(taskOrchestratorResult.getVersion())) { - versionBuilder.setName(taskOrchestratorResult.getVersion()); - } - - if (taskOrchestratorResult.getPatches() != null) { - versionBuilder.addAllPatches(taskOrchestratorResult.getPatches()); - } - - OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder() - .setInstanceId(orchestratorRequest.getInstanceId()) - .addAllActions(taskOrchestratorResult.getActions()) - .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) - .setCompletionToken(workItem.getCompletionToken()) - .setVersion(versionBuilder) - .build(); - - try { - this.sidecarClient.completeOrchestratorTask(response); - logger.log(Level.FINEST, - "Completed orchestrator request for instance: {0}", - orchestratorRequest.getInstanceId()); - } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { - logger.log(Level.WARNING, - "The sidecar at address {0} is unavailable while completing the orchestrator task.", - this.getSidecarAddress()); - } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { - logger.log(Level.WARNING, - "Durable Task worker has disconnected from {0} while completing the orchestrator task.", - this.getSidecarAddress()); - } else { - logger.log(Level.WARNING, - "Unexpected failure completing the orchestrator task at {0}.", - this.getSidecarAddress()); - } - } - }); + this.workerPool.submit(new OrchestratorRunner(workItem, taskOrchestrationExecutor, sidecarClient, tracer)); } else if (requestType == OrchestratorService.WorkItem.RequestCase.ACTIVITYREQUEST) { OrchestratorService.ActivityRequest activityRequest = workItem.getActivityRequest(); - logger.log(Level.INFO, String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s", activityRequest.getName(), activityRequest.getOrchestrationInstance().getInstanceId(), Context.current())); - // Extract trace context from the ActivityRequest and set it as current - Context traceContext = extractTraceContext(activityRequest); + this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer)); - // TODO: Error handling - this.workerPool.submit(() -> { - String output = null; - TaskFailureDetails failureDetails = null; - try { - output = taskActivityExecutor.execute( - activityRequest.getName(), - activityRequest.getInput().getValue(), - activityRequest.getTaskExecutionId(), - activityRequest.getTaskId(), - activityRequest.getParentTraceContext().getTraceParent()); - } catch (Throwable e) { - failureDetails = TaskFailureDetails.newBuilder() - .setErrorType(e.getClass().getName()) - .setErrorMessage(e.getMessage()) - .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) - .build(); - } - - OrchestratorService.ActivityResponse.Builder responseBuilder = OrchestratorService.ActivityResponse - .newBuilder() - .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) - .setTaskId(activityRequest.getTaskId()) - .setCompletionToken(workItem.getCompletionToken()); - - if (output != null) { - responseBuilder.setResult(StringValue.of(output)); - } - - if (failureDetails != null) { - responseBuilder.setFailureDetails(failureDetails); - } - - try { - this.sidecarClient.completeActivityTask(responseBuilder.build()); - } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { - logger.log(Level.WARNING, - "The sidecar at address {0} is unavailable while completing the activity task.", - this.getSidecarAddress()); - } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { - logger.log(Level.WARNING, - "Durable Task worker has disconnected from {0} while completing the activity task.", - this.getSidecarAddress()); - } else { - logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", - this.getSidecarAddress()); - } - } - }); } else if (requestType == OrchestratorService.WorkItem.RequestCase.HEALTHPING) { // No-op } else { diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/FailureDetails.java b/durabletask-client/src/main/java/io/dapr/durabletask/FailureDetails.java index f5d9d834ea..357946ab31 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/FailureDetails.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/FailureDetails.java @@ -124,7 +124,13 @@ public boolean isCausedBy(Class exceptionClass) { } } - static String getFullStackTrace(Throwable e) { + /** + * Gets the full stack trace of the specified exception. + * + * @param e the exception + * @return the full stack trace of the exception + */ + public static String getFullStackTrace(Throwable e) { StackTraceElement[] elements = e.getStackTrace(); // Plan for 256 characters per stack frame (which is likely on the high-end) diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java index 5fecdbef03..c4c66e892a 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java @@ -16,11 +16,18 @@ import java.util.HashMap; import java.util.logging.Logger; -final class TaskActivityExecutor { +public final class TaskActivityExecutor { private final HashMap activityFactories; private final DataConverter dataConverter; private final Logger logger; + /** + * Constructor. + * + * @param activityFactories the activity factories to use for creating activities + * @param dataConverter the data converter to use for serializing and deserializing activity inputs and outputs + * @param logger the logger to use for logging + */ public TaskActivityExecutor( HashMap activityFactories, DataConverter dataConverter, @@ -30,6 +37,17 @@ public TaskActivityExecutor( this.logger = logger; } + /** + * Executes an activity task. + * + * @param taskName the name of the activity task to execute + * @param input the serialized input payload for the activity task + * @param taskExecutionId Unique ID for the task execution. + * @param taskId Auto-incrementing ID for the task. + * @param traceParent The traceparent header value. + * @return the serialized output payload for the activity task, or null if the activity task returned null. + * @throws Throwable if an unhandled exception occurs during activity task execution. + */ public String execute(String taskName, String input, String taskExecutionId, int taskId, String traceParent) throws Throwable { TaskActivityFactory factory = this.activityFactories.get(taskName); diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 0b540dabc0..0a7285f2d4 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -48,7 +48,7 @@ import java.util.function.IntFunction; import java.util.logging.Logger; -final class TaskOrchestrationExecutor { +public final class TaskOrchestrationExecutor { private static final String EMPTY_STRING = ""; private final TaskOrchestrationFactories orchestrationFactories; @@ -57,6 +57,15 @@ final class TaskOrchestrationExecutor { private final Duration maximumTimerInterval; private final String appId; + /** + * Creates a new TaskOrchestrationExecutor. + * + * @param orchestrationFactories map of orchestration names to their factories + * @param dataConverter converter for serializing/deserializing data + * @param maximumTimerInterval maximum duration for timer intervals + * @param logger logger for orchestration execution + * @param appId application ID for cross-app routing + */ public TaskOrchestrationExecutor( TaskOrchestrationFactories orchestrationFactories, DataConverter dataConverter, @@ -70,6 +79,13 @@ public TaskOrchestrationExecutor( this.appId = appId; // extracted from router } + /** + * Executes the orchestration with the given past and new events. + * + * @param pastEvents list of past history events + * @param newEvents list of new history events + * @return the result of the orchestrator execution + */ public TaskOrchestratorResult execute(List pastEvents, List newEvents) { ContextImplTask context = new ContextImplTask(pastEvents, newEvents); diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java index c8f4ecaee5..2a4c5f8a2a 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java @@ -19,7 +19,7 @@ import java.util.Collections; import java.util.List; -final class TaskOrchestratorResult { +public final class TaskOrchestratorResult { private final Collection actions; diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/runner/ActivityRunner.java b/durabletask-client/src/main/java/io/dapr/durabletask/runner/ActivityRunner.java new file mode 100644 index 0000000000..c9c0baa434 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/runner/ActivityRunner.java @@ -0,0 +1,182 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.runner; + +import com.google.protobuf.StringValue; +import io.dapr.durabletask.FailureDetails; +import io.dapr.durabletask.TaskActivityExecutor; +import io.dapr.durabletask.implementation.protobuf.OrchestratorService; +import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; +import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapGetter; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class ActivityRunner extends DurableRunner { + private static final Logger logger = Logger.getLogger(ActivityRunner.class.getPackage().getName()); + + private final OrchestratorService.ActivityRequest activityRequest; + private final TaskActivityExecutor taskActivityExecutor; + + /** + * Constructor. + * + *

This class executes the activity requests

+ * + * @param workItem work item to be executed + * @param taskActivityExecutor executor for the activity + * @param sidecarClient sidecar client to communicate with the sidecar + * @param tracer tracer to be used for tracing + */ + public ActivityRunner( + OrchestratorService.WorkItem workItem, + TaskActivityExecutor taskActivityExecutor, + TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient, + @Nullable Tracer tracer) { + super(workItem, sidecarClient, tracer); + this.activityRequest = workItem.getActivityRequest(); + this.taskActivityExecutor = taskActivityExecutor; + } + + @Override + public void run() { + if (tracer != null) { + runWithTracing(); + } else { + runWithoutTracing(); + } + } + + private void runWithTracing() { + Context parentContext = extractTraceContext(); + + Span span = tracer.spanBuilder("activity:" + activityRequest.getName()) + .setParent(parentContext) + .setSpanKind(SpanKind.INTERNAL) + .setAttribute("durabletask.task.instance_id", + activityRequest.getOrchestrationInstance().getInstanceId()) + .setAttribute("durabletask.task.id", activityRequest.getTaskId()) + .setAttribute("durabletask.activity.name", activityRequest.getName()) + .startSpan(); + + try (Scope scope = span.makeCurrent()) { + executeActivity(); + } catch (Throwable e) { + logger.log(Level.WARNING, "Failed to complete activity task.", e); + span.setStatus(StatusCode.ERROR, "Failed to complete activity task"); + span.recordException(e); + } finally { + span.end(); + } + } + + private void runWithoutTracing() { + try { + executeActivity(); + } catch (Throwable e) { + logger.log(Level.WARNING, "Failed to complete activity task.", e); + } + } + + private void executeActivity() throws Throwable { + String output = null; + OrchestratorService.TaskFailureDetails failureDetails = null; + Throwable failureException = null; + try { + output = taskActivityExecutor.execute( + activityRequest.getName(), + activityRequest.getInput().getValue(), + activityRequest.getTaskExecutionId(), + activityRequest.getTaskId(), + activityRequest.getParentTraceContext().getTraceParent()); + } catch (Throwable e) { + failureDetails = OrchestratorService.TaskFailureDetails.newBuilder() + .setErrorType(e.getClass().getName()) + .setErrorMessage(e.getMessage()) + .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) + .build(); + failureException = e; + } + + OrchestratorService.ActivityResponse.Builder responseBuilder = OrchestratorService.ActivityResponse + .newBuilder() + .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) + .setTaskId(activityRequest.getTaskId()) + .setCompletionToken(workItem.getCompletionToken()); + + if (output != null) { + responseBuilder.setResult(StringValue.of(output)); + } + + if (failureDetails != null) { + responseBuilder.setFailureDetails(failureDetails); + } + + try { + this.sidecarClient.completeActivityTask(responseBuilder.build()); + } catch (StatusRuntimeException e) { + logException(e); + throw e; + } + + if (failureException != null) { + throw failureException; + } + } + + private Context extractTraceContext() { + if (!activityRequest.hasParentTraceContext()) { + return Context.current(); + } + + OrchestratorService.TraceContext traceContext = activityRequest.getParentTraceContext(); + String traceParent = traceContext.getTraceParent(); + + if (traceParent.isEmpty()) { + return Context.current(); + } + + Map carrier = new HashMap<>(); + carrier.put("traceparent", traceParent); + if (traceContext.hasTraceState()) { + carrier.put("tracestate", traceContext.getTraceState().getValue()); + } + + TextMapGetter> getter = new TextMapGetter<>() { + @Override + public Iterable keys(Map carrier) { + return carrier.keySet(); + } + + @Override + public String get(Map carrier, String key) { + return carrier.get(key); + } + }; + + return W3CTraceContextPropagator.getInstance() + .extract(Context.current(), carrier, getter); + } +} diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/runner/DurableRunner.java b/durabletask-client/src/main/java/io/dapr/durabletask/runner/DurableRunner.java new file mode 100644 index 0000000000..b59aa58046 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/runner/DurableRunner.java @@ -0,0 +1,66 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.runner; + +import io.dapr.durabletask.implementation.protobuf.OrchestratorService; +import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.trace.Tracer; + +import javax.annotation.Nullable; +import java.util.logging.Level; +import java.util.logging.Logger; + +public abstract class DurableRunner implements Runnable { + private static final Logger logger = Logger.getLogger(DurableRunner.class.getPackage().getName()); + public final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient; + public final OrchestratorService.WorkItem workItem; + @Nullable + public final Tracer tracer; + + /** + * Constructs a new instance of the DurableRunner. + * + * @param workItem the work item to be executed + * @param sidecarClient the sidecar client used to communicate with the durable task sidecar + * @param tracer the tracer used for tracing operations; can be null if tracing is not required + */ + public DurableRunner(OrchestratorService.WorkItem workItem, + TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient, + @Nullable Tracer tracer) { + this.workItem = workItem; + this.sidecarClient = sidecarClient; + this.tracer = tracer; + } + + protected String getSidecarAddress() { + return this.sidecarClient.getChannel().authority(); + } + + protected void logException(StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { + logger.log(Level.WARNING, + "The sidecar at address {0} is unavailable while completing the activity task.", + this.sidecarClient.getChannel().authority()); + } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { + logger.log(Level.WARNING, + "Durable Task worker has disconnected from {0} while completing the activity task.", + this.sidecarClient.getChannel().authority()); + } else { + logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.", + this.sidecarClient.getChannel().authority()); + } + } +} diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java b/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java new file mode 100644 index 0000000000..77046fd225 --- /dev/null +++ b/durabletask-client/src/main/java/io/dapr/durabletask/runner/OrchestratorRunner.java @@ -0,0 +1,88 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask.runner; + +import com.google.protobuf.StringValue; +import io.dapr.durabletask.TaskOrchestrationExecutor; +import io.dapr.durabletask.TaskOrchestratorResult; +import io.dapr.durabletask.implementation.protobuf.OrchestratorService; +import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; +import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.trace.Tracer; +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public class OrchestratorRunner extends DurableRunner { + private static final Logger logger = Logger.getLogger(OrchestratorRunner.class.getPackage().getName()); + + private final OrchestratorService.OrchestratorRequest orchestratorRequest; + private final TaskOrchestrationExecutor taskOrchestrationExecutor; + + /** + * Constructs a new instance of the OrchestratorRunner class. + * + * @param workItem The work item containing details about the orchestrator task to be executed. + * @param taskOrchestrationExecutor The executor responsible for running task orchestration logic. + * @param sidecarClient The gRPC stub for communication with the Task Hub sidecar service. + * @param tracer An optional tracer used for distributed tracing, can be null. + */ + public OrchestratorRunner( + OrchestratorService.WorkItem workItem, + TaskOrchestrationExecutor taskOrchestrationExecutor, + TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient, + @Nullable Tracer tracer) { + + super(workItem, sidecarClient, tracer); + this.orchestratorRequest = workItem.getOrchestratorRequest(); + this.taskOrchestrationExecutor = taskOrchestrationExecutor; + } + + @Override + public void run() { + TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( + orchestratorRequest.getPastEventsList(), + orchestratorRequest.getNewEventsList()); + + var versionBuilder = OrchestratorService.OrchestrationVersion.newBuilder(); + + if (StringUtils.isNotEmpty(taskOrchestratorResult.getVersion())) { + versionBuilder.setName(taskOrchestratorResult.getVersion()); + } + + if (taskOrchestratorResult.getPatches() != null) { + versionBuilder.addAllPatches(taskOrchestratorResult.getPatches()); + } + + OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder() + .setInstanceId(orchestratorRequest.getInstanceId()) + .addAllActions(taskOrchestratorResult.getActions()) + .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) + .setCompletionToken(workItem.getCompletionToken()) + .setVersion(versionBuilder) + .build(); + + try { + this.sidecarClient.completeOrchestratorTask(response); + logger.log(Level.FINEST, + "Completed orchestrator request for instance: {0}", + orchestratorRequest.getInstanceId()); + } catch (StatusRuntimeException e) { + this.logException(e); + } + } +} diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/util/TraceUtils.java b/durabletask-client/src/main/java/io/dapr/durabletask/util/TraceUtils.java deleted file mode 100644 index 68da73c3f2..0000000000 --- a/durabletask-client/src/main/java/io/dapr/durabletask/util/TraceUtils.java +++ /dev/null @@ -1,5 +0,0 @@ -package io.dapr.durabletask.util; - -public class TraceUtils { - -} From 668bcf44f73299900e5d179c445db3dd14cd8b77 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Fri, 16 Jan 2026 12:43:25 +0100 Subject: [PATCH 4/5] chore: Fix bug, return type is required Signed-off-by: Javier Aliaga --- .../io/dapr/durabletask/TaskOrchestrationContext.java | 2 +- .../src/main/java/io/dapr/workflows/WorkflowContext.java | 2 +- .../io/dapr/workflows/DefaultWorkflowContextTest.java | 8 ++++++-- .../runtime/DefaultWorkflowActivityContextTest.java | 6 +++++- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java index f2f5ace8c2..bb8c1b9873 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java @@ -446,7 +446,7 @@ default Task callSubOrchestrator(String name) { * @return a new {@link Task} that completes when the sub-orchestration completes or fails */ default Task callSubOrchestrator(String name, Object input) { - return this.callSubOrchestrator(name, input, null); + return this.callSubOrchestrator(name, input, Void.class); } /** diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java index 40d83af0ee..1d93f71c85 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java @@ -381,7 +381,7 @@ default Task callChildWorkflow(String name) { * @return a new {@link Task} that completes when the child-workflow completes or fails */ default Task callChildWorkflow(String name, Object input) { - return this.callChildWorkflow(name, input, null); + return this.callChildWorkflow(name, input, Void.class); } /** diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java index 22e364f244..95cc33bd71 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java @@ -38,7 +38,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class DefaultWorkflowContextTest { private DefaultWorkflowContext context; @@ -289,7 +293,7 @@ public void callChildWorkflowWithName() { String expectedName = "TestActivity"; context.callChildWorkflow(expectedName); - verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, null, null, null, null); + verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, null, null, null, Void.class); } @Test diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContextTest.java index 13cf3b6c6a..ceb6648e6c 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContextTest.java @@ -5,7 +5,9 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -21,6 +23,7 @@ void shouldSuccessfullyCreateContextAndReturnCorrectValuesForAllMethods() { when(mockInnerContext.getName()).thenReturn("TestActivity"); when(mockInnerContext.getInput(any())).thenReturn("TestInput"); when(mockInnerContext.getTaskExecutionId()).thenReturn("TestExecutionId"); + when(mockInnerContext.getTraceParent()).thenReturn("00244654132154564654"); assertNotNull(context.getLogger()); assertEquals("TestActivity", context.getName()); @@ -29,6 +32,7 @@ void shouldSuccessfullyCreateContextAndReturnCorrectValuesForAllMethods() { assertEquals("TestInput", input); assertEquals("TestExecutionId", context.getTaskExecutionId()); + assertEquals("00244654132154564654", context.getTraceParent()); } @Test From b4c9d1353c08d574fa53b2f0236f27df96d99824 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Wed, 18 Feb 2026 15:08:07 +0100 Subject: [PATCH 5/5] chore: Checkstyle Signed-off-by: Javier Aliaga --- .../java/io/dapr/durabletask/TaskOrchestratorResult.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java index 2a4c5f8a2a..9efb0751f2 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java @@ -29,6 +29,14 @@ public final class TaskOrchestratorResult { private final List patches; + /** + * Constructor. + * + * @param actions the actions to take + * @param customStatus the custom status to return + * @param version the orchestrator version + * @param patches the patches to apply + */ public TaskOrchestratorResult(Collection actions, String customStatus, String version, List patches) { this.actions = Collections.unmodifiableCollection(actions);