This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 186746a [SPARK-52086] Upgrade `operator-sdk` to 5.1.1
186746a is described below
commit 186746af908092e7ff293e6c00605e9ec1fb59eb
Author: Attila Mészáros <[email protected]>
AuthorDate: Sat Jun 28 11:07:15 2025 -0700
[SPARK-52086] Upgrade `operator-sdk` to 5.1.1
### What changes were proposed in this pull request?
Upgrade the Java Operator SDK (`operator-sdk`) from 4.9.0 to 5.1.1.
The intention is to keep the functionality the same as in the previous version.
- https://mvnrepository.com/artifact/io.javaoperatorsdk/java-operator-sdk/5.1.1
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Standard CI should test it.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #252 from csviri/josdk-5_1.
Authored-by: Attila Mészáros <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
gradle/libs.versions.toml | 2 +-
.../apache/spark/k8s/operator/SparkOperator.java | 7 +++++--
.../config/SparkOperatorConfigMapReconciler.java | 22 ++++++++------------
.../operator/reconciler/SparkAppReconciler.java | 22 ++++++++------------
.../reconciler/SparkClusterReconciler.java | 24 ++++++++--------------
.../org/apache/spark/k8s/operator/utils/Utils.java | 24 ++++++++++++++++++++++
.../spark/k8s/operator/probe/HealthProbeTest.java | 13 +-----------
7 files changed, 57 insertions(+), 57 deletions(-)
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 507399d..4be9c35 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -17,7 +17,7 @@
[versions]
fabric8 = "7.3.1"
lombok = "1.18.38"
-operator-sdk = "4.9.0"
+operator-sdk = "5.1.1"
okhttp = "4.12.0"
dropwizard-metrics = "4.2.30"
spark = "4.0.0"
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
index 00619d7..d0f750a 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
@@ -23,6 +23,7 @@ import static
org.apache.spark.k8s.operator.utils.Utils.getAppStatusListener;
import static
org.apache.spark.k8s.operator.utils.Utils.getClusterStatusListener;
import static org.apache.spark.k8s.operator.utils.Utils.getWatchedNamespaces;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -168,8 +169,8 @@ public class SparkOperator {
overrider.withKubernetesClient(client);
overrider.withStopOnInformerErrorDuringStartup(
SparkOperatorConf.TERMINATE_ON_INFORMER_FAILURE_ENABLED.getValue());
- overrider.withTerminationTimeoutSeconds(
- SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue());
+ overrider.withReconciliationTerminationTimeout(
+
Duration.ofSeconds(SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue()));
int parallelism = SparkOperatorConf.RECONCILER_PARALLELISM.getValue();
if (parallelism > 0) {
log.info("Configuring operator with {} reconciliation threads.",
parallelism);
@@ -187,6 +188,7 @@ public class SparkOperator {
overrider.withMetrics(operatorJosdkMetrics);
metricsSystem.registerSource(operatorJosdkMetrics);
}
+ overrider.withUseSSAToPatchPrimaryResource(false);
}
protected void overrideConfigMonitorConfigs(ConfigurationServiceOverrider
overrider) {
@@ -198,6 +200,7 @@ public class SparkOperator {
overrider.withInformerStoppedHandler(
(informer, ex) ->
log.error("Dynamic config informer stopped: operator will not
accept config updates."));
+ overrider.withUseSSAToPatchPrimaryResource(false);
}
protected void overrideControllerConfigs(ControllerConfigurationOverrider<?>
overrider) {
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java
index 182f423..491057d 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java
@@ -19,18 +19,16 @@
package org.apache.spark.k8s.operator.config;
-import java.util.Map;
+import java.util.List;
import java.util.Set;
import java.util.function.Function;
import io.fabric8.kubernetes.api.model.ConfigMap;
-import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
+import
io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
-import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
-import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
@@ -46,10 +44,7 @@ import lombok.extern.slf4j.Slf4j;
@ControllerConfiguration
@RequiredArgsConstructor
@Slf4j
-public class SparkOperatorConfigMapReconciler
- implements Reconciler<ConfigMap>,
- ErrorStatusHandler<ConfigMap>,
- EventSourceInitializer<ConfigMap> {
+public class SparkOperatorConfigMapReconciler implements Reconciler<ConfigMap>
{
private final Function<Set<String>, Boolean> namespaceUpdater;
private final String operatorNamespace;
private final Function<Void, Set<String>> watchedNamespacesGetter;
@@ -62,14 +57,15 @@ public class SparkOperatorConfigMapReconciler
}
@Override
- public Map<String, EventSource>
prepareEventSources(EventSourceContext<ConfigMap> context) {
- EventSource configMapEventSource =
+ public List<EventSource<?, ConfigMap>> prepareEventSources(
+ EventSourceContext<ConfigMap> context) {
+ var configMapEventSource =
new InformerEventSource<>(
- InformerConfiguration.from(ConfigMap.class, context)
- .withNamespaces(operatorNamespace)
+ InformerEventSourceConfiguration.from(ConfigMap.class,
ConfigMap.class)
+ .withNamespaces(Set.of(operatorNamespace))
.build(),
context);
- return EventSourceInitializer.nameEventSources(configMapEventSource);
+ return List.of(configMapEventSource);
}
@Override
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
index 89c5213..9b1323b 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
@@ -21,28 +21,25 @@ package org.apache.spark.k8s.operator.reconciler;
import static
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME;
import static
org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue;
+import static
org.apache.spark.k8s.operator.utils.Utils.basicLabelSecondaryToPrimaryMapper;
import static
org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import io.fabric8.kubernetes.api.model.Pod;
-import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
+import
io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
-import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import
io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
-import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -74,11 +71,7 @@ import
org.apache.spark.k8s.operator.utils.SparkAppStatusUtils;
@ControllerConfiguration
@Slf4j
@RequiredArgsConstructor
-public class SparkAppReconciler
- implements Reconciler<SparkApplication>,
- ErrorStatusHandler<SparkApplication>,
- EventSourceInitializer<SparkApplication>,
- Cleaner<SparkApplication> {
+public class SparkAppReconciler implements Reconciler<SparkApplication>,
Cleaner<SparkApplication> {
private final SparkAppSubmissionWorker submissionWorker;
private final SparkAppStatusRecorder sparkAppStatusRecorder;
private final SentinelManager<SparkApplication> sentinelManager;
@@ -135,16 +128,17 @@ public class SparkAppReconciler
}
@Override
- public Map<String, EventSource> prepareEventSources(
+ public List<EventSource<?, SparkApplication>> prepareEventSources(
EventSourceContext<SparkApplication> context) {
EventSource podEventSource =
new InformerEventSource<>(
- InformerConfiguration.from(Pod.class, context)
-
.withSecondaryToPrimaryMapper(Mappers.fromLabel(LABEL_SPARK_APPLICATION_NAME))
+ InformerEventSourceConfiguration.from(Pod.class,
SparkApplication.class)
+ .withSecondaryToPrimaryMapper(
+
basicLabelSecondaryToPrimaryMapper(LABEL_SPARK_APPLICATION_NAME))
.withLabelSelector(commonResourceLabelsStr())
.build(),
context);
- return EventSourceInitializer.nameEventSources(podEventSource);
+ return List.of(podEventSource);
}
protected List<AppReconcileStep> getReconcileSteps(final SparkApplication
app) {
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java
index fde1a98..f99d558 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java
@@ -19,32 +19,29 @@
package org.apache.spark.k8s.operator.reconciler;
+import static
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME;
import static
org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue;
+import static
org.apache.spark.k8s.operator.utils.Utils.basicLabelSecondaryToPrimaryMapper;
import static
org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import io.fabric8.kubernetes.api.model.Pod;
-import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
+import
io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
-import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import
io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
-import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.SparkCluster;
import org.apache.spark.k8s.operator.SparkClusterSubmissionWorker;
import org.apache.spark.k8s.operator.context.SparkClusterContext;
@@ -61,11 +58,7 @@ import
org.apache.spark.k8s.operator.utils.SparkClusterStatusRecorder;
@ControllerConfiguration
@Slf4j
@RequiredArgsConstructor
-public class SparkClusterReconciler
- implements Reconciler<SparkCluster>,
- ErrorStatusHandler<SparkCluster>,
- EventSourceInitializer<SparkCluster>,
- Cleaner<SparkCluster> {
+public class SparkClusterReconciler implements Reconciler<SparkCluster>,
Cleaner<SparkCluster> {
private final SparkClusterSubmissionWorker submissionWorker;
private final SparkClusterStatusRecorder sparkClusterStatusRecorder;
private final SentinelManager<SparkCluster> sentinelManager;
@@ -118,16 +111,17 @@ public class SparkClusterReconciler
}
@Override
- public Map<String, EventSource>
prepareEventSources(EventSourceContext<SparkCluster> context) {
+ public List<EventSource<?, SparkCluster>> prepareEventSources(
+ EventSourceContext<SparkCluster> context) {
EventSource podEventSource =
new InformerEventSource<>(
- InformerConfiguration.from(Pod.class, context)
+ InformerEventSourceConfiguration.from(Pod.class,
SparkCluster.class)
.withSecondaryToPrimaryMapper(
- Mappers.fromLabel(Constants.LABEL_SPARK_APPLICATION_NAME))
+
basicLabelSecondaryToPrimaryMapper(LABEL_SPARK_APPLICATION_NAME))
.withLabelSelector(commonResourceLabelsStr())
.build(),
context);
- return EventSourceInitializer.nameEventSources(podEventSource);
+ return List.of(podEventSource);
}
protected List<ClusterReconcileStep> getReconcileSteps(final SparkCluster
cluster) {
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
index 28c1cf6..7ab3574 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
@@ -38,6 +38,9 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import
io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.k8s.operator.Constants;
@@ -144,4 +147,25 @@ public final class Utils {
public static String commonResourceLabelsStr() {
return labelsAsStr(commonManagedResourceLabels());
}
+
+ public static <T extends HasMetadata>
+ SecondaryToPrimaryMapper<T> basicLabelSecondaryToPrimaryMapper(String
nameKey) {
+ return resource -> {
+ final var metadata = resource.getMetadata();
+ if (metadata == null) {
+ return Collections.emptySet();
+ } else {
+ final var map = metadata.getLabels();
+ if (map == null) {
+ return Collections.emptySet();
+ }
+ var name = map.get(nameKey);
+ if (name == null) {
+ return Collections.emptySet();
+ }
+ var namespace = resource.getMetadata().getNamespace();
+ return Set.of(new ResourceID(name, namespace));
+ }
+ };
+ }
}
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java
index a8b4587..958a8d2 100644
---
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java
@@ -36,7 +36,6 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.RuntimeInfo;
-import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
import
io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
import io.javaoperatorsdk.operator.health.Status;
@@ -190,16 +189,6 @@ class HealthProbeTest {
}
}));
- return new InformerWrappingEventSourceHealthIndicator() {
- @Override
- public Map<String, InformerHealthIndicator> informerHealthIndicators() {
- return informers;
- }
-
- @Override
- public ResourceConfiguration getInformerConfiguration() {
- return null;
- }
- };
+ return () -> informers;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]