diff --git a/modules/flowable-engine/src/main/java/org/flowable/engine/impl/bpmn/behavior/ParallelGatewayActivityBehavior.java b/modules/flowable-engine/src/main/java/org/flowable/engine/impl/bpmn/behavior/ParallelGatewayActivityBehavior.java index 2485b5cca81..3ea41ece65c 100644 --- a/modules/flowable-engine/src/main/java/org/flowable/engine/impl/bpmn/behavior/ParallelGatewayActivityBehavior.java +++ b/modules/flowable-engine/src/main/java/org/flowable/engine/impl/bpmn/behavior/ParallelGatewayActivityBehavior.java @@ -1,9 +1,9 @@ /* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,34 +15,41 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; +import java.util.HashSet; import java.util.List; +import java.util.Objects; +import java.util.Set; import org.flowable.bpmn.model.Activity; import org.flowable.bpmn.model.FlowElement; import org.flowable.bpmn.model.FlowNode; import org.flowable.bpmn.model.ParallelGateway; +import org.flowable.bpmn.model.SequenceFlow; import org.flowable.common.engine.api.FlowableException; +import org.flowable.engine.HistoryService; import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.history.HistoricActivityInstance; +import org.flowable.engine.impl.HistoricActivityInstanceQueryProperty; +import org.flowable.engine.impl.context.Context; import org.flowable.engine.impl.persistence.entity.ExecutionEntity; import org.flowable.engine.impl.persistence.entity.ExecutionEntityManager; +import org.flowable.engine.impl.persistence.entity.HistoricActivityInstanceEntityImpl; import org.flowable.engine.impl.util.CommandContextUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Implementation of the Parallel Gateway/AND gateway as defined in the BPMN 2.0 specification. - * + * * The Parallel Gateway can be used for splitting a path of execution into multiple paths of executions (AND-split/fork behavior), one for every outgoing sequence flow. - * + * * The Parallel Gateway can also be used for merging or joining paths of execution (AND-join). In this case, on every incoming sequence flow an execution needs to arrive, before leaving the Parallel * Gateway (and potentially then doing the fork behavior in case of multiple outgoing sequence flow). - * - * Note that there is a slight difference to spec (p. 436): "The parallel gateway is activated if there is at least one Token on each incoming sequence flow." We only check the number of incoming - * tokens to the number of sequenceflow. So if two tokens would arrive through the same sequence flow, our implementation would activate the gateway. - * + * * Note that a Parallel Gateway having one incoming and multiple outgoing sequence flow, is the same as having multiple outgoing sequence flow on a given activity. However, a parallel gateway does NOT * check conditions on the outgoing sequence flow. - * + * * @author Joram Barrez * @author Tom Baeyens */ @@ -52,6 +59,8 @@ public class ParallelGatewayActivityBehavior extends GatewayActivityBehavior { private static final Logger LOGGER = LoggerFactory.getLogger(ParallelGatewayActivityBehavior.class); + private HistoryService historyService = Context.getProcessEngineConfiguration().getHistoryService(); + @Override public void execute(DelegateExecution execution) { @@ -83,19 +92,90 @@ public void execute(DelegateExecution execution) { } int nbrOfExecutionsToJoin = parallelGateway.getIncomingFlows().size(); - int nbrOfExecutionsCurrentlyJoined = joinedExecutions.size(); + + Set unmatchedIncomingFlowIds = new HashSet<>(); + + // Add all incoming flows to the unmatchedIncomingFlowIds Set + for (SequenceFlow incomingFlow : parallelGateway.getIncomingFlows()) { + if (incomingFlow.getId() != null) { + unmatchedIncomingFlowIds.add(incomingFlow.getId()); + } else { + // If incoming flow has no id, then we build the string used as the flowable default id + // See getArtificialSequenceFlowId() in org.flowable.engine.impl.persistence.entity.ActivityInstanceEntityManagerImpl + unmatchedIncomingFlowIds.add(new StringBuilder("_flow_").append(incomingFlow.getSourceRef()).append("__").append(incomingFlow.getTargetRef()).toString()); + } + } + + // Collect HistoricActivity cache because historyService is not flushed between the sequenceFlow completing and reaching parallelGateway + // Sort output so that the most recent activity is first in the list + List historicActivitiesCache = CommandContextUtil.getEntityCache() + .findInCache(HistoricActivityInstanceEntityImpl.class) + .stream().map(historicActivity -> (HistoricActivityInstance) historicActivity) + .sorted(Comparator.comparing(HistoricActivityInstance::getEndTime, Comparator.nullsFirst(Comparator.reverseOrder()))).toList(); + + // For each execution, get the most recent sequenceflow activity and remove it from the Set of incomingFlow ids + // If the Set becomes empty, then we know that we have all the incoming flows fulfilled in our joinedExecutions + // Ignore the execution unless the most recent activity is the join gateway, and the previous activity was the relevant sequenceFlow + joinedExecutions: + for (ExecutionEntity joinedExecution : joinedExecutions) { + List latestActivities = new ArrayList<>(); + + // Add relevant latest activities (i.e. 2 most recent activities) to the latestActivities List & sort using most recent activity first + latestActivities.addAll(historyService + .createHistoricActivityInstanceQuery() + .executionId(joinedExecution.getId()) + .orderBy(HistoricActivityInstanceQueryProperty.END).desc() + .listPage(0, 2)); + + // Add any relevant cached entities to the start of the list + latestActivities.addAll(0, historicActivitiesCache.stream().filter(historicActivity -> Objects.equals(historicActivity.getExecutionId(), joinedExecution.getId())).toList()); + + if (latestActivities.isEmpty()) { + continue; + } + + // Walk back through the latest activities for this execution & check if they include a sequenceFlow that leads to this gateway + latestActivities: + for (int i = 0; i < latestActivities.size(); i++) { + HistoricActivityInstance latestActivity = latestActivities.get(i); + + // Latest activity is parallelGateway, but not this parallelGateway - i.e. its not the execution we are looking for + if (Objects.equals(latestActivity.getActivityType(), "parallelGateway") && !Objects.equals(latestActivity.getActivityId(), parallelGateway.getId())) { + continue joinedExecutions; + } + // Latest activity is parallelGateway && id matches. Its the execution we are looking for. Our sequenceFlow should be the next activity + if (Objects.equals(latestActivity.getActivityType(), "parallelGateway") && Objects.equals(latestActivity.getActivityId(), parallelGateway.getId())) { + continue; + } + // If we for some reason get to here and the activity is not a sequence flow, then we are in the wrong execution + if (!Objects.equals(latestActivity.getActivityType(), "sequenceFlow")) { + continue joinedExecutions; + } + + unmatchedIncomingFlowIds.remove(latestActivity.getActivityId()); + + // Once the unmatchedIncomingFlowIds Set is empty, then break and avoid running unneeded historyService queries + if (unmatchedIncomingFlowIds.isEmpty()) { + break joinedExecutions; + } + + // if we get to here, then we have already completed analysing this execution. Continue to the next execution + continue joinedExecutions; + } + } // Fork // Is needed to set the endTime for all historic activity joins CommandContextUtil.getActivityInstanceEntityManager().recordActivityEnd((ExecutionEntity) execution, null); - if (nbrOfExecutionsCurrentlyJoined == nbrOfExecutionsToJoin) { + // if all incoming flows were executed, then the gateway can continue + if (unmatchedIncomingFlowIds.isEmpty()) { // Fork if (LOGGER.isDebugEnabled()) { - LOGGER.debug("parallel gateway '{}' ({}) activates: {} of {} joined", execution.getCurrentActivityId(), - execution.getId(), nbrOfExecutionsCurrentlyJoined, nbrOfExecutionsToJoin); + LOGGER.debug("parallel gateway '{}' ({}) activates: {} of {} joined", execution.getCurrentActivityId(), + execution.getId(), nbrOfExecutionsToJoin, nbrOfExecutionsToJoin); } if (parallelGateway.getIncomingFlows().size() > 1) { @@ -116,8 +196,8 @@ public void execute(DelegateExecution execution) { CommandContextUtil.getAgenda().planTakeOutgoingSequenceFlowsOperation((ExecutionEntity) execution, false); // false -> ignoring conditions on parallel gw } else if (LOGGER.isDebugEnabled()) { - LOGGER.debug("parallel gateway '{}' ({}) does not activate: {} of {} joined", execution.getCurrentActivityId(), - execution.getId(), nbrOfExecutionsCurrentlyJoined, nbrOfExecutionsToJoin); + LOGGER.debug("parallel gateway '{}' ({}) does not activate: {} of {} incoming flows were not completed", execution.getCurrentActivityId(), + execution.getId(), unmatchedIncomingFlowIds.size(), nbrOfExecutionsToJoin); } } diff --git a/modules/flowable-engine/src/test/java/org/flowable/examples/bpmn/gateway/ParallelGatewayTest.java b/modules/flowable-engine/src/test/java/org/flowable/examples/bpmn/gateway/ParallelGatewayTest.java index bcd1f82bccf..cb65a41639b 100644 --- a/modules/flowable-engine/src/test/java/org/flowable/examples/bpmn/gateway/ParallelGatewayTest.java +++ b/modules/flowable-engine/src/test/java/org/flowable/examples/bpmn/gateway/ParallelGatewayTest.java @@ -1,9 +1,9 @@ /* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -84,4 +84,37 @@ public void testUnbalancedForkJoin() { assertProcessEnded(pi.getId()); } + @Test + @Deployment + public void testMultipleTokensJoin() { + + ProcessInstance pi = runtimeService.startProcessInstanceByKey("multipleTokensJoin"); + TaskQuery query = taskService.createTaskQuery().processInstanceId(pi.getId()).orderByTaskName().asc(); + + // There should be two tokens waiting at Wait Task 1 (two sequence flows lead to it in bpmn) + List tasks = query.list(); + assertThat(tasks) + .extracting(Task::getName) + .containsExactly("Wait Task 1", "Wait Task 1", "Wait Task 2"); + + // Completing both Wait Task 1s should not satisfy the joining parallel gateway, and Final Task should still not be reached + // We should still be waiting at Wait Task 2 + tasks.stream().filter(task -> task.getName().equals("Wait Task 1")).forEach(task -> { + taskService.complete(task.getId()); + }); + + tasks = query.list(); + assertThat(tasks) + .extracting(Task::getName) + .containsExactly("Wait Task 2"); + + // Completing Wait Task 2 should satisfy the Parallel Gateway, and we progress to the final task + taskService.complete(tasks.get(0).getId()); + + tasks = query.list(); + assertThat(tasks) + .extracting(Task::getName) + .containsExactly("Final Task"); + } + } diff --git a/modules/flowable-engine/src/test/resources/org/flowable/examples/bpmn/gateway/ParallelGatewayTest.testMultipleTokensJoin.bpmn20.xml b/modules/flowable-engine/src/test/resources/org/flowable/examples/bpmn/gateway/ParallelGatewayTest.testMultipleTokensJoin.bpmn20.xml new file mode 100644 index 00000000000..ce3df48807f --- /dev/null +++ b/modules/flowable-engine/src/test/resources/org/flowable/examples/bpmn/gateway/ParallelGatewayTest.testMultipleTokensJoin.bpmn20.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file