This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c2b9a0a [SPARK-54789] Support label selector based filter on resources to reconcile
c2b9a0a is described below
commit c2b9a0a3a41a8758611300d1b1daa5ebe8627a01
Author: Zhou JIANG <[email protected]>
AuthorDate: Sun Jan 4 10:01:57 2026 +0900
[SPARK-54789] Support label selector based filter on resources to reconcile
### What changes were proposed in this pull request?
This PR adds a new property
`spark.kubernetes.operator.reconciler.labelSelector`. If set to a non-empty
value, the operator reconciles only the custom resources that match the given
selector. By default, the operator still reconciles all resources.
### Why are the changes needed?
Introducing an optional label selector allows the operator to reconcile only a
targeted subset of custom resources when configured explicitly. It is common
for users to leverage labels to isolate resources, for example when running
multiple operators per environment or group in a shared cluster, or when
progressively adopting new operator versions. This feature makes the operator
a better fit for production-grade multi-tenant environments.
### Does this PR introduce any user-facing change?
Yes. A new property becomes available; the default behavior is unchanged.
### How was this patch tested?
Existing e2e tests validate the default behavior. A new scenario was added to
exercise the selector functionality.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #439 from jiangzho/selector.
Authored-by: Zhou JIANG <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.github/workflows/build_and_test.yml | 31 +++++-
docs/config_properties.md | 1 +
.../apache/spark/k8s/operator/SparkOperator.java | 6 ++
.../k8s/operator/config/SparkOperatorConf.java | 17 +++
.../org/apache/spark/k8s/operator/utils/Utils.java | 8 +-
tests/e2e/helm/selector-config-values.yaml | 18 ++++
tests/e2e/resource-selector/chainsaw-test.yaml | 118 +++++++++++++++++++++
tests/e2e/resource-selector/spark-example.yaml | 31 ++++++
8 files changed, 228 insertions(+), 2 deletions(-)
diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 233b3f0..12ddecd 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -78,11 +78,13 @@ jobs:
mode:
- dynamic
- static
+ - selector
test-group:
- spark-versions
- python
- state-transition
- resource-retain-duration
+ - resource-selector
- watched-namespaces
exclude:
- mode: dynamic
@@ -93,8 +95,22 @@ jobs:
test-group: state-transition
- mode: dynamic
test-group: resource-retain-duration
+ - mode: dynamic
+ test-group: resource-selector
- mode: static
test-group: watched-namespaces
+ - mode: static
+ test-group: resource-selector
+ - mode: selector
+ test-group: spark-versions
+ - mode: selector
+ test-group: python
+ - mode: selector
+ test-group: state-transition
+ - mode: selector
+ test-group: resource-retain-duration
+ - mode: selector
+ test-group: watched-namespaces
steps:
- name: Checkout repository
uses: actions/checkout@v6
@@ -147,7 +163,20 @@ jobs:
if: matrix.mode == 'dynamic'
run: |
chainsaw test --test-dir ./tests/e2e/${{ matrix.test-group }} --parallel 1
-
+ - name: Run Spark K8s Operator on K8S with Resource Selector Enabled
+ if: matrix.mode == 'selector'
+ run: |
+ eval $(minikube docker-env)
+ ./gradlew buildDockerImage
+ helm install spark --create-namespace -f \
+ build-tools/helm/spark-kubernetes-operator/values.yaml -f \
+ tests/e2e/helm/selector-config-values.yaml \
+ build-tools/helm/spark-kubernetes-operator/
+ minikube docker-env --unset
+ - name: Run E2E Test with Resource Selector Enabled
+ if: matrix.mode == 'selector'
+ run: |
chainsaw test --test-dir ./tests/e2e/${{ matrix.test-group }} --parallel 1
lint:
name: "Linter and documentation"
runs-on: ubuntu-latest
diff --git a/docs/config_properties.md b/docs/config_properties.md
index 534d946..5462a08 100644
--- a/docs/config_properties.md
+++ b/docs/config_properties.md
@@ -29,6 +29,7 @@
| spark.kubernetes.operator.reconciler.clusterStatusListenerClassNames | String | | false | Comma-separated names of SparkClusterStatusListener class implementations |
| spark.kubernetes.operator.reconciler.foregroundRequestTimeoutSeconds | Long | 60 | true | Timeout (in seconds) for requests made to API server. This applies only to foreground requests. |
| spark.kubernetes.operator.reconciler.intervalSeconds | Long | 120 | true | Interval (in seconds, non-negative) to reconcile Spark applications. Note that reconciliation is always expected to be triggered when app spec / status is updated. This interval controls the reconcile behavior of operator reconciliation even when there's no update on SparkApplication, e.g. to determine whether a hanging app needs to be proactively terminated. Thus this is recommended to set to above 2 minutes t [...]
+ | spark.kubernetes.operator.reconciler.labelSelector | String | | false | A label selector that identifies a set of Spark custom resources for the operator to reconcile. If not set or set to empty string, operator would reconcile all Spark Apps and Clusters. |
| spark.kubernetes.operator.reconciler.parallelism | Integer | 50 | false | Thread pool size for Spark Operator reconcilers. Unbounded pool would be used if set to non-positive number. |
| spark.kubernetes.operator.reconciler.rateLimiter.maxLoopForPeriod | Integer | 5 | false | Max number of reconcile loops triggered within the rate limiter refresh period for each resource. Setting the limit <= 0 disables the limiter. |
| spark.kubernetes.operator.reconciler.rateLimiter.refreshPeriodSeconds | Integer | 15 | false | Operator rate limiter refresh period(in seconds) for each resource. |
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
index 8804f57..4893383 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
@@ -56,6 +56,7 @@ import org.apache.spark.k8s.operator.reconciler.SparkAppReconciler;
import org.apache.spark.k8s.operator.reconciler.SparkClusterReconciler;
import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
import org.apache.spark.k8s.operator.utils.SparkClusterStatusRecorder;
+import org.apache.spark.k8s.operator.utils.StringUtils;
/**
 * Entry point for Spark Operator. Bootstrap the operator app by starting watch and reconciler for
@@ -252,6 +253,11 @@ public class SparkOperator {
overrider.settingNamespaces(watchedNamespaces);
overrider.withRateLimiter(SparkOperatorConf.getOperatorRateLimiter());
overrider.withRetry(SparkOperatorConf.getOperatorRetry());
+    if (StringUtils.isNotBlank(SparkOperatorConf.OPERATOR_RECONCILER_LABEL_SELECTOR.getValue())) {
+      log.info("Configuring operator reconciliation selectors to {}.",
+          SparkOperatorConf.OPERATOR_RECONCILER_LABEL_SELECTOR.getValue());
+      overrider.withLabelSelector(SparkOperatorConf.OPERATOR_RECONCILER_LABEL_SELECTOR.getValue());
+    }
}
/**
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
index 6870918..5fdffb5 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
@@ -75,6 +75,23 @@ public final class SparkOperatorConf {
.defaultValue("default")
.build();
+  /**
+   * A label selector that identifies a set of Spark custom resources for the operator to
+   * reconcile. If not set or set to empty string, operator would reconcile all Spark Apps
+   * and Clusters.
+   */
+ public static final ConfigOption<String> OPERATOR_RECONCILER_LABEL_SELECTOR =
+ ConfigOption.<String>builder()
+ .key("spark.kubernetes.operator.reconciler.labelSelector")
+ .enableDynamicOverride(false)
+          .description(
+              "A label selector that identifies a set of Spark custom resources for the " +
+              "operator to reconcile. If not set or set to empty string, operator would " +
+              "reconcile all Spark Apps and Clusters.")
+ .typeParameterClass(String.class)
+ .defaultValue("")
+ .build();
+
/**
 * Enable to indicate informer errors should stop operator startup. If disabled, operator startup
 * will ignore recoverable errors, caused for example by RBAC issues and will retry periodically.
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
index a6ca076..024a0e2 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
@@ -44,6 +44,7 @@ import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMap
import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.SparkApplication;
import org.apache.spark.k8s.operator.SparkCluster;
+import org.apache.spark.k8s.operator.config.SparkOperatorConf;
import org.apache.spark.k8s.operator.listeners.SparkAppStatusListener;
import org.apache.spark.k8s.operator.listeners.SparkClusterStatusListener;
@@ -223,7 +224,12 @@ public final class Utils {
* @return A string of common resource labels.
*/
public static String commonResourceLabelsStr() {
- return labelsAsStr(commonManagedResourceLabels());
+ String commonLabels = labelsAsStr(commonManagedResourceLabels());
+    if (StringUtils.isNotBlank(SparkOperatorConf.OPERATOR_RECONCILER_LABEL_SELECTOR.getValue())) {
+      return String.join(",", commonLabels,
+          SparkOperatorConf.OPERATOR_RECONCILER_LABEL_SELECTOR.getValue());
+    }
+ return commonLabels;
}
/**
diff --git a/tests/e2e/helm/selector-config-values.yaml b/tests/e2e/helm/selector-config-values.yaml
new file mode 100644
index 0000000..ab9ec16
--- /dev/null
+++ b/tests/e2e/helm/selector-config-values.yaml
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+operatorConfiguration:
+ spark-operator.properties: |+
+ spark.kubernetes.operator.reconciler.labelSelector=foo=bar,keyx=valuey
diff --git a/tests/e2e/resource-selector/chainsaw-test.yaml b/tests/e2e/resource-selector/chainsaw-test.yaml
new file mode 100644
index 0000000..fed366f
--- /dev/null
+++ b/tests/e2e/resource-selector/chainsaw-test.yaml
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: chainsaw.kyverno.io/v1alpha1
+kind: Test
+metadata:
+ name: spark-operator-resource-selector-validation
+spec:
+ scenarios:
+ - bindings:
+ - name: APPLICATION_FILE_NAME
+ value: spark-example.yaml
+ - name: SPARK_APPLICATION_NAME
+ value: "spark-example-resource-selector"
+ - name: SPARK_VERSION
+ value: "4.1.0"
+ - name: IMAGE
+ value: "apache/spark:4.1.0-scala"
+ steps:
+ - name: install-spark-application-and-apply-label
+ try:
+ - apply:
+ bindings:
+ - name: V_SPARK_VERSION
+ value: (concat('v', replace_all(($SPARK_VERSION), '.', '_')))
+ - name: SPARK_APPLICATION_NAME
+ value: ($SPARK_APPLICATION_NAME)
+ - name: IMAGE
+ value: ($IMAGE)
+ file: spark-example.yaml
+ - sleep:
+ duration: 30s
+ - script:
+ env:
+ - name: SPARK_APPLICATION_NAME
+ value: ($SPARK_APPLICATION_NAME)
+ content:
kubectl get sparkapplication $SPARK_APPLICATION_NAME -o json -n default | jq ".status"
+ check:
+ (contains($stdout, 'null')): true
+ - script:
+ env:
+ - name: SPARK_APPLICATION_NAME
+ value: ($SPARK_APPLICATION_NAME)
+ content:
kubectl label sparkapplication $SPARK_APPLICATION_NAME -n default keyx=valuey
+ - sleep:
+ duration: 30s
+ - script:
+ env:
+ - name: SPARK_APPLICATION_NAME
+ value: ($SPARK_APPLICATION_NAME)
+ content:
kubectl get sparkapplication $SPARK_APPLICATION_NAME -o json -n default | jq ".status"
+ check:
+ (contains($stdout, 'null')): true
+ - script:
+ env:
+ - name: SPARK_APPLICATION_NAME
+ value: ($SPARK_APPLICATION_NAME)
+ content:
kubectl label sparkapplication $SPARK_APPLICATION_NAME -n default foo=placeholder
+ - sleep:
+ duration: 30s
+ - script:
+ env:
+ - name: SPARK_APPLICATION_NAME
+ value: ($SPARK_APPLICATION_NAME)
+ content:
kubectl get sparkapplication $SPARK_APPLICATION_NAME -o json -n default | jq ".status"
+ check:
+ (contains($stdout, 'null')): true
+ - script:
+ env:
+ - name: SPARK_APPLICATION_NAME
+ value: ($SPARK_APPLICATION_NAME)
+ content:
kubectl label sparkapplication $SPARK_APPLICATION_NAME -n default foo=bar --overwrite
+ - sleep:
+ duration: 30s
+ - script:
+ env:
+ - name: SPARK_APPLICATION_NAME
+ value: ($SPARK_APPLICATION_NAME)
+ content:
kubectl get sparkapplication $SPARK_APPLICATION_NAME -o json -n default | jq ".status"
+ check:
+ (contains($stdout, 'null')): false
+ catch:
+ - podLogs:
+ namespace: default
selector: app.kubernetes.io/component=operator-deployment,app.kubernetes.io/name=spark-kubernetes-operator
+ - describe:
+ apiVersion: spark.apache.org/v1
+ kind: SparkApplication
+ namespace: default
+ finally:
+ - script:
+ env:
+ - name: SPARK_APPLICATION_NAME
+ value: ($SPARK_APPLICATION_NAME)
+ timeout: 120s
+ content: |
kubectl delete sparkapplication $SPARK_APPLICATION_NAME --ignore-not-found=true
diff --git a/tests/e2e/resource-selector/spark-example.yaml b/tests/e2e/resource-selector/spark-example.yaml
new file mode 100644
index 0000000..384edf4
--- /dev/null
+++ b/tests/e2e/resource-selector/spark-example.yaml
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: spark.apache.org/v1
+kind: SparkApplication
+metadata:
+ name: ($SPARK_APPLICATION_NAME)
+ namespace: default
+spec:
+ mainClass: "org.apache.spark.examples.SparkPi"
+ jars: "local:///opt/spark/examples/jars/spark-examples.jar"
+ sparkConf:
+ spark.executor.instances: "1"
+ spark.kubernetes.container.image: ($IMAGE)
+ spark.kubernetes.authenticate.driver.serviceAccountName: "spark"
+ runtimeVersions:
+ sparkVersion: ($V_SPARK_VERSION)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]