diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java index 8b573a986c..b747c69dff 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java @@ -16,7 +16,9 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.util.Optional; +import java.util.function.UnaryOperator; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.ReconcilerUtilsInternal; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -24,12 +26,26 @@ class EventFilterDetails { private int activeUpdates = 0; private ResourceEvent lastEvent; + private String lastOwnUpdatedResourceVersion; public void increaseActiveUpdates() { activeUpdates = activeUpdates + 1; } - public boolean decreaseActiveUpdates() { + /** + * resourceVersion is needed for case when multiple parallel updates happening inside the + * controller to prevent race condition and send event from {@link + * ManagedInformerEventSource#eventFilteringUpdateAndCacheResource(HasMetadata, UnaryOperator)} + */ + public boolean decreaseActiveUpdates(String updatedResourceVersion) { + if (updatedResourceVersion != null + && (lastOwnUpdatedResourceVersion == null + || ReconcilerUtilsInternal.compareResourceVersions( + updatedResourceVersion, lastOwnUpdatedResourceVersion) + > 0)) { + lastOwnUpdatedResourceVersion = updatedResourceVersion; + } + activeUpdates = activeUpdates - 1; return activeUpdates == 0; } @@ -38,15 +54,19 @@ public void setLastEvent(ResourceEvent event) { lastEvent = event; } - public Optional getLatestEventAfterLastUpdateEvent(String updatedResourceVersion) { + public Optional getLatestEventAfterLastUpdateEvent() { if (lastEvent != null - && (updatedResourceVersion == null + && (lastOwnUpdatedResourceVersion == null || ReconcilerUtilsInternal.compareResourceVersions( lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion(), - updatedResourceVersion) + lastOwnUpdatedResourceVersion) > 0)) { return Optional.of(lastEvent); } return Optional.empty(); } + + public int getActiveUpdates() { + return activeUpdates; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index dcfe687a2f..301ece4424 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -93,9 +93,7 @@ public void changeNamespaces(Set namespaces) { @SuppressWarnings("unchecked") public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator updateMethod) { ResourceID id = ResourceID.fromResource(resourceToUpdate); - if (log.isDebugEnabled()) { - log.debug("Update and cache: {}", id); - } + log.debug("Update and cache: {}", id); R updatedResource = null; try { temporaryResourceCache.startEventFilteringModify(id); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 6e1d30c323..1dbbf36043 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -85,12 +85,22 @@ public synchronized Optional doneEventFilterModify( return Optional.empty(); } var ed = activeUpdates.get(resourceID); - if (ed.decreaseActiveUpdates()) { - activeUpdates.remove(resourceID); - return ed.getLatestEventAfterLastUpdateEvent(updatedResourceVersion); - } else { + if (ed == null || !ed.decreaseActiveUpdates(updatedResourceVersion)) { + log.debug( + "Active updates {} for resource id: {}", + ed != null ? ed.getActiveUpdates() : 0, + resourceID); return Optional.empty(); } + activeUpdates.remove(resourceID); + var res = ed.getLatestEventAfterLastUpdateEvent(); + log.debug( + "Zero active updates for resource id: {}; event after update event: {}; updated resource" + + " version: {}", + resourceID, + res.isPresent(), + updatedResourceVersion); + return res; } public void onDeleteEvent(T resource, boolean unknownState) { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index e2c3de8975..c3a6f8e91e 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -267,14 +267,83 @@ void filterAddEventBeforeUpdate() { assertNoEventProduced(); } + @Test + void multipleCachingFilteringUpdates() { + withRealTemporaryResourceCache(); + CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch2 = + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3); + + informerEventSource.onUpdate( + deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); + latch.countDown(); + latch2.countDown(); + informerEventSource.onUpdate( + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + + assertNoEventProduced(); + } + + @Test + void multipleCachingFilteringUpdates_variant2() { + withRealTemporaryResourceCache(); + + CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch2 = + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3); + + informerEventSource.onUpdate( + deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); + latch.countDown(); + informerEventSource.onUpdate( + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + latch2.countDown(); + + assertNoEventProduced(); + } + + @Test + void multipleCachingFilteringUpdates_variant3() { + withRealTemporaryResourceCache(); + + CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch2 = + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3); + + latch.countDown(); + informerEventSource.onUpdate( + deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); + informerEventSource.onUpdate( + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + latch2.countDown(); + + assertNoEventProduced(); + } + + @Test + void multipleCachingFilteringUpdates_variant4() { + withRealTemporaryResourceCache(); + + CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch2 = + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3); + + informerEventSource.onUpdate( + deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); + informerEventSource.onUpdate( + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + latch.countDown(); + latch2.countDown(); + + assertNoEventProduced(); + } + private void assertNoEventProduced() { await() .pollDelay(Duration.ofMillis(50)) .timeout(Duration.ofMillis(51)) .untilAsserted( - () -> { - verify(informerEventSource, never()).handleEvent(any(), any(), any(), any()); - }); + () -> verify(informerEventSource, never()).handleEvent(any(), any(), any(), any())); } private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) {