This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new c2b9a0a  [SPARK-54789] Support label selector based filter on resources to reconcile
c2b9a0a is described below

commit c2b9a0a3a41a8758611300d1b1daa5ebe8627a01
Author: Zhou JIANG <[email protected]>
AuthorDate: Sun Jan 4 10:01:57 2026 +0900

    [SPARK-54789] Support label selector based filter on resources to reconcile
    
    ### What changes were proposed in this pull request?
    
    This PR adds a new property
    `spark.kubernetes.operator.reconciler.labelSelector`. If set to a
    non-empty value, the operator reconciles only the custom resources that
    match the given selector. By default, the operator still reconciles all
    resources.
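    
    For example, the selector can be supplied through the helm chart's
    `operatorConfiguration` block. A minimal sketch mirroring the e2e values
    file added in this PR (the label pairs `foo=bar,keyx=valuey` are only
    illustrative):
    
        operatorConfiguration:
          spark-operator.properties: |+
            spark.kubernetes.operator.reconciler.labelSelector=foo=bar,keyx=valuey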
    
    ### Why are the changes needed?
    
    Introducing optional label-based selectors allows the operator to
    reconcile only a targeted subset of custom resources when explicitly
    configured.
    
    It is common for users to leverage labels to isolate resources. This can
    happen for various reasons, including setting up multiple operators per
    environment or group in shared clusters, or progressively adopting new
    versions. This feature makes the operator a better fit for
    production-grade multi-tenant environments.
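    
    With a selector like the one above in place, a resource is reconciled
    only once its labels match. A sketch of opting an application in from the
    command line (the resource name `my-app` is hypothetical; the labels
    mirror the e2e selector):
    
        kubectl label sparkapplication my-app -n default foo=bar keyx=valuey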
    
    ### Does this PR introduce any user-facing change?
    
    A new property becomes available, while the default behavior remains the
    same as before.
    
    ### How was this patch tested?
    
    Existing e2e tests validate the default behavior. Added a new scenario to
    test the selector functionality.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #439 from jiangzho/selector.
    
    Authored-by: Zhou JIANG <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .github/workflows/build_and_test.yml               |  31 +++++-
 docs/config_properties.md                          |   1 +
 .../apache/spark/k8s/operator/SparkOperator.java   |   6 ++
 .../k8s/operator/config/SparkOperatorConf.java     |  17 +++
 .../org/apache/spark/k8s/operator/utils/Utils.java |   8 +-
 tests/e2e/helm/selector-config-values.yaml         |  18 ++++
 tests/e2e/resource-selector/chainsaw-test.yaml     | 118 +++++++++++++++++++++
 tests/e2e/resource-selector/spark-example.yaml     |  31 ++++++
 8 files changed, 228 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 233b3f0..12ddecd 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -78,11 +78,13 @@ jobs:
         mode:
           - dynamic
           - static
+          - selector
         test-group:
           - spark-versions
           - python
           - state-transition
           - resource-retain-duration
+          - resource-selector
           - watched-namespaces
         exclude:
           - mode: dynamic
@@ -93,8 +95,22 @@ jobs:
             test-group: state-transition
           - mode: dynamic
             test-group: resource-retain-duration
+          - mode: dynamic
+            test-group: resource-selector
           - mode: static
             test-group: watched-namespaces
+          - mode: static
+            test-group: resource-selector
+          - mode: selector
+            test-group: spark-versions
+          - mode: selector
+            test-group: python
+          - mode: selector
+            test-group: state-transition
+          - mode: selector
+            test-group: resource-retain-duration
+          - mode: selector
+            test-group: watched-namespaces
     steps:
       - name: Checkout repository
         uses: actions/checkout@v6
@@ -147,7 +163,20 @@ jobs:
         if: matrix.mode == 'dynamic'
         run: |
          chainsaw test --test-dir ./tests/e2e/${{ matrix.test-group }} --parallel 1
-
+      - name: Run Spark K8s Operator on K8S with Resource Selector Enabled
+        if: matrix.mode == 'selector'
+        run: |
+          eval $(minikube docker-env)
+          ./gradlew buildDockerImage
+          helm install spark --create-namespace -f \
+          build-tools/helm/spark-kubernetes-operator/values.yaml -f \
+          tests/e2e/helm/selector-config-values.yaml \
+          build-tools/helm/spark-kubernetes-operator/
+          minikube docker-env --unset
+      - name: Run E2E Test with Resource Selector Enabled
+        if: matrix.mode == 'selector'
+        run: |
+          chainsaw test --test-dir ./tests/e2e/${{ matrix.test-group }} --parallel 1
   lint:
     name: "Linter and documentation"
     runs-on: ubuntu-latest
diff --git a/docs/config_properties.md b/docs/config_properties.md
index 534d946..5462a08 100644
--- a/docs/config_properties.md
+++ b/docs/config_properties.md
@@ -29,6 +29,7 @@
 | spark.kubernetes.operator.reconciler.clusterStatusListenerClassNames | String |  | false | Comma-separated names of SparkClusterStatusListener class implementations | 
 | spark.kubernetes.operator.reconciler.foregroundRequestTimeoutSeconds | Long | 60 | true | Timeout (in seconds) for requests made to API server. This applies only to foreground requests. | 
 | spark.kubernetes.operator.reconciler.intervalSeconds | Long | 120 | true | Interval (in seconds, non-negative) to reconcile Spark applications. Note that reconciliation is always expected to be triggered when app spec / status is updated. This interval controls the reconcile behavior of operator reconciliation even when there's no update on SparkApplication, e.g. to determine whether a hanging app needs to be proactively terminated. Thus this is recommended to set to above 2 minutes t [...]
+ | spark.kubernetes.operator.reconciler.labelSelector | String |  | false | A label selector that identifies a set of Spark custom resources for the operator to reconcile. If not set or set to an empty string, the operator reconciles all Spark Apps and Clusters. | 
 | spark.kubernetes.operator.reconciler.parallelism | Integer | 50 | false | Thread pool size for Spark Operator reconcilers. Unbounded pool would be used if set to non-positive number. | 
 | spark.kubernetes.operator.reconciler.rateLimiter.maxLoopForPeriod | Integer | 5 | false | Max number of reconcile loops triggered within the rate limiter refresh period for each resource. Setting the limit <= 0 disables the limiter. | 
 | spark.kubernetes.operator.reconciler.rateLimiter.refreshPeriodSeconds | Integer | 15 | false | Operator rate limiter refresh period(in seconds) for each resource. | 
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
index 8804f57..4893383 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
@@ -56,6 +56,7 @@ import org.apache.spark.k8s.operator.reconciler.SparkAppReconciler;
 import org.apache.spark.k8s.operator.reconciler.SparkClusterReconciler;
 import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
 import org.apache.spark.k8s.operator.utils.SparkClusterStatusRecorder;
+import org.apache.spark.k8s.operator.utils.StringUtils;
 
 /**
  * Entry point for Spark Operator. Bootstrap the operator app by starting watch and reconciler for
@@ -252,6 +253,11 @@ public class SparkOperator {
     overrider.settingNamespaces(watchedNamespaces);
     overrider.withRateLimiter(SparkOperatorConf.getOperatorRateLimiter());
     overrider.withRetry(SparkOperatorConf.getOperatorRetry());
+    if (StringUtils.isNotBlank(SparkOperatorConf.OPERATOR_RECONCILER_LABEL_SELECTOR.getValue())) {
+      log.info("Configuring operator reconciliation selectors to {}.",
+          SparkOperatorConf.OPERATOR_RECONCILER_LABEL_SELECTOR.getValue());
+      overrider.withLabelSelector(SparkOperatorConf.OPERATOR_RECONCILER_LABEL_SELECTOR.getValue());
+    }
   }
 
   /**
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
index 6870918..5fdffb5 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
@@ -75,6 +75,23 @@ public final class SparkOperatorConf {
           .defaultValue("default")
           .build();
 
+  /**
+   * A label selector that identifies a set of Spark custom resources for the operator to
+   * reconcile. If not set or set to an empty string, the operator reconciles all Spark Apps
+   * and Clusters.
+   */
+  public static final ConfigOption<String> OPERATOR_RECONCILER_LABEL_SELECTOR =
+      ConfigOption.<String>builder()
+          .key("spark.kubernetes.operator.reconciler.labelSelector")
+          .enableDynamicOverride(false)
+          .description(
+              "A label selector that identifies a set of Spark custom 
resources for the " +
+                  "operator to reconcile. If not set or set to empty string, 
operator would " +
+                  "reconcile all Spark Apps and Clusters.")
+          .typeParameterClass(String.class)
+          .defaultValue("")
+          .build();
+
   /**
   * Enable to indicate informer errors should stop operator startup. If disabled, operator startup
   * will ignore recoverable errors, caused for example by RBAC issues and will retry periodically.
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
index a6ca076..024a0e2 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
@@ -44,6 +44,7 @@ import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMap
 import org.apache.spark.k8s.operator.Constants;
 import org.apache.spark.k8s.operator.SparkApplication;
 import org.apache.spark.k8s.operator.SparkCluster;
+import org.apache.spark.k8s.operator.config.SparkOperatorConf;
 import org.apache.spark.k8s.operator.listeners.SparkAppStatusListener;
 import org.apache.spark.k8s.operator.listeners.SparkClusterStatusListener;
 
@@ -223,7 +224,12 @@ public final class Utils {
    * @return A string of common resource labels.
    */
   public static String commonResourceLabelsStr() {
-    return labelsAsStr(commonManagedResourceLabels());
+    String commonLabels = labelsAsStr(commonManagedResourceLabels());
+    if (StringUtils.isNotBlank(SparkOperatorConf.OPERATOR_RECONCILER_LABEL_SELECTOR.getValue())) {
+      return String.join(",", commonLabels,
+          SparkOperatorConf.OPERATOR_RECONCILER_LABEL_SELECTOR.getValue());
+    }
+    return commonLabels;
   }
 
   /**
diff --git a/tests/e2e/helm/selector-config-values.yaml b/tests/e2e/helm/selector-config-values.yaml
new file mode 100644
index 0000000..ab9ec16
--- /dev/null
+++ b/tests/e2e/helm/selector-config-values.yaml
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+operatorConfiguration:
+  spark-operator.properties: |+
+    spark.kubernetes.operator.reconciler.labelSelector=foo=bar,keyx=valuey
diff --git a/tests/e2e/resource-selector/chainsaw-test.yaml b/tests/e2e/resource-selector/chainsaw-test.yaml
new file mode 100644
index 0000000..fed366f
--- /dev/null
+++ b/tests/e2e/resource-selector/chainsaw-test.yaml
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: chainsaw.kyverno.io/v1alpha1
+kind: Test
+metadata:
+  name: spark-operator-resource-selector-validation
+spec:
+  scenarios:
+    - bindings:
+        - name: APPLICATION_FILE_NAME
+          value: spark-example.yaml
+        - name: SPARK_APPLICATION_NAME
+          value: "spark-example-resource-selector"
+        - name: SPARK_VERSION
+          value: "4.1.0"
+        - name: IMAGE
+          value: "apache/spark:4.1.0-scala"
+  steps:
+    - name: install-spark-application-and-apply-label
+      try:
+        - apply:
+            bindings:
+              - name: V_SPARK_VERSION
+                value: (concat('v', replace_all(($SPARK_VERSION), '.', '_')))
+              - name: SPARK_APPLICATION_NAME
+                value: ($SPARK_APPLICATION_NAME)
+              - name: IMAGE
+                value: ($IMAGE)
+            file: spark-example.yaml
+        - sleep:
+            duration: 30s
+        - script:
+            env:
+              - name: SPARK_APPLICATION_NAME
+                value: ($SPARK_APPLICATION_NAME)
+            content:
+              kubectl get sparkapplication $SPARK_APPLICATION_NAME -o json -n default | jq ".status"
+            check:
+              (contains($stdout, 'null')): true
+        - script:
+            env:
+              - name: SPARK_APPLICATION_NAME
+                value: ($SPARK_APPLICATION_NAME)
+            content:
+              kubectl label sparkapplication $SPARK_APPLICATION_NAME -n default keyx=valuey
+        - sleep:
+            duration: 30s
+        - script:
+            env:
+              - name: SPARK_APPLICATION_NAME
+                value: ($SPARK_APPLICATION_NAME)
+            content:
+              kubectl get sparkapplication $SPARK_APPLICATION_NAME -o json -n default | jq ".status"
+            check:
+              (contains($stdout, 'null')): true
+        - script:
+            env:
+              - name: SPARK_APPLICATION_NAME
+                value: ($SPARK_APPLICATION_NAME)
+            content:
+              kubectl label sparkapplication $SPARK_APPLICATION_NAME -n default foo=placeholder
+        - sleep:
+            duration: 30s
+        - script:
+            env:
+              - name: SPARK_APPLICATION_NAME
+                value: ($SPARK_APPLICATION_NAME)
+            content:
+              kubectl get sparkapplication $SPARK_APPLICATION_NAME -o json -n default | jq ".status"
+            check:
+              (contains($stdout, 'null')): true
+        - script:
+            env:
+              - name: SPARK_APPLICATION_NAME
+                value: ($SPARK_APPLICATION_NAME)
+            content:
+              kubectl label sparkapplication $SPARK_APPLICATION_NAME -n default foo=bar --overwrite
+        - sleep:
+            duration: 30s
+        - script:
+            env:
+              - name: SPARK_APPLICATION_NAME
+                value: ($SPARK_APPLICATION_NAME)
+            content:
+              kubectl get sparkapplication $SPARK_APPLICATION_NAME -o json -n default | jq ".status"
+            check:
+              (contains($stdout, 'null')): false
+      catch:
+        - podLogs:
+            namespace: default
+            selector: app.kubernetes.io/component=operator-deployment,app.kubernetes.io/name=spark-kubernetes-operator
+        - describe:
+            apiVersion: spark.apache.org/v1
+            kind: SparkApplication
+            namespace: default
+      finally:
+        - script:
+            env:
+              - name: SPARK_APPLICATION_NAME
+                value: ($SPARK_APPLICATION_NAME)
+            timeout: 120s
+            content: |
+              kubectl delete sparkapplication $SPARK_APPLICATION_NAME --ignore-not-found=true
diff --git a/tests/e2e/resource-selector/spark-example.yaml b/tests/e2e/resource-selector/spark-example.yaml
new file mode 100644
index 0000000..384edf4
--- /dev/null
+++ b/tests/e2e/resource-selector/spark-example.yaml
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: spark.apache.org/v1
+kind: SparkApplication
+metadata:
+  name: ($SPARK_APPLICATION_NAME)
+  namespace: default
+spec:
+  mainClass: "org.apache.spark.examples.SparkPi"
+  jars: "local:///opt/spark/examples/jars/spark-examples.jar"
+  sparkConf:
+    spark.executor.instances: "1"
+    spark.kubernetes.container.image: ($IMAGE)
+    spark.kubernetes.authenticate.driver.serviceAccountName: "spark"
+  runtimeVersions:
+    sparkVersion: ($V_SPARK_VERSION)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
