.informer.list-limit` | `Long` | Page size for paginated informer list requests; omit for no pagination |
#### Retry
diff --git a/micrometer-support/pom.xml b/micrometer-support/pom.xml
index 42e94e5cb8..ae3c4d0be1 100644
--- a/micrometer-support/pom.xml
+++ b/micrometer-support/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
micrometer-support
diff --git a/migration/pom.xml b/migration/pom.xml
index 9211cd6cd6..39785c13dd 100644
--- a/migration/pom.xml
+++ b/migration/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
migration
diff --git a/operator-framework-bom/pom.xml b/operator-framework-bom/pom.xml
index c2b97dfba3..30fb73bfa3 100644
--- a/operator-framework-bom/pom.xml
+++ b/operator-framework-bom/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
operator-framework-bom
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
pom
Operator SDK - Bill of Materials
Java SDK for implementing Kubernetes operators
diff --git a/operator-framework-core/pom.xml b/operator-framework-core/pom.xml
index 3127d2e9fb..2356433ca9 100644
--- a/operator-framework-core/pom.xml
+++ b/operator-framework-core/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
../pom.xml
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java
index c27b13714e..93e296924b 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java
@@ -321,6 +321,8 @@ private ResolvedControllerConfiguration
controllerCon
var triggerReconcilerOnAllEvents =
annotation != null && annotation.triggerReconcilerOnAllEvents();
+ var defaultFilters = annotation == null || annotation.defaultFilters();
+
InformerConfiguration
informerConfig =
InformerConfiguration.builder(resourceClass)
.initFromAnnotation(annotation != null ? annotation.informer() : null, context)
@@ -341,7 +343,8 @@ private
ResolvedControllerConfiguration
controllerCon
dependentFieldManager,
this,
informerConfig,
- triggerReconcilerOnAllEvents);
+ triggerReconcilerOnAllEvents,
+ defaultFilters);
}
/**
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java
index 63177b614f..d3c4c60082 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java
@@ -121,4 +121,8 @@ default boolean triggerReconcilerOnAllEvent() {
default boolean triggerReconcilerOnAllEvents() {
return false;
}
+
+ default boolean isDefaultFilters() {
+ return true;
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java
index 7856654f1e..1c1e03c870 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java
@@ -46,6 +46,7 @@ public class ControllerConfigurationOverrider {
private Map configurations;
private final InformerConfiguration.Builder config;
private boolean triggerReconcilerOnAllEvents;
+ private boolean defaultFilters;
private ControllerConfigurationOverrider(ControllerConfiguration original) {
this.finalizer = original.getFinalizerName();
@@ -59,6 +60,7 @@ private ControllerConfigurationOverrider(ControllerConfiguration original) {
this.name = original.getName();
this.fieldManager = original.fieldManager();
this.triggerReconcilerOnAllEvents = original.triggerReconcilerOnAllEvents();
+ this.defaultFilters = original.isDefaultFilters();
}
public ControllerConfigurationOverrider withFinalizer(String finalizer) {
@@ -134,6 +136,11 @@ public ControllerConfigurationOverrider withLabelSelector(String labelSelecto
return this;
}
+ public ControllerConfigurationOverrider withShardSelector(String shardSelector) {
+ config.withShardSelector(shardSelector);
+ return this;
+ }
+
public ControllerConfigurationOverrider withReconciliationMaxInterval(
Duration reconciliationMaxInterval) {
this.reconciliationMaxInterval = reconciliationMaxInterval;
@@ -186,6 +193,11 @@ public ControllerConfigurationOverrider withTriggerReconcilerOnAllEvents(
return this;
}
+ public ControllerConfigurationOverrider withDefaultFilters(boolean defaultFilters) {
+ this.defaultFilters = defaultFilters;
+ return this;
+ }
+
/**
* Sets a max page size limit when starting the informer. This will result in pagination while
* populating the cache. This means that longer lists will take multiple requests to fetch. See
@@ -231,6 +243,7 @@ public ControllerConfiguration build() {
original.getConfigurationService(),
config.buildForController(),
triggerReconcilerOnAllEvents,
+ defaultFilters,
original.getWorkflowSpec().orElse(null));
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResolvedControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResolvedControllerConfiguration.java
index 3e620f8f91..91cfaafa8f 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResolvedControllerConfiguration.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResolvedControllerConfiguration.java
@@ -45,6 +45,7 @@ public class ResolvedControllerConfiguration
private final ConfigurationService configurationService;
private final String fieldManager;
private final boolean triggerReconcilerOnAllEvents;
+ private final boolean defaultFilters;
private WorkflowSpec workflowSpec;
public ResolvedControllerConfiguration(ControllerConfiguration
other) {
@@ -61,6 +62,7 @@ public ResolvedControllerConfiguration(ControllerConfiguration
other) {
other.getConfigurationService(),
other.getInformerConfig(),
other.triggerReconcilerOnAllEvents(),
+ other.isDefaultFilters(),
other.getWorkflowSpec().orElse(null));
}
@@ -77,6 +79,7 @@ public ResolvedControllerConfiguration(
ConfigurationService configurationService,
InformerConfiguration
informerConfig,
boolean triggerReconcilerOnAllEvents,
+ boolean defaultFilters,
WorkflowSpec workflowSpec) {
this(
name,
@@ -90,7 +93,8 @@ public ResolvedControllerConfiguration(
fieldManager,
configurationService,
informerConfig,
- triggerReconcilerOnAllEvents);
+ triggerReconcilerOnAllEvents,
+ defaultFilters);
setWorkflowSpec(workflowSpec);
}
@@ -106,7 +110,8 @@ protected ResolvedControllerConfiguration(
String fieldManager,
ConfigurationService configurationService,
InformerConfiguration
informerConfig,
- boolean triggerReconcilerOnAllEvents) {
+ boolean triggerReconcilerOnAllEvents,
+ boolean defaultFilters) {
this.informerConfig = informerConfig;
this.configurationService = configurationService;
this.name = ControllerConfiguration.ensureValidName(name, associatedReconcilerClassName);
@@ -120,6 +125,7 @@ protected ResolvedControllerConfiguration(
ControllerConfiguration.ensureValidFinalizerName(finalizer, getResourceTypeName());
this.fieldManager = fieldManager;
this.triggerReconcilerOnAllEvents = triggerReconcilerOnAllEvents;
+ this.defaultFilters = defaultFilters;
}
protected ResolvedControllerConfiguration(
@@ -139,7 +145,8 @@ protected ResolvedControllerConfiguration(
null,
configurationService,
InformerConfiguration.builder(resourceClass).buildForController(),
- false);
+ false,
+ true);
}
@Override
@@ -234,4 +241,9 @@ public String fieldManager() {
public boolean triggerReconcilerOnAllEvents() {
return triggerReconcilerOnAllEvents;
}
+
+ @Override
+ public boolean isDefaultFilters() {
+ return defaultFilters;
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java
index 7f0d266684..04f97902d3 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java
@@ -71,6 +71,18 @@
*/
String labelSelector() default NO_VALUE_SET;
+ /**
+ * Optional shard selector used to restrict the set of resources the associated informer will act
+ * upon to a single shard, typically when the same workload is split across several operator
+ * instances. Just like {@link #labelSelector()} it is expressed as a label selector and can be
+ * made of multiple comma separated requirements that act as a logical AND operator. When both a
+ * label selector and a shard selector are set, the resulting informer only watches resources
+ * matching both (the two selectors are combined with a logical AND).
+ *
+ * @return the shard selector
+ */
+ String shardSelector() default NO_VALUE_SET;
+
/**
* Optional {@link OnAddFilter} to filter add events sent to the associated informer
*
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java
index 20d7df7136..6c92dcdcc1 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java
@@ -47,6 +47,7 @@ public class InformerConfiguration {
private Set namespaces;
private Boolean followControllerNamespaceChanges;
private String labelSelector;
+ private String shardSelector;
private OnAddFilter super R> onAddFilter;
private OnUpdateFilter super R> onUpdateFilter;
private OnDeleteFilter super R> onDeleteFilter;
@@ -62,6 +63,7 @@ protected InformerConfiguration(
Set namespaces,
boolean followControllerNamespaceChanges,
String labelSelector,
+ String shardSelector,
OnAddFilter super R> onAddFilter,
OnUpdateFilter super R> onUpdateFilter,
OnDeleteFilter super R> onDeleteFilter,
@@ -77,6 +79,7 @@ protected InformerConfiguration(
this.namespaces = namespaces;
this.followControllerNamespaceChanges = followControllerNamespaceChanges;
this.labelSelector = labelSelector;
+ this.shardSelector = shardSelector;
this.onAddFilter = onAddFilter;
this.onUpdateFilter = onUpdateFilter;
this.onDeleteFilter = onDeleteFilter;
@@ -113,6 +116,7 @@ public static InformerConfiguration.Builder builder(
original.namespaces,
original.followControllerNamespaceChanges,
original.labelSelector,
+ original.shardSelector,
original.onAddFilter,
original.onUpdateFilter,
original.onDeleteFilter,
@@ -125,11 +129,6 @@ public static InformerConfiguration.Builder builder(
.builder;
}
- public static String ensureValidLabelSelector(String labelSelector) {
- // might want to implement validation here?
- return labelSelector;
- }
-
public static boolean allNamespacesWatched(Set namespaces) {
failIfNotValid(namespaces);
return DEFAULT_NAMESPACES_SET.equals(namespaces);
@@ -251,6 +250,20 @@ public String getLabelSelector() {
return labelSelector;
}
+ /**
+ * Retrieves the shard selector that is used, in addition to the {@link #getLabelSelector() label
+ * selector}, to restrict which resources are actually watched by the associated informer.
+ * Typically used to assign a subset (shard) of the resources to a given operator instance. It is
+ * expressed using the same syntax as a label selector. See the official documentation on the topic for
+ * more details on syntax.
+ *
+ * @return the shard selector filtering watched resources
+ */
+ public String getShardSelector() {
+ return shardSelector;
+ }
+
public OnAddFilter super R> getOnAddFilter() {
return onAddFilter;
}
@@ -353,6 +366,11 @@ public InformerConfiguration.Builder initFromAnnotation(
var labelSelector = Constants.NO_VALUE_SET.equals(fromAnnotation) ? null : fromAnnotation;
withLabelSelector(labelSelector);
+ final var shardFromAnnotation = informerConfig.shardSelector();
+ var shardSelector =
+ Constants.NO_VALUE_SET.equals(shardFromAnnotation) ? null : shardFromAnnotation;
+ withShardSelector(shardSelector);
+
withOnAddFilter(
Utils.instantiate(informerConfig.onAddFilter(), OnAddFilter.class, context));
@@ -442,7 +460,12 @@ public Builder withFollowControllerNamespacesChanges(boolean followChanges) {
}
public Builder withLabelSelector(String labelSelector) {
- InformerConfiguration.this.labelSelector = ensureValidLabelSelector(labelSelector);
+ InformerConfiguration.this.labelSelector = labelSelector;
+ return this;
+ }
+
+ public Builder withShardSelector(String shardSelector) {
+ InformerConfiguration.this.shardSelector = shardSelector;
return this;
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java
index 1a1d8956fc..ab1ad2b8eb 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java
@@ -251,6 +251,11 @@ public Builder withLabelSelector(String labelSelector) {
return this;
}
+ public Builder withShardSelector(String shardSelector) {
+ config.withShardSelector(shardSelector);
+ return this;
+ }
+
public Builder withOnAddFilter(OnAddFilter super R> onAddFilter) {
config.withOnAddFilter(onAddFilter);
return this;
@@ -308,6 +313,7 @@ public void updateFrom(InformerConfiguration informerConfig) {
.withFollowControllerNamespacesChanges(
informerConfig.getFollowControllerNamespaceChanges())
.withLabelSelector(informerConfig.getLabelSelector())
+ .withShardSelector(informerConfig.getShardSelector())
.withItemStore(informerConfig.getItemStore())
.withOnAddFilter(informerConfig.getOnAddFilter())
.withOnUpdateFilter(informerConfig.getOnUpdateFilter())
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java
index 2df74d4298..75d12eb1ad 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java
@@ -114,6 +114,88 @@ default Stream getSecondaryResourcesAsStream(Class expectedType) {
Optional getSecondaryResource(Class expectedType, String eventSourceName);
+ /**
+ * Retrieves a specific secondary resource by name and namespace from the event source identified
+ * by the given name.
+ *
+ * This is a typed convenience over manually retrieving the {@link
+ * io.javaoperatorsdk.operator.processing.event.source.EventSource} and calling its cache. When
+ * the underlying event source implements {@link
+ * io.javaoperatorsdk.operator.processing.event.source.Cache}, the lookup is a direct cache lookup
+ * and read-cache-after-write consistent.
+ *
+ *
{@code eventSourceName} may be {@code null}. When {@code null} and {@code expectedType} is
+ * part of a managed workflow whose activation condition may not have registered the event source,
+ * an empty {@link Optional} is returned instead of throwing {@link
+ * io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException}.
+ *
+ * @param expectedType the class representing the type of secondary resource to retrieve
+ * @param eventSourceName the name of the event source to look in (may be {@code null})
+ * @param name the name of the secondary resource
+ * @param namespace the namespace of the secondary resource (may be {@code null} for
+ * cluster-scoped resources)
+ * @param the type of secondary resource to retrieve
+ * @return an {@link Optional} containing the matching secondary resource, or {@link
+ * Optional#empty()} if none matches
+ * @throws io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException if no event
+ * source is registered for the given type and name (and no workflow activation condition
+ * accounts for it)
+ * @since 5.4.0
+ */
+ Optional getSecondaryResource(
+ Class expectedType, String eventSourceName, String name, String namespace);
+
+ /**
+ * Convenience overload of {@link #getSecondaryResource(Class, String, String, String)} that uses
+ * the primary resource's namespace.
+ *
+ * If the primary resource is cluster-scoped (no namespace), the lookup is performed against
+ * the cluster scope. To target a specific namespace from a cluster-scoped primary, use {@link
+ * #getSecondaryResource(Class, String, String, String)} directly.
+ *
+ *
{@code eventSourceName} may be {@code null} with the same semantics as in {@link
+ * #getSecondaryResource(Class, String, String, String)}.
+ *
+ * @param expectedType the class representing the type of secondary resource to retrieve
+ * @param eventSourceName the name of the event source to look in (may be {@code null})
+ * @param name the name of the secondary resource (namespace inferred from the primary)
+ * @param the type of secondary resource to retrieve
+ * @return an {@link Optional} containing the matching secondary resource, or {@link
+ * Optional#empty()} if none matches
+ * @since 5.4.0
+ */
+ default Optional getSecondaryResource(
+ Class expectedType, String eventSourceName, String name) {
+ return getSecondaryResource(
+ expectedType, eventSourceName, name, getPrimaryResource().getMetadata().getNamespace());
+ }
+
+ /**
+ * Retrieves a {@link Stream} of the secondary resources of the specified type from the event
+ * source identified by the given name. Useful when several event sources are registered for the
+ * same type and you need to scope retrieval to one of them, or when you want to apply a custom
+ * filter at the call site.
+ *
+ * When the underlying event source implements {@link ResourceCache}, the stream is
+ * read-cache-after-write consistent.
+ *
+ *
{@code eventSourceName} may be {@code null} with the same semantics as in {@link
+ * #getSecondaryResource(Class, String, String, String)}: when {@code null} and {@code
+ * expectedType} is part of a managed workflow whose activation condition may not have registered
+ * the event source, an empty {@link Stream} is returned instead of throwing {@link
+ * io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException}.
+ *
+ * @param expectedType the class representing the type of secondary resources to retrieve
+ * @param eventSourceName the name of the event source to look in (may be {@code null})
+ * @param the type of secondary resources to retrieve
+ * @return a {@link Stream} of secondary resources of the specified type
+ * @throws io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException if no event
+ * source is registered for the given type and name (and no workflow activation condition
+ * accounts for it)
+ * @since 5.4.0
+ */
+ Stream getSecondaryResourcesAsStream(Class expectedType, String eventSourceName);
+
ControllerConfiguration getControllerConfiguration();
/**
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java
index d305c28824..70ae7435d1 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java
@@ -105,4 +105,15 @@ MaxReconciliationInterval maxReconciliationInterval() default
* documentation for further details.
*/
boolean triggerReconcilerOnAllEvents() default false;
+
+ /**
+ * When set to {@code false}, JOSDK will not apply its default internal update filters
+ * (generation- aware, finalizer-needed, marked-for-deletion) to the controller's event source.
+ * The user's {@link Informer#onUpdateFilter()} becomes the sole filter and has full control. To
+ * keep any of the default behavior, compose it explicitly using the static methods on {@link
+ * io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters}.
+ *
+ * @return whether JOSDK's internal update filters are applied
+ */
+ boolean defaultFilters() default true;
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java
index ac5a7b41b9..2d9a22b6fa 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java
@@ -36,6 +36,7 @@
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.Cache;
public class DefaultContext
implements Context
{
private RetryInfo retryInfo;
@@ -95,6 +96,20 @@ public Stream getSecondaryResourcesAsStream(Class expectedType, boolea
}
}
+ /**
+ * Whether a missing event source for the given type is the expected case, in which case callers
+ * should return an empty result instead of propagating the {@link
+ * NoEventSourceForClassException}.
+ *
+ * If a workflow has an activation condition there can be event sources which are only
+ * registered if the activation condition holds, but to provide a consistent API we return an
+ * empty result instead of throwing an exception. Note that not only the resource which has an
+ * activation condition might not be registered but dependents which depend on it.
+ */
+ private boolean isMissingEventSourceExpected(String eventSourceName, Class> expectedType) {
+ return eventSourceName == null && controller.workflowContainsDependentForType(expectedType);
+ }
+
private Map deduplicatedMap(Stream stream) {
return stream.collect(
Collectors.toUnmodifiableMap(
@@ -120,19 +135,51 @@ public Optional getSecondaryResource(Class expectedType, String eventS
.getEventSourceFor(expectedType, eventSourceName)
.getSecondaryResource(primaryResource);
} catch (NoEventSourceForClassException e) {
- /*
- * If a workflow has an activation condition there can be event sources which are only
- * registered if the activation condition holds, but to provide a consistent API we return an
- * Optional instead of throwing an exception.
- *
- * Note that not only the resource which has an activation condition might not be registered
- * but dependents which depend on it.
- */
- if (eventSourceName == null && controller.workflowContainsDependentForType(expectedType)) {
+ if (isMissingEventSourceExpected(eventSourceName, expectedType)) {
return Optional.empty();
- } else {
- throw e;
}
+ throw e;
+ }
+ }
+
+ @Override
+ public Optional getSecondaryResource(
+ Class expectedType, String eventSourceName, String name, String namespace) {
+ try {
+ final var eventSource =
+ controller.getEventSourceManager().getEventSourceFor(expectedType, eventSourceName);
+ final var resourceID = new ResourceID(name, namespace);
+ if (eventSource instanceof Cache> cache) {
+ return cache.get(resourceID).map(expectedType::cast);
+ }
+ return eventSource.getSecondaryResources(primaryResource).stream()
+ .filter(r -> ResourceID.fromResource(r).equals(resourceID))
+ .findFirst();
+ } catch (NoEventSourceForClassException e) {
+ if (isMissingEventSourceExpected(eventSourceName, expectedType)) {
+ return Optional.empty();
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public Stream getSecondaryResourcesAsStream(
+ Class expectedType, String eventSourceName) {
+ try {
+ final var eventSource =
+ controller.getEventSourceManager().getEventSourceFor(expectedType, eventSourceName);
+ if (eventSource instanceof ResourceCache> resourceCache) {
+ final var ns = primaryResource.getMetadata().getNamespace();
+ final Stream> stream = ns == null ? resourceCache.list() : resourceCache.list(ns);
+ return stream.map(expectedType::cast);
+ }
+ return eventSource.getSecondaryResources(primaryResource).stream();
+ } catch (NoEventSourceForClassException e) {
+ if (isMissingEventSourceExpected(eventSourceName, expectedType)) {
+ return Stream.empty();
+ }
+ throw e;
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java
index 5af48a1694..c8322e47e5 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java
@@ -49,6 +49,13 @@ public class EventProcessor implements EventHandler, Life
private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50;
+ /**
+ * Threshold below which an event-driven failed reconciliation that lands inside the current retry
+ * window is allowed to consume a retry attempt (i.e. advance the retry counter). Above this
+ * threshold the existing retry deadline is preserved instead.
+ */
+ private static final long RETRY_DEADLINE_PRESERVE_THRESHOLD_MILLIS = 5_000;
+
private volatile boolean running;
private final ControllerConfiguration> controllerConfiguration;
private final ReconciliationDispatcher
reconciliationDispatcher;
@@ -377,6 +384,15 @@ private void handleRetryOnException(ExecutionScope
executionScope, Exception
submitReconciliationExecution(state);
return;
}
+ Optional remaining = state.getRetry().remainingDurationUntilNextRetry();
+ if (remaining.isPresent()
+ && remaining.get().toMillis() > RETRY_DEADLINE_PRESERVE_THRESHOLD_MILLIS) {
+ log.debug(
+ "Preserving existing retry deadline; remaining: {} ms. Not consuming a retry attempt.",
+ remaining.get().toMillis());
+ retryEventSource().scheduleOnce(resourceID, remaining.get().toMillis());
+ return;
+ }
Optional nextDelay = state.getRetry().nextDelay();
nextDelay.ifPresentOrElse(
delay -> {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java
index dfa94577f7..1f5f638144 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java
@@ -52,18 +52,24 @@ public ControllerEventSource(Controller controller) {
this.controller = controller;
final var config = controller.getConfiguration();
- OnUpdateFilter internalOnUpdateFilter =
- onUpdateFinalizerNeededAndApplied(controller.useFinalizer(), config.getFinalizerName())
- .or(onUpdateGenerationAware(config.isGenerationAware()))
- .or(onUpdateMarkedForDeletion());
// by default the on add should be processed in all cases regarding internal filters
final var informerConfig = config.getInformerConfig();
Optional.ofNullable(informerConfig.getOnAddFilter()).ifPresent(this::setOnAddFilter);
- Optional.ofNullable(informerConfig.getOnUpdateFilter())
- .ifPresentOrElse(
- filter -> setOnUpdateFilter(filter.and(internalOnUpdateFilter)),
- () -> setOnUpdateFilter(internalOnUpdateFilter));
+
+ if (config.isDefaultFilters()) {
+ OnUpdateFilter internalOnUpdateFilter =
+ defaultFilters(
+ controller.useFinalizer(), config.getFinalizerName(), config.isGenerationAware());
+ Optional.ofNullable(informerConfig.getOnUpdateFilter())
+ .ifPresentOrElse(
+ filter -> setOnUpdateFilter(filter.and(internalOnUpdateFilter)),
+ () -> setOnUpdateFilter(internalOnUpdateFilter));
+ } else {
+ var userFilter = informerConfig.getOnUpdateFilter();
+ setOnUpdateFilter(userFilter != null ? userFilter : (newResource, oldResource) -> true);
+ }
+
Optional.ofNullable(informerConfig.getGenericFilter()).ifPresent(this::setGenericFilter);
setControllerConfiguration(config);
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/InternalEventFilters.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/InternalEventFilters.java
index 747f9f860c..20bea0106a 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/InternalEventFilters.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/InternalEventFilters.java
@@ -22,7 +22,7 @@ public class InternalEventFilters {
private InternalEventFilters() {}
- static OnUpdateFilter onUpdateMarkedForDeletion() {
+ public static OnUpdateFilter onUpdateMarkedForDeletion() {
// the old resource is checked since in corner cases users might still want to update the status
// for a resource that is marked for deletion
@@ -30,7 +30,7 @@ static OnUpdateFilter onUpdateMarkedForDeletion() {
!oldResource.isMarkedForDeletion() && newResource.isMarkedForDeletion();
}
- static OnUpdateFilter onUpdateGenerationAware(
+ public static OnUpdateFilter onUpdateGenerationAware(
boolean generationAware) {
return (newResource, oldResource) -> {
@@ -46,7 +46,7 @@ static OnUpdateFilter onUpdateGenerationAware(
};
}
- static OnUpdateFilter onUpdateFinalizerNeededAndApplied(
+ public static OnUpdateFilter onUpdateFinalizerNeededAndApplied(
boolean useFinalizer, String finalizerName) {
return (newResource, oldResource) -> {
if (useFinalizer) {
@@ -61,4 +61,11 @@ static OnUpdateFilter onUpdateFinalizerNeededAndAppli
}
};
}
+
+ public static OnUpdateFilter defaultFilters(
+ boolean useFinalizer, String finalizerName, boolean generationAware) {
+ return InternalEventFilters.onUpdateFinalizerNeededAndApplied(useFinalizer, finalizerName)
+ .or(onUpdateGenerationAware(generationAware))
+ .or(onUpdateMarkedForDeletion());
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
index bfbe17c7c8..8e7054b231 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
@@ -136,13 +136,18 @@ public void changeNamespaces(Set namespaces) {
private InformerWrapper createEventSourceForNamespace(String namespace) {
final InformerWrapper source;
final var labelSelector = configuration.getInformerConfig().getLabelSelector();
+ final var shardSelector = configuration.getInformerConfig().getShardSelector();
if (namespace.equals(WATCH_ALL_NAMESPACES)) {
- final var filteredBySelectorClient = client.inAnyNamespace().withLabelSelector(labelSelector);
+ final var filteredBySelectorClient =
+ client.inAnyNamespace().withLabelSelector(labelSelector).withShardSelector(shardSelector);
source = createEventSource(filteredBySelectorClient, eventHandler, WATCH_ALL_NAMESPACES);
} else {
source =
createEventSource(
- client.inNamespace(namespace).withLabelSelector(labelSelector),
+ client
+ .inNamespace(namespace)
+ .withLabelSelector(labelSelector)
+ .withShardSelector(shardSelector),
eventHandler,
namespace);
}
@@ -275,12 +280,14 @@ public List byIndex(String indexName, String indexKey) {
@Override
public String toString() {
final var informerConfig = configuration.getInformerConfig();
- final var selector = informerConfig.getLabelSelector();
+ final var labelSelector = informerConfig.getLabelSelector();
+ final var shardSelector = informerConfig.getShardSelector();
return "InformerManager ["
+ ReconcilerUtilsInternal.getResourceTypeNameWithVersion(configuration.getResourceClass())
+ "] watching: "
+ informerConfig.getEffectiveNamespaces(controllerConfiguration)
- + (selector != null ? " selector: " + selector : "");
+ + (labelSelector != null ? " label selector: " + labelSelector : "")
+ + (shardSelector != null ? " shard selector: " + shardSelector : "");
}
public Map informerHealthIndicators() {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
index a9c6818565..bb28e153d1 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
@@ -155,16 +155,14 @@ public synchronized void stop() {
@Override
public void onList(String resourceVersion, boolean remainedEmpty) {
- // re-list supported by fabric8 client https://github.com/fabric8io/kubernetes-client/pull/7899
- // temporaryResourceCache.setRelistFinished(resourceVersion);
+ temporaryResourceCache.setRelistFinished();
temporaryResourceCache.checkGhostResources();
}
- // @Override (enable when
- // re-list supported by fabric8 client https://github.com/fabric8io/kubernetes-client/pull/7899
- // public void onBeforeList(String lastSyncResourceVersion) {
- // temporaryResourceCache.setOngoingRelist(lastSyncResourceVersion);
- // }
+ @Override
+ public void onBeforeList(String lastSyncResourceVersion) {
+ temporaryResourceCache.setOngoingRelist();
+ }
@Override
public void handleRecentResourceUpdate(
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java
index 8879493a2a..e7d6f6f55d 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java
@@ -265,11 +265,11 @@ EventFilterSupport getEventFilterSupport() {
return eventFilteringSupport;
}
- public void setOngoingRelist(String lastKnownSyncVersion) {
+ public void setOngoingRelist() {
eventFilteringSupport.setStartingReList();
}
- public void setRelistFinished(String syncResourceVersions) {
+ public void setRelistFinished() {
eventFilteringSupport.setRelistFinished();
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java
index 4bdce57a77..fadc022de7 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecution.java
@@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.processing.retry;
+import java.time.Duration;
import java.util.Optional;
public class GenericRetryExecution implements RetryExecution {
@@ -23,6 +24,7 @@ public class GenericRetryExecution implements RetryExecution {
private int lastAttemptIndex = 0;
private long currentInterval;
+ private Long lastNextDelayCallEpochMillis;
public GenericRetryExecution(GenericRetry genericRetry) {
this.genericRetry = genericRetry;
@@ -40,6 +42,7 @@ public Optional nextDelay() {
}
}
lastAttemptIndex++;
+ lastNextDelayCallEpochMillis = System.currentTimeMillis();
return Optional.of(currentInterval);
}
@@ -52,4 +55,16 @@ public boolean isLastAttempt() {
public int getAttemptCount() {
return lastAttemptIndex;
}
+
+ @Override
+ public Optional remainingDurationUntilNextRetry() {
+ if (lastNextDelayCallEpochMillis == null) {
+ return Optional.empty();
+ }
+ long remaining = (lastNextDelayCallEpochMillis + currentInterval) - System.currentTimeMillis();
+ if (remaining <= 0) {
+ return Optional.empty();
+ }
+ return Optional.of(Duration.ofMillis(remaining));
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java
index caf71d7a33..a644a274ba 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/retry/RetryExecution.java
@@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.processing.retry;
+import java.time.Duration;
import java.util.Optional;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
@@ -25,4 +26,15 @@ public interface RetryExecution extends RetryInfo {
* @return the time to wait until the next execution in milliseconds
*/
Optional nextDelay();
+
+ /**
+ * Remaining time of the currently scheduled retry interval, i.e. the time until the previously
+ * computed retry delay would elapse. Returns an empty {@link Optional} if no retry has been
+ * scheduled yet (i.e. {@link #nextDelay()} has never been called) or if the deadline has already
+ * passed.
+ *
+ * Used to decide whether an event-driven failed reconciliation that lands well inside the
+ * retry window should consume a retry attempt or simply be re-scheduled on the original deadline.
+ */
+ Optional remainingDurationUntilNextRetry();
}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java
index 0000429c20..61b434c0c4 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java
@@ -84,6 +84,7 @@ public static KubernetesClient client(
when(nonNamespaceOperation.withLabelSelector(nullable(String.class))).thenReturn(filterable);
when(resources.inAnyNamespace()).thenReturn(inAnyNamespace);
when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable);
+ when(filterable.withShardSelector(nullable(String.class))).thenReturn(filterable);
SharedIndexInformer informer = mock(SharedIndexInformer.class);
CompletableFuture informerStartRes = new CompletableFuture<>();
informerStartRes.complete(null);
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverriderTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverriderTest.java
index 06ea65803d..86b8de441b 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverriderTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverriderTest.java
@@ -181,6 +181,20 @@ void itemStorePreserved() {
assertNotNull(configuration.getInformerConfig().getItemStore());
}
+ @Test
+ void shardSelectorShouldBePropagated() {
+ var configuration = createConfiguration(new WatchCurrentReconciler());
+ assertNull(configuration.getInformerConfig().getShardSelector());
+
+ final var shardSelector = "shard=1";
+ configuration =
+ ControllerConfigurationOverrider.override(configuration)
+ .withShardSelector(shardSelector)
+ .build();
+
+ assertEquals(shardSelector, configuration.getInformerConfig().getShardSelector());
+ }
+
@Test
void configuredDependentShouldNotChangeOnParentOverrideEvenWhenInitialConfigIsSame() {
var configuration = createConfiguration(new OverriddenNSOnDepReconciler());
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/InformerConfigurationTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/InformerConfigurationTest.java
index 2631a1af82..95b8465706 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/InformerConfigurationTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/InformerConfigurationTest.java
@@ -73,6 +73,19 @@ void nullLabelSelectorByDefault() {
assertNull(informerConfig.getLabelSelector());
}
+ @Test
+ void nullShardSelectorByDefault() {
+ final var informerConfig = InformerConfiguration.builder(ConfigMap.class).build();
+ assertNull(informerConfig.getShardSelector());
+ }
+
+ @Test
+ void shardSelectorIsSetOnBuilder() {
+ final var informerConfig =
+ InformerConfiguration.builder(ConfigMap.class).withShardSelector("shard=1").build();
+ assertEquals("shard=1", informerConfig.getShardSelector());
+ }
+
@Test
void shouldWatchAllNamespacesByDefaultForControllers() {
final var informerConfig = InformerConfiguration.builder(ConfigMap.class).buildForController();
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java
index 4df8df385b..7b9658f98d 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContextTest.java
@@ -16,26 +16,34 @@
package io.javaoperatorsdk.operator.api.reconciler;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
+import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Secret;
+import io.fabric8.kubernetes.api.model.SecretBuilder;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class DefaultContextTest {
@@ -63,6 +71,234 @@ void getSecondaryResourceReturnsEmptyOptionalOnNonActivatedDRType() {
assertThat(res).isEmpty();
}
+ @Test
+ void getSecondaryResourceByNameAndNamespaceReturnsFromCacheFastPath() {
+ final var cm =
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName("cm-foo")
+ .withNamespace("ns")
+ .endMetadata()
+ .build();
+
+ final ManagedInformerEventSource cachingEventSource = mock();
+ when(cachingEventSource.get(new ResourceID("cm-foo", "ns"))).thenReturn(Optional.of(cm));
+ when(mockManager.getEventSourceFor(ConfigMap.class, "es-name")).thenReturn(cachingEventSource);
+
+ final var res = context.getSecondaryResource(ConfigMap.class, "es-name", "cm-foo", "ns");
+
+ assertThat(res).contains(cm);
+ verify(cachingEventSource).get(new ResourceID("cm-foo", "ns"));
+ }
+
+ @Test
+ void getSecondaryResourceByNameAndNamespaceReturnsEmptyOnCacheMiss() {
+ final ManagedInformerEventSource cachingEventSource = mock();
+ when(cachingEventSource.get(new ResourceID("missing", "ns"))).thenReturn(Optional.empty());
+ when(mockManager.getEventSourceFor(ConfigMap.class, "es-name")).thenReturn(cachingEventSource);
+
+ assertThat(context.getSecondaryResource(ConfigMap.class, "es-name", "missing", "ns")).isEmpty();
+ }
+
+ @Test
+ void getSecondaryResourceByNameAndNamespaceFallsBackToGetSecondaryResources() {
+ final var match =
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName("cm-foo")
+ .withNamespace("ns")
+ .endMetadata()
+ .build();
+ final var other =
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName("cm-bar")
+ .withNamespace("ns")
+ .endMetadata()
+ .build();
+
+ final EventSource nonCachingEventSource = mock();
+ when(nonCachingEventSource.getSecondaryResources(any())).thenReturn(Set.of(match, other));
+ when(mockManager.getEventSourceFor(ConfigMap.class, "es-name"))
+ .thenReturn(nonCachingEventSource);
+
+ final var res = context.getSecondaryResource(ConfigMap.class, "es-name", "cm-foo", "ns");
+
+ assertThat(res).contains(match);
+ }
+
+ @Test
+ void getSecondaryResourceByNameAndNamespaceFallbackReturnsEmptyWhenNoMatch() {
+ final var other =
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName("cm-other")
+ .withNamespace("ns")
+ .endMetadata()
+ .build();
+
+ final EventSource nonCachingEventSource = mock();
+ when(nonCachingEventSource.getSecondaryResources(any())).thenReturn(Set.of(other));
+ when(mockManager.getEventSourceFor(ConfigMap.class, "es-name"))
+ .thenReturn(nonCachingEventSource);
+
+ assertThat(context.getSecondaryResource(ConfigMap.class, "es-name", "missing", "ns")).isEmpty();
+ }
+
+ @Test
+ void getSecondaryResourceByNameAndNamespaceRethrowsWhenNoEventSourceAndNotWorkflowManaged() {
+ when(mockManager.getEventSourceFor(ConfigMap.class, "es-name"))
+ .thenThrow(new NoEventSourceForClassException(ConfigMap.class));
+
+ assertThatThrownBy(
+ () -> context.getSecondaryResource(ConfigMap.class, "es-name", "cm-foo", "ns"))
+ .isInstanceOf(NoEventSourceForClassException.class);
+ }
+
+ @Test
+ void getSecondaryResourceByNameAndNamespaceReturnsEmptyWhenNoEventSourceButWorkflowManaged() {
+ when(mockManager.getEventSourceFor(ConfigMap.class, null))
+ .thenThrow(new NoEventSourceForClassException(ConfigMap.class));
+ when(mockController.workflowContainsDependentForType(ConfigMap.class)).thenReturn(true);
+
+ final var res = context.getSecondaryResource(ConfigMap.class, null, "cm-foo", "ns");
+
+ assertThat(res).isEmpty();
+ }
+
+ @Test
+ void getSecondaryResourceByNameUsesPrimaryNamespace() {
+ final var primaryNamespace = "primary-ns";
+ final var namespacedPrimary =
+ new SecretBuilder()
+ .withNewMetadata()
+ .withName("primary")
+ .withNamespace(primaryNamespace)
+ .endMetadata()
+ .build();
+ final DefaultContext namespacedContext =
+ new DefaultContext<>(null, mockController, namespacedPrimary, false, false);
+
+ final var cm =
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName("cm-foo")
+ .withNamespace(primaryNamespace)
+ .endMetadata()
+ .build();
+
+ final ManagedInformerEventSource cachingEventSource = mock();
+ when(cachingEventSource.get(new ResourceID("cm-foo", primaryNamespace)))
+ .thenReturn(Optional.of(cm));
+ when(mockManager.getEventSourceFor(ConfigMap.class, "es-name")).thenReturn(cachingEventSource);
+
+ final var res = namespacedContext.getSecondaryResource(ConfigMap.class, "es-name", "cm-foo");
+
+ assertThat(res).contains(cm);
+ }
+
+ @Test
+ void getSecondaryResourcesAsStreamByEventSourceUsesResourceCacheFastPath() {
+ final var primaryNamespace = "primary-ns";
+ final var namespacedPrimary =
+ new SecretBuilder()
+ .withNewMetadata()
+ .withName("primary")
+ .withNamespace(primaryNamespace)
+ .endMetadata()
+ .build();
+ final DefaultContext namespacedContext =
+ new DefaultContext<>(null, mockController, namespacedPrimary, false, false);
+
+ final var cm1 =
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName("cm-1")
+ .withNamespace(primaryNamespace)
+ .endMetadata()
+ .build();
+ final var cm2 =
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName("cm-2")
+ .withNamespace(primaryNamespace)
+ .endMetadata()
+ .build();
+
+ final ManagedInformerEventSource resourceCacheEventSource = mock();
+ when(resourceCacheEventSource.list(primaryNamespace)).thenReturn(Stream.of(cm1, cm2));
+ when(mockManager.getEventSourceFor(ConfigMap.class, "es-name"))
+ .thenReturn(resourceCacheEventSource);
+
+ final var res =
+ namespacedContext.getSecondaryResourcesAsStream(ConfigMap.class, "es-name").toList();
+
+ assertThat(res).containsExactlyInAnyOrder(cm1, cm2);
+ verify(resourceCacheEventSource).list(primaryNamespace);
+ }
+
+ @Test
+ void getSecondaryResourcesAsStreamByEventSourceFastPathOnClusterScopedPrimary() {
+ // cluster-scoped primary: has metadata but no namespace set.
+ final var clusterScopedPrimary =
+ new SecretBuilder().withNewMetadata().withName("primary").endMetadata().build();
+ final DefaultContext clusterScopedContext =
+ new DefaultContext<>(null, mockController, clusterScopedPrimary, false, false);
+
+ final var cm1 = new ConfigMapBuilder().withNewMetadata().withName("cm-1").endMetadata().build();
+
+ final ManagedInformerEventSource resourceCacheEventSource = mock();
+ when(resourceCacheEventSource.list()).thenReturn(Stream.of(cm1));
+ when(mockManager.getEventSourceFor(ConfigMap.class, "es-name"))
+ .thenReturn(resourceCacheEventSource);
+
+ final var res =
+ clusterScopedContext.getSecondaryResourcesAsStream(ConfigMap.class, "es-name").toList();
+
+ assertThat(res).containsExactly(cm1);
+ verify(resourceCacheEventSource).list();
+ verify(resourceCacheEventSource, never()).list(any(String.class));
+ }
+
+ @Test
+ void getSecondaryResourcesAsStreamByEventSourceFallsBackToGetSecondaryResources() {
+ final var cm1 =
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName("cm-1")
+ .withNamespace("ns")
+ .endMetadata()
+ .build();
+
+ final EventSource nonCacheEventSource = mock();
+ when(nonCacheEventSource.getSecondaryResources(any())).thenReturn(Set.of(cm1));
+ when(mockManager.getEventSourceFor(ConfigMap.class, "es-name")).thenReturn(nonCacheEventSource);
+
+ final var res = context.getSecondaryResourcesAsStream(ConfigMap.class, "es-name").toList();
+
+ assertThat(res).containsExactly(cm1);
+ }
+
+ @Test
+ void getSecondaryResourcesAsStreamByEventSourceRethrowsWhenNotWorkflowManaged() {
+ when(mockManager.getEventSourceFor(ConfigMap.class, "es-name"))
+ .thenThrow(new NoEventSourceForClassException(ConfigMap.class));
+
+ assertThatThrownBy(() -> context.getSecondaryResourcesAsStream(ConfigMap.class, "es-name"))
+ .isInstanceOf(NoEventSourceForClassException.class);
+ }
+
+ @Test
+ void getSecondaryResourcesAsStreamByEventSourceReturnsEmptyWhenWorkflowManaged() {
+ when(mockManager.getEventSourceFor(ConfigMap.class, null))
+ .thenThrow(new NoEventSourceForClassException(ConfigMap.class));
+ when(mockController.workflowContainsDependentForType(ConfigMap.class)).thenReturn(true);
+
+ final var res = context.getSecondaryResourcesAsStream(ConfigMap.class, null).toList();
+
+ assertThat(res).isEmpty();
+ }
+
@Test
void setRetryInfo() {
RetryInfo retryInfo = mock();
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java
index fb8f7c0805..f7864f2f16 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java
@@ -465,6 +465,98 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() {
verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong());
}
+ @Test
+ void preservesRetryDeadlineWhenRemainingDurationAboveThreshold() {
+ RetryExecution mockRetryExecution = mock(RetryExecution.class);
+ when(mockRetryExecution.nextDelay()).thenReturn(Optional.of(60_000L));
+ when(mockRetryExecution.remainingDurationUntilNextRetry())
+ .thenReturn(Optional.of(Duration.ofMillis(50_000)));
+ Retry retry = mock(Retry.class);
+ when(retry.initExecution()).thenReturn(mockRetryExecution);
+ eventProcessorWithRetry =
+ spy(
+ new EventProcessor(
+ controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()),
+ reconciliationDispatcherMock,
+ eventSourceManagerMock,
+ metricsMock));
+ eventProcessorWithRetry.start();
+ when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
+
+ TestCustomResource customResource = testCustomResource();
+ ExecutionScope executionScope =
+ new ExecutionScope(null, null, false, false).setResource(customResource);
+ PostExecutionControl postExecutionControl =
+ PostExecutionControl.exceptionDuringExecution(new RuntimeException("test"));
+
+ eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl);
+
+ verify(mockRetryExecution, never()).nextDelay();
+ verify(retryTimerEventSourceMock, times(1))
+ .scheduleOnce(eq(ResourceID.fromResource(customResource)), eq(50_000L));
+ }
+
+ @Test
+ void consumesRetryAttemptWhenRemainingDurationAtOrBelowThreshold() {
+ RetryExecution mockRetryExecution = mock(RetryExecution.class);
+ when(mockRetryExecution.nextDelay()).thenReturn(Optional.of(60_000L));
+ when(mockRetryExecution.remainingDurationUntilNextRetry())
+ .thenReturn(Optional.of(Duration.ofMillis(2_000)));
+ Retry retry = mock(Retry.class);
+ when(retry.initExecution()).thenReturn(mockRetryExecution);
+ eventProcessorWithRetry =
+ spy(
+ new EventProcessor(
+ controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()),
+ reconciliationDispatcherMock,
+ eventSourceManagerMock,
+ metricsMock));
+ eventProcessorWithRetry.start();
+ when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
+
+ TestCustomResource customResource = testCustomResource();
+ ExecutionScope executionScope =
+ new ExecutionScope(null, null, false, false).setResource(customResource);
+ PostExecutionControl postExecutionControl =
+ PostExecutionControl.exceptionDuringExecution(new RuntimeException("test"));
+
+ eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl);
+
+ verify(mockRetryExecution, times(1)).nextDelay();
+ verify(retryTimerEventSourceMock, times(1))
+ .scheduleOnce(eq(ResourceID.fromResource(customResource)), eq(60_000L));
+ }
+
+ @Test
+ void firstFailureSchedulesUsingNextDelayWhenNoRemainingDuration() {
+ RetryExecution mockRetryExecution = mock(RetryExecution.class);
+ when(mockRetryExecution.nextDelay()).thenReturn(Optional.of(60_000L));
+ when(mockRetryExecution.remainingDurationUntilNextRetry()).thenReturn(Optional.empty());
+ Retry retry = mock(Retry.class);
+ when(retry.initExecution()).thenReturn(mockRetryExecution);
+ eventProcessorWithRetry =
+ spy(
+ new EventProcessor(
+ controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()),
+ reconciliationDispatcherMock,
+ eventSourceManagerMock,
+ metricsMock));
+ eventProcessorWithRetry.start();
+ when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
+
+ TestCustomResource customResource = testCustomResource();
+ ExecutionScope executionScope =
+ new ExecutionScope(null, null, false, false).setResource(customResource);
+ PostExecutionControl postExecutionControl =
+ PostExecutionControl.exceptionDuringExecution(new RuntimeException("test"));
+
+ eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl);
+
+ verify(mockRetryExecution, times(1)).nextDelay();
+ verify(retryTimerEventSourceMock, times(1))
+ .scheduleOnce(eq(ResourceID.fromResource(customResource)), eq(60_000L));
+ }
+
@Test
void executionOfReconciliationShouldNotStartIfProcessorStopped() throws InterruptedException {
when(reconciliationDispatcherMock.handleExecution(any()))
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java
index f8cb54f68e..71ad7314fe 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java
@@ -140,13 +140,44 @@ void callsBroadcastsOnResourceEvents() {
eq(ResourceAction.UPDATED), eq(customResource1), eq(customResource1));
}
+ @Test
+ void withoutDefaultFiltersUserFilterIsAppliedDirectly() {
+ TestCustomResource cr = TestUtils.testCustomResource();
+ cr.getMetadata().setFinalizers(List.of(FINALIZER));
+ cr.getMetadata().setGeneration(1L);
+
+ // Without default filters, only the user filter runs — no internal generation/finalizer checks.
+ // User filter accepts unconditionally, so the event passes even with same generation.
+ OnUpdateFilter userFilter = (newRes, oldRes) -> true;
+ source = new ControllerEventSource<>(new TestController(null, userFilter, null, false));
+ setUpSource(source, true, controllerConfig);
+
+ source.handleEvent(ResourceAction.UPDATED, cr, cr, null);
+
+ verify(eventHandler, times(1)).handleEvent(any());
+ }
+
+ @Test
+ void withoutDefaultFiltersUserFilterCanRejectEvents() {
+ TestCustomResource cr = TestUtils.testCustomResource();
+
+ OnUpdateFilter userFilter = (newRes, oldRes) -> false;
+ source = new ControllerEventSource<>(new TestController(null, userFilter, null, false));
+ setUpSource(source, true, controllerConfig);
+
+ source.handleEvent(ResourceAction.UPDATED, cr, cr, null);
+
+ verify(eventHandler, never()).handleEvent(any());
+ }
+
@Test
void filtersOutEventsOnAddAndUpdate() {
TestCustomResource cr = TestUtils.testCustomResource();
OnAddFilter onAddFilter = (res) -> false;
OnUpdateFilter onUpdatePredicate = (res, res2) -> false;
- source = new ControllerEventSource<>(new TestController(onAddFilter, onUpdatePredicate, null));
+ source =
+ new ControllerEventSource<>(new TestController(onAddFilter, onUpdatePredicate, null, true));
setUpSource(source, true, controllerConfig);
source.handleEvent(ResourceAction.ADDED, cr, null, null);
@@ -159,7 +190,7 @@ void filtersOutEventsOnAddAndUpdate() {
void genericFilterFiltersOutAddUpdateAndDeleteEvents() {
TestCustomResource cr = TestUtils.testCustomResource();
- source = new ControllerEventSource<>(new TestController(null, null, res -> false));
+ source = new ControllerEventSource<>(new TestController(null, null, res -> false, true));
setUpSource(source, true, controllerConfig);
source.handleEvent(ResourceAction.ADDED, cr, null, null);
@@ -174,7 +205,7 @@ void ownUpdateEchoIsFilteredOutByEventFilter() throws InterruptedException {
// End-to-end smoke for the event-filter wiring on the controller path: an event for our
// own write must not propagate. Detail-level filter scenarios are covered in
// EventingDetailTest / EventFilterSupportTest.
- source = spy(new ControllerEventSource<>(new TestController(null, null, null)));
+ source = spy(new ControllerEventSource<>(new TestController(null, null, null, true)));
setUpSource(source, true, controllerConfig);
doReturn(Optional.empty()).when(source).get(any());
@@ -189,7 +220,7 @@ void ownUpdateEchoIsFilteredOutByEventFilter() throws InterruptedException {
@Test
void foreignUpdateDuringFilteringPropagatesAsUpdate() {
// An external event during the filter window must surface (not be filtered as own).
- source = spy(new ControllerEventSource<>(new TestController(null, null, null)));
+ source = spy(new ControllerEventSource<>(new TestController(null, null, null, true)));
setUpSource(source, true, controllerConfig);
var latch = sendForEventFilteringUpdate(2);
@@ -203,7 +234,7 @@ void foreignUpdateDuringFilteringPropagatesAsUpdate() {
void deleteEventDuringFilteringPropagatesAsDelete() {
// A DELETE arriving during the filter window must surface — the resource has gone,
// so the filter must not silence it just because our own write is still tracking RVs.
- source = spy(new ControllerEventSource<>(new TestController(null, null, null)));
+ source = spy(new ControllerEventSource<>(new TestController(null, null, null, true)));
setUpSource(source, true, controllerConfig);
var latch = sendForEventFilteringUpdate(2);
@@ -223,7 +254,7 @@ void deleteEventDuringFilteringPropagatesAsDelete() {
void multipleForeignEventsDuringFilteringMergeIntoSingleEvent() {
// Several external events during one filter window collapse into a single
// synthesized event spanning prev → latest seen.
- source = spy(new ControllerEventSource<>(new TestController(null, null, null)));
+ source = spy(new ControllerEventSource<>(new TestController(null, null, null, true)));
setUpSource(source, true, controllerConfig);
var latch = sendForEventFilteringUpdate(2);
@@ -266,17 +297,18 @@ private static class TestController extends Controller {
public TestController(
OnAddFilter onAddFilter,
OnUpdateFilter onUpdateFilter,
- GenericFilter genericFilter) {
+ GenericFilter genericFilter,
+ boolean defaultFilters) {
super(
reconciler,
- new TestConfiguration(true, onAddFilter, onUpdateFilter, genericFilter),
+ new TestConfiguration(true, onAddFilter, onUpdateFilter, genericFilter, defaultFilters),
MockKubernetesClient.client(TestCustomResource.class));
}
public TestController(boolean generationAware) {
super(
reconciler,
- new TestConfiguration(generationAware, null, null, null),
+ new TestConfiguration(generationAware, null, null, null, true),
MockKubernetesClient.client(TestCustomResource.class));
}
@@ -298,7 +330,8 @@ public TestConfiguration(
boolean generationAware,
OnAddFilter onAddFilter,
OnUpdateFilter onUpdateFilter,
- GenericFilter genericFilter) {
+ GenericFilter genericFilter,
+ boolean defaultFilters) {
super(
"test",
generationAware,
@@ -316,7 +349,8 @@ public TestConfiguration(
.withGenericFilter(genericFilter)
.withComparableResourceVersions(true)
.buildForController(),
- false);
+ false,
+ defaultFilters);
}
}
}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterWindowTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterWindowTest.java
index 70f6d1621c..367c4fa4f3 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterWindowTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterWindowTest.java
@@ -15,7 +15,6 @@
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import io.fabric8.kubernetes.api.model.ConfigMap;
@@ -385,25 +384,6 @@ void additionalEventAndDeleteEvent() {
assertThat(eventFilterWindow.canBeRemoved()).isTrue();
}
- @Test
- @Disabled("should be part of event filter support")
- void additionalEventAndDeleteEventNoUpdate() {
- eventFilterWindow.increaseActiveUpdates();
- eventFilterWindow.addToOwnUpdateVersions(s(FIRST_OWN_VERSION));
- eventFilterWindow.addRelatedEvent(updateEvent(FIRST_OWN_VERSION));
- eventFilterWindow.addRelatedEvent(updateEvent(FIRST_OWN_VERSION + 1));
- eventFilterWindow.addRelatedEvent(deleteEvent(FIRST_OWN_VERSION + 2));
-
- assertThat(eventFilterWindow.check())
- .hasValueSatisfying(e -> assertDeleteEvent(e, FIRST_OWN_VERSION + 2));
- assertThat(eventFilterWindow.check()).isEmpty();
-
- assertEmptyState();
- eventFilterWindow.decreaseActiveUpdates();
-
- assertThat(eventFilterWindow.canBeRemoved()).isTrue();
- }
-
@Test
void deleteEventInMiddleTwoUpdates() {
eventFilterWindow.increaseActiveUpdates();
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java
index 8f5a446788..8d7ec55e37 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/retry/GenericRetryExecutionTest.java
@@ -21,10 +21,10 @@
import static org.assertj.core.api.Assertions.assertThat;
-public class GenericRetryExecutionTest {
+class GenericRetryExecutionTest {
@Test
- public void noNextDelayIfMaxAttemptLimitReached() {
+ void noNextDelayIfMaxAttemptLimitReached() {
RetryExecution retryExecution =
GenericRetry.defaultLimitedExponentialRetry().setMaxAttempts(3).initExecution();
Optional res = callNextDelayNTimes(retryExecution, 2);
@@ -35,7 +35,7 @@ public void noNextDelayIfMaxAttemptLimitReached() {
}
@Test
- public void canLimitMaxIntervalLength() {
+ void canLimitMaxIntervalLength() {
RetryExecution retryExecution =
GenericRetry.defaultLimitedExponentialRetry()
.setInitialInterval(2000)
@@ -49,13 +49,13 @@ public void canLimitMaxIntervalLength() {
}
@Test
- public void supportsNoRetry() {
+ void supportsNoRetry() {
RetryExecution retryExecution = GenericRetry.noRetry().initExecution();
assertThat(retryExecution.nextDelay()).isEmpty();
}
@Test
- public void supportsIsLastExecution() {
+ void supportsIsLastExecution() {
GenericRetryExecution execution = new GenericRetry().setMaxAttempts(2).initExecution();
assertThat(execution.isLastAttempt()).isFalse();
@@ -65,7 +65,7 @@ public void supportsIsLastExecution() {
}
@Test
- public void returnAttemptIndex() {
+ void returnAttemptIndex() {
RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry().initExecution();
assertThat(retryExecution.getAttemptCount()).isEqualTo(0);
@@ -73,11 +73,59 @@ public void returnAttemptIndex() {
assertThat(retryExecution.getAttemptCount()).isEqualTo(1);
}
- private RetryExecution getDefaultRetryExecution() {
- return GenericRetry.defaultLimitedExponentialRetry().initExecution();
+ @Test
+ void remainingDurationEmptyBeforeFirstNextDelay() {
+ RetryExecution retryExecution = GenericRetry.defaultLimitedExponentialRetry().initExecution();
+
+ assertThat(retryExecution.remainingDurationUntilNextRetry()).isEmpty();
+ }
+
+ @Test
+ void remainingDurationPresentAfterNextDelay() {
+ long interval = 60_000L;
+ RetryExecution retryExecution = new GenericRetry().setInitialInterval(interval).initExecution();
+
+ retryExecution.nextDelay();
+
+ Optional remaining = retryExecution.remainingDurationUntilNextRetry();
+ assertThat(remaining).isPresent();
+ assertThat(remaining.get().toMillis()).isPositive().isLessThanOrEqualTo(interval);
+ }
+
+ @Test
+ void remainingDurationEmptyAfterIntervalElapsed() throws InterruptedException {
+ RetryExecution retryExecution = new GenericRetry().setInitialInterval(20).initExecution();
+
+ retryExecution.nextDelay();
+ Thread.sleep(60);
+
+ assertThat(retryExecution.remainingDurationUntilNextRetry()).isEmpty();
+ }
+
+ @Test
+ void remainingDurationReflectsUpdatedIntervalAfterSubsequentNextDelay() {
+ long initialInterval = 1000L;
+ double multiplier = 2.0;
+ RetryExecution retryExecution =
+ new GenericRetry()
+ .setInitialInterval(initialInterval)
+ .setIntervalMultiplier(multiplier)
+ .initExecution();
+
+ // first two calls keep the initial interval (multiplier only kicks in after attempt 1)
+ retryExecution.nextDelay();
+ retryExecution.nextDelay();
+ // third call doubles the interval to 2000ms
+ retryExecution.nextDelay();
+
+ Optional remaining = retryExecution.remainingDurationUntilNextRetry();
+ assertThat(remaining).isPresent();
+ assertThat(remaining.get().toMillis())
+ .isPositive()
+ .isLessThanOrEqualTo((long) (initialInterval * multiplier));
}
- public Optional callNextDelayNTimes(RetryExecution retryExecution, int n) {
+ Optional callNextDelayNTimes(RetryExecution retryExecution, int n) {
for (int i = 0; i < n; i++) {
retryExecution.nextDelay();
}
diff --git a/operator-framework-junit/pom.xml b/operator-framework-junit/pom.xml
index 10923adf65..aa18d5c778 100644
--- a/operator-framework-junit/pom.xml
+++ b/operator-framework-junit/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
operator-framework-junit
diff --git a/operator-framework/pom.xml b/operator-framework/pom.xml
index 59abb1a926..f94dfa757d 100644
--- a/operator-framework/pom.xml
+++ b/operator-framework/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
operator-framework
diff --git a/operator-framework/src/main/java/io/javaoperatorsdk/operator/config/loader/ConfigLoader.java b/operator-framework/src/main/java/io/javaoperatorsdk/operator/config/loader/ConfigLoader.java
index d66b9139d4..a5b798190f 100644
--- a/operator-framework/src/main/java/io/javaoperatorsdk/operator/config/loader/ConfigLoader.java
+++ b/operator-framework/src/main/java/io/javaoperatorsdk/operator/config/loader/ConfigLoader.java
@@ -143,6 +143,8 @@ public static ConfigLoader getDefault() {
ControllerConfigurationOverrider::withGenerationAware),
new ConfigBinding<>(
"label-selector", String.class, ControllerConfigurationOverrider::withLabelSelector),
+ new ConfigBinding<>(
+ "shard-selector", String.class, ControllerConfigurationOverrider::withShardSelector),
new ConfigBinding<>(
"max-reconciliation-interval",
Duration.class,
@@ -157,6 +159,10 @@ public static ConfigLoader getDefault() {
"informer.label-selector",
String.class,
ControllerConfigurationOverrider::withLabelSelector),
+ new ConfigBinding<>(
+ "informer.shard-selector",
+ String.class,
+ ControllerConfigurationOverrider::withShardSelector),
new ConfigBinding<>(
"informer.list-limit",
Long.class,
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filter/WithoutDefaultFiltersIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filter/WithoutDefaultFiltersIT.java
new file mode 100644
index 0000000000..d305610f9b
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filter/WithoutDefaultFiltersIT.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.filter;
+
+import java.time.Duration;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+class WithoutDefaultFiltersIT {
+
+ public static final String RESOURCE_NAME = "without-default-filters-test1";
+ public static final int POLL_DELAY = 150;
+
+ @RegisterExtension
+ LocallyRunOperatorExtension operator =
+ LocallyRunOperatorExtension.builder()
+ .withReconciler(new WithoutDefaultFiltersReconciler())
+ .build();
+
+ @Test
+ void userFilterFullyControlsUpdateEvents() {
+ var res = operator.create(createResource());
+
+ await()
+ .pollDelay(Duration.ofMillis(POLL_DELAY))
+ .untilAsserted(() -> assertThat(reconciler().getNumberOfExecutions()).isEqualTo(1));
+
+ res = operator.get(FilterTestCustomResource.class, RESOURCE_NAME);
+ res.getSpec().setValue("updated");
+ operator.replace(res);
+
+ await()
+ .pollDelay(Duration.ofMillis(POLL_DELAY))
+ .untilAsserted(() -> assertThat(reconciler().getNumberOfExecutions()).isEqualTo(2));
+
+ res = operator.get(FilterTestCustomResource.class, RESOURCE_NAME);
+ res.getMetadata()
+ .setAnnotations(Map.of(WithoutDefaultFiltersReconciler.TRIGGER_ANNOTATION, "true"));
+ operator.replace(res);
+
+ await()
+ .pollDelay(Duration.ofMillis(POLL_DELAY))
+ .untilAsserted(() -> assertThat(reconciler().getNumberOfExecutions()).isEqualTo(3));
+
+ res = operator.get(FilterTestCustomResource.class, RESOURCE_NAME);
+ res.getMetadata().getAnnotations().remove(WithoutDefaultFiltersReconciler.TRIGGER_ANNOTATION);
+ operator.replace(res);
+
+ await()
+ .pollDelay(Duration.ofMillis(POLL_DELAY))
+ .untilAsserted(() -> assertThat(reconciler().getNumberOfExecutions()).isEqualTo(3));
+ }
+
+ private WithoutDefaultFiltersReconciler reconciler() {
+ return operator.getReconcilerOfType(WithoutDefaultFiltersReconciler.class);
+ }
+
+ FilterTestCustomResource createResource() {
+ var resource = new FilterTestCustomResource();
+ resource.setMetadata(new ObjectMetaBuilder().withName(RESOURCE_NAME).build());
+ resource.setSpec(new FilterTestResourceSpec());
+ resource.getSpec().setValue("initial");
+ return resource;
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filter/WithoutDefaultFiltersReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filter/WithoutDefaultFiltersReconciler.java
new file mode 100644
index 0000000000..a87e9feaa6
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filter/WithoutDefaultFiltersReconciler.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.filter;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.javaoperatorsdk.operator.api.config.informer.Informer;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+@ControllerConfiguration(
+ defaultFilters = false,
+ informer = @Informer(onUpdateFilter = WithoutDefaultFiltersUpdateFilter.class))
+public class WithoutDefaultFiltersReconciler implements Reconciler {
+
+ public static final String TRIGGER_ANNOTATION = "trigger-without-default-filters";
+
+ private final AtomicInteger numberOfExecutions = new AtomicInteger(0);
+
+ @Override
+ public UpdateControl reconcile(
+ FilterTestCustomResource resource, Context context) {
+ numberOfExecutions.incrementAndGet();
+ return UpdateControl.noUpdate();
+ }
+
+ public int getNumberOfExecutions() {
+ return numberOfExecutions.get();
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filter/WithoutDefaultFiltersUpdateFilter.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filter/WithoutDefaultFiltersUpdateFilter.java
new file mode 100644
index 0000000000..8281689f5a
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filter/WithoutDefaultFiltersUpdateFilter.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.filter;
+
+import io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters;
+import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
+
+public class WithoutDefaultFiltersUpdateFilter implements OnUpdateFilter {
+
+ private final OnUpdateFilter composed =
+ InternalEventFilters.onUpdateGenerationAware(true)
+ .or(
+ (newResource, oldResource) -> {
+ var annotations = newResource.getMetadata().getAnnotations();
+ return annotations != null
+ && "true"
+ .equals(
+ annotations.get(WithoutDefaultFiltersReconciler.TRIGGER_ANNOTATION));
+ });
+
+ @Override
+ public boolean accept(
+ FilterTestCustomResource newResource, FilterTestCustomResource oldResource) {
+ return composed.accept(newResource, oldResource);
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/onrelistfilter/OnRelistFilterIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/onrelistfilter/OnRelistFilterIT.java
index df8d7c2591..8d2ff01cb7 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/onrelistfilter/OnRelistFilterIT.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/onrelistfilter/OnRelistFilterIT.java
@@ -17,7 +17,6 @@
import java.time.Duration;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -37,7 +36,6 @@
* re-list starts WHILE the update window is open — own write is propagated
*
*/
-@Disabled("enable when fabric8 supports relist")
class OnRelistFilterIT {
static final String RESOURCE_NAME = "test-resource";
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/onrelistfilter/OnRelistFilterReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/onrelistfilter/OnRelistFilterReconciler.java
index 287141e4d1..13e8e72d74 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/onrelistfilter/OnRelistFilterReconciler.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/onrelistfilter/OnRelistFilterReconciler.java
@@ -15,8 +15,11 @@
*/
package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.onrelistfilter;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -32,6 +35,7 @@
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
@@ -78,7 +82,13 @@ public UpdateControl reconcile(
case NO_RELIST -> context.resourceOperations().serverSideApply(cm, configMapEventSource);
case RELIST_AROUND_UPDATE -> {
configMapEventSource.simulateOnBeforeList();
- context.resourceOperations().serverSideApply(cm, configMapEventSource);
+ var applied = context.resourceOperations().serverSideApply(cm, configMapEventSource);
+ // Make the simulation deterministic: the own-write watch event arrives asynchronously,
+ // so we must wait for it to be received (and buffered into the still-open re-list
+ // window, where it is tagged as part of the re-list) BEFORE the re-list finishes.
+ // Otherwise onList may clear the window's re-list flag before the event lands and the
+ // event would be filtered as an own write — the race this test originally flaked on.
+ configMapEventSource.awaitWatchEventReceived(applied);
configMapEventSource.simulateOnList();
}
case RELIST_COMPLETES_BEFORE_UPDATE -> {
@@ -90,20 +100,24 @@ public UpdateControl reconcile(
// Drive the event-filtering update path manually so we can fire onBeforeList AFTER the
// window has been opened by startEventFilteringModify but BEFORE the SSA hits the API.
var fieldManager = context.getControllerConfiguration().fieldManager();
- configMapEventSource.eventFilteringUpdateAndCacheResource(
- cm,
- r -> {
- configMapEventSource.simulateOnBeforeList();
- return context
- .getClient()
- .resource(r)
- .patch(
- new PatchContext.Builder()
- .withForce(true)
- .withFieldManager(fieldManager)
- .withPatchType(PatchType.SERVER_SIDE_APPLY)
- .build());
- });
+ var applied =
+ configMapEventSource.eventFilteringUpdateAndCacheResource(
+ cm,
+ r -> {
+ configMapEventSource.simulateOnBeforeList();
+ return context
+ .getClient()
+ .resource(r)
+ .patch(
+ new PatchContext.Builder()
+ .withForce(true)
+ .withFieldManager(fieldManager)
+ .withPatchType(PatchType.SERVER_SIDE_APPLY)
+ .build());
+ });
+ // See RELIST_AROUND_UPDATE: wait for the own-write event to be buffered while the
+ // re-list is still in progress, so it is tagged as part of the re-list and propagated.
+ configMapEventSource.awaitWatchEventReceived(applied);
configMapEventSource.simulateOnList();
}
}
@@ -154,14 +168,60 @@ private static ConfigMap prepareConfigMap(OnRelistFilterCustomResource p) {
static class RelistAwareInformerEventSource
extends InformerEventSource {
+ // Highest resourceVersion the informer has actually delivered (as a watch event) per resource.
+ // Lets a test block until the event for its own write has been received and processed.
+ private final ConcurrentMap latestReceivedVersion = new ConcurrentHashMap<>();
+
RelistAwareInformerEventSource(
InformerEventSourceConfiguration configuration, EventSourceContext context) {
super(configuration, context);
}
+ @Override
+ public void onAdd(R newResource) {
+ super.onAdd(newResource);
+ recordReceived(newResource);
+ }
+
+ @Override
+ public void onUpdate(R oldResource, R newResource) {
+ super.onUpdate(oldResource, newResource);
+ recordReceived(newResource);
+ }
+
+ private void recordReceived(R resource) {
+ latestReceivedVersion.merge(
+ ResourceID.fromResource(resource),
+ Long.parseLong(resource.getMetadata().getResourceVersion()),
+ Math::max);
+ }
+
+ /**
+ * Blocks until the informer has delivered a watch event for the given resource at a
+ * resourceVersion at least as recent as the one supplied (i.e. our own write has come back
+ * through the watch). Calling {@code super.onAdd/onUpdate} before recording guarantees the
+ * event is already buffered in the event-filter window by the time this returns.
+ */
+ void awaitWatchEventReceived(R resource) {
+ var id = ResourceID.fromResource(resource);
+ var target = Long.parseLong(resource.getMetadata().getResourceVersion());
+ var deadline = System.nanoTime() + Duration.ofSeconds(10).toNanos();
+ while (latestReceivedVersion.getOrDefault(id, -1L) < target) {
+ if (System.nanoTime() > deadline) {
+ throw new IllegalStateException(
+ "Timed out waiting for watch event with rv>=" + target + " for " + id);
+ }
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
void simulateOnBeforeList() {
- // uncomment when fabric8 supports re-list
- // onBeforeList(null);
+ onBeforeList(null);
}
void simulateOnList() {
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/ownsecondaryupdate/OwnSecondaryUpdateIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/ownsecondaryupdate/OwnSecondaryUpdateIT.java
index dfa5b899fe..eaa8f14c69 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/ownsecondaryupdate/OwnSecondaryUpdateIT.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/ownsecondaryupdate/OwnSecondaryUpdateIT.java
@@ -17,7 +17,6 @@
import java.time.Duration;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -33,7 +32,6 @@
* the secondary are filtered and do NOT trigger additional reconciliations. Counterpart to {@code
* ExternalSecondaryUpdateIT}, which asserts the opposite for third-party updates.
*/
-@Disabled("enable if re-list notification supported by fabric8 client")
class OwnSecondaryUpdateIT {
static final String RESOURCE_NAME = "test-resource";
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryIntervalHonoredOnFrequentEventsIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryIntervalHonoredOnFrequentEventsIT.java
new file mode 100644
index 0000000000..df525e8056
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryIntervalHonoredOnFrequentEventsIT.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.retry;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.javaoperatorsdk.annotation.Sample;
+import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
+import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
+
+import static io.javaoperatorsdk.operator.baseapi.retry.RetryIT.createTestCustomResource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+@Sample(
+ tldr = "Retry Interval Honored Despite Frequent Reconciliation Triggers",
+ description =
+ """
+ Verifies that with a low max attempts (3) and a high retry interval (1 minute), \
+ reconciliations triggered by external events (e.g. resource updates) during the retry \
+ window do not consume retry attempts. The retry counter should only advance when the \
+ scheduled retry deadline is approached, so the configured interval is honored.
+ """)
+class RetryIntervalHonoredOnFrequentEventsIT {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(RetryIntervalHonoredOnFrequentEventsIT.class);
+
+ public static final int MAX_RETRY_ATTEMPTS = 3;
+ public static final int RETRY_INTERVAL_MILLIS = 60_000;
+ public static final int ALL_EXECUTIONS_TO_FAIL = 99;
+ public static final int NUMBER_OF_UPDATES = 5;
+
+ RetryTestCustomReconciler reconciler = new RetryTestCustomReconciler(ALL_EXECUTIONS_TO_FAIL);
+
+ @RegisterExtension
+ LocallyRunOperatorExtension operator =
+ LocallyRunOperatorExtension.builder()
+ .withReconciler(
+ reconciler,
+ new GenericRetry()
+ .setInitialInterval(RETRY_INTERVAL_MILLIS)
+ .withLinearRetry()
+ .setMaxAttempts(MAX_RETRY_ATTEMPTS))
+ .build();
+
+ @Test
+ void frequentEventsDuringRetryWindowDoNotExhaustRetryCounter() {
+ RetryTestCustomResource resource = createTestCustomResource("frequent-events");
+ var created = operator.create(resource);
+
+ // Wait until the initial reconciliation has been executed and failed; the retry timer is now
+ // armed for RETRY_INTERVAL_MILLIS in the future, retry counter is at 1.
+ await()
+ .pollInterval(Duration.ofMillis(50))
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> assertThat(reconciler.getNumberOfExecutions()).isGreaterThanOrEqualTo(1));
+
+ // Trigger several updates spaced apart so each results in its own reconciliation cycle. Each
+ // failed reconciliation lands well inside the 1 minute retry window, so the retry counter
+ // must NOT advance — only the original retry deadline matters.
+ IntStream.rangeClosed(1, NUMBER_OF_UPDATES)
+ .forEach(
+ i -> {
+ log.debug("replacing resource, iteration: {}", i);
+ var latest =
+ operator.get(RetryTestCustomResource.class, created.getMetadata().getName());
+ latest.getSpec().setValue("update-" + i);
+ operator.replace(latest);
+ int expectedExecutions = i + 1;
+ await()
+ .pollInterval(Duration.ofMillis(50))
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ assertThat(reconciler.getNumberOfExecutions())
+ .isGreaterThanOrEqualTo(expectedExecutions));
+ });
+
+ // Reconciliations did happen for every event (so events are not lost) but the retry counter
+ // observed inside the reconciler never went past 1: the configured 1 minute interval is
+ // honored even under a steady stream of external events.
+ assertThat(reconciler.getNumberOfExecutions()).isGreaterThanOrEqualTo(NUMBER_OF_UPDATES + 1);
+ assertThat(reconciler.getMaxObservedRetryAttempt()).isEqualTo(1);
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryTestCustomReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryTestCustomReconciler.java
index 30a339fc4d..f981b9e1cb 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryTestCustomReconciler.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/retry/RetryTestCustomReconciler.java
@@ -32,6 +32,7 @@ public class RetryTestCustomReconciler
private static final Logger log = LoggerFactory.getLogger(RetryTestCustomReconciler.class);
private final AtomicInteger numberOfExecutions = new AtomicInteger(0);
+ private final AtomicInteger maxObservedRetryAttempt = new AtomicInteger(0);
private final AtomicInteger numberOfExecutionFails;
@@ -43,6 +44,12 @@ public RetryTestCustomReconciler(int numberOfExecutionFails) {
public UpdateControl reconcile(
RetryTestCustomResource resource, Context context) {
numberOfExecutions.addAndGet(1);
+ context
+ .getRetryInfo()
+ .ifPresent(
+ info ->
+ maxObservedRetryAttempt.updateAndGet(
+ prev -> Math.max(prev, info.getAttemptCount())));
log.info("Value: " + resource.getSpec().getValue());
@@ -70,4 +77,8 @@ private void ensureStatusExists(RetryTestCustomResource resource) {
public int getNumberOfExecutions() {
return numberOfExecutions.get();
}
+
+ public int getMaxObservedRetryAttempt() {
+ return maxObservedRetryAttempt.get();
+ }
}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/config/loader/ConfigLoaderTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/config/loader/ConfigLoaderTest.java
index 1144e1c4f3..fedaf81eb6 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/config/loader/ConfigLoaderTest.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/config/loader/ConfigLoaderTest.java
@@ -198,10 +198,12 @@ public Optional getValue(String key, Class type) {
"josdk.controller.ctrl.finalizer",
"josdk.controller.ctrl.generation-aware",
"josdk.controller.ctrl.label-selector",
+ "josdk.controller.ctrl.shard-selector",
"josdk.controller.ctrl.max-reconciliation-interval",
"josdk.controller.ctrl.field-manager",
"josdk.controller.ctrl.trigger-reconciler-on-all-events",
"josdk.controller.ctrl.informer.label-selector",
+ "josdk.controller.ctrl.informer.shard-selector",
"josdk.controller.ctrl.informer.list-limit",
"josdk.controller.ctrl.rate-limiter.refresh-period",
"josdk.controller.ctrl.rate-limiter.limit-for-period");
diff --git a/pom.xml b/pom.xml
index 7becc5e31b..12c8b95cce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
pom
Operator SDK for Java
Java SDK for implementing Kubernetes operators
@@ -85,7 +85,7 @@
3.2.4
0.9.14
2.22.0
- 4.17
+ 4.16
2.11
3.15.0
diff --git a/sample-operators/controller-namespace-deletion/pom.xml b/sample-operators/controller-namespace-deletion/pom.xml
index 430ebe1d46..af4be01972 100644
--- a/sample-operators/controller-namespace-deletion/pom.xml
+++ b/sample-operators/controller-namespace-deletion/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
sample-controller-namespace-deletion
diff --git a/sample-operators/leader-election/pom.xml b/sample-operators/leader-election/pom.xml
index 4354bd3d09..4f896485d1 100644
--- a/sample-operators/leader-election/pom.xml
+++ b/sample-operators/leader-election/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
sample-leader-election
diff --git a/sample-operators/mysql-schema/pom.xml b/sample-operators/mysql-schema/pom.xml
index 63d57a215b..d2872c921a 100644
--- a/sample-operators/mysql-schema/pom.xml
+++ b/sample-operators/mysql-schema/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
sample-mysql-schema-operator
diff --git a/sample-operators/operations/pom.xml b/sample-operators/operations/pom.xml
index 4c78a9614b..1786cf39d0 100644
--- a/sample-operators/operations/pom.xml
+++ b/sample-operators/operations/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
sample-operations
@@ -106,11 +106,6 @@
operations-operator
-
-
- -Dlog4j.configurationFile=/config/log4j2.xml
-
-
diff --git a/sample-operators/pom.xml b/sample-operators/pom.xml
index e7aca4b8db..9313095584 100644
--- a/sample-operators/pom.xml
+++ b/sample-operators/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
sample-operators
diff --git a/sample-operators/tomcat-operator/pom.xml b/sample-operators/tomcat-operator/pom.xml
index 9aae55ef26..ea964a2b07 100644
--- a/sample-operators/tomcat-operator/pom.xml
+++ b/sample-operators/tomcat-operator/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
sample-tomcat-operator
diff --git a/sample-operators/webpage/pom.xml b/sample-operators/webpage/pom.xml
index 3b8ce0ac49..d50e5ef03c 100644
--- a/sample-operators/webpage/pom.xml
+++ b/sample-operators/webpage/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
sample-webpage-operator
diff --git a/test-index-processor/pom.xml b/test-index-processor/pom.xml
index 11cd3b476b..2ae7c5f454 100644
--- a/test-index-processor/pom.xml
+++ b/test-index-processor/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.3.6-SNAPSHOT
+ 999-SNAPSHOT
test-index-processor