This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new b0d7d95  [SPARK-53647][SPARK-53648] Use `VertxHttpClientFactory` and 
`io.fabric8.kubernetes.client.http.Interceptor`
b0d7d95 is described below

commit b0d7d9592ba15b37c067657c929d44f9c2ff5d88
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Sep 19 13:10:46 2025 -0700

    [SPARK-53647][SPARK-53648] Use `VertxHttpClientFactory` and 
`io.fabric8.kubernetes.client.http.Interceptor`
    
    ### What changes were proposed in this pull request?
    
    Like Apache Spark main repository, this PR aims to make `Apache Spark K8s 
Operator` be independent from `OkHttp3` library for long-term maintainability.
    - https://github.com/apache/spark/pull/49159
      - [SPARK-50493 Migrate kubernetes-client from `6.x` to 
`7.x`](https://issues.apache.org/jira/browse/SPARK-50493)
      - [SPARK-37687 Cleanup direct usage of 
OkHttpClient](https://issues.apache.org/jira/browse/SPARK-37687)
    - https://github.com/apache/spark/pull/52346
    
    Technically, this goal is achieved by the following in this PR.
    - SPARK-53647 Use `io.fabric8.kubernetes.client.http.Interceptor` instead 
of `okhttp3.Interceptor`
    - SPARK-53648 Use `VertxHttpClientFactory` instead of `OkHttpClientFactory`
    
    ### Why are the changes needed?
    
    Currently, `Apache Spark K8s Operator` has a hard compilation dependency on 
`OkHttp3` library like the following.
    
    
https://github.com/apache/spark-kubernetes-operator/blob/a04c2bb9aeee5856681f796129e2f698a38e6ac1/spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java#L38-L41
    
    From `Fabric8` v7.0.0, we should avoid `OkHttp3` because `Fabric8` 
community moved away from it like the following.
    
    - `VertxHttpClientFactory` is the default HTTP client factory now.
        - https://github.com/fabric8io/kubernetes-client/issues/6470
    
    - `io.fabric8.kubernetes.client.http.Interceptor` is the `fabric8`'s 
interceptor layer which we should use to be independent from the underlying 
HTTP factories. We should depend on this instead of exposing 
`okhttp3.Interceptor`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but Apache Spark K8s versions are still 0.x releases.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #327 from dongjoon-hyun/TODO_METRICS.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 docs/config_properties.md                          |  2 +-
 gradle/libs.versions.toml                          |  2 +-
 spark-operator/build.gradle                        |  5 +-
 .../apache/spark/k8s/operator/SparkOperator.java   |  6 +-
 .../operator/client/KubernetesClientFactory.java   | 21 +++---
 .../k8s/operator/config/SparkOperatorConf.java     |  4 +-
 .../source/KubernetesMetricsInterceptor.java       | 81 ++++++++++++++++------
 .../source/KubernetesMetricsInterceptorTest.java   |  4 +-
 8 files changed, 81 insertions(+), 44 deletions(-)

diff --git a/docs/config_properties.md b/docs/config_properties.md
index f9e2dc0..55369e7 100644
--- a/docs/config_properties.md
+++ b/docs/config_properties.md
@@ -26,7 +26,7 @@
  | spark.kubernetes.operator.api.statusPatchMaxAttempts | Long | 3 | false | 
Maximal number of retry attempts of requests to k8s server for resource status 
update. This would be performed on top of k8s client 
spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting 
update on the same SparkApplication. This should be positive number. | 
  | spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts | Long | 3 
| false | Maximal number of retry attempts of requesting secondary resource for 
Spark application. This would be performed on top of k8s client 
spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting 
reconcile on the same SparkApplication. This should be positive number | 
  | spark.kubernetes.operator.metrics.josdkMetricsEnabled | Boolean | true | 
false | When enabled, the josdk metrics will be added in metrics source and 
configured for operator. | 
- | spark.kubernetes.operator.metrics.clientMetricsEnabled | Boolean | true | 
false | Enable KubernetesClient metrics for measuring the HTTP traffic to the 
Kubernetes API Server. Since the metrics is collected via Okhttp interceptors, 
can be disabled when opt in customized interceptors. | 
+ | spark.kubernetes.operator.metrics.clientMetricsEnabled | Boolean | true | 
false | Enable KubernetesClient metrics for measuring the HTTP traffic to the 
Kubernetes API Server. Since the metrics is collected via interceptors, can be 
disabled when opt in customized interceptors. | 
  | spark.kubernetes.operator.metrics.clientMetricsGroupByResponseCodeEnabled | 
Boolean | true | false | When enabled, additional metrics group by http 
response code group(1xx, 2xx, 3xx, 4xx, 5xx) received from API server will be 
added. Users can disable it when their monitoring system can combine lower 
level kubernetes.client.http.response.<3-digit-response-code> metrics. | 
  | spark.kubernetes.operator.metrics.port | Integer | 19090 | false | The port 
used for checking metrics | 
  | spark.kubernetes.operator.metrics.prometheusTextBasedFormatEnabled | 
Boolean | true | false | Whether or not to enable text-based format for 
Prometheus 2.0, as recommended by 
https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format 
| 
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 0a3827f..9d7a3a6 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -40,7 +40,7 @@ shadow-jar-plugin = "8.3.6"
 
 [libraries]
 kubernetes-client = { group = "io.fabric8", name = "kubernetes-client", 
version.ref = "fabric8" }
-kubernetes-httpclient-okhttp = { group = "io.fabric8", name = 
"kubernetes-httpclient-okhttp", version.ref = "fabric8" }
+kubernetes-httpclient-vertx = { group = "io.fabric8", name = 
"kubernetes-httpclient-vertx", version.ref = "fabric8" }
 kubernetes-server-mock = { group = "io.fabric8", name = 
"kubernetes-server-mock", version.ref = "fabric8" }
 kube-api-test-client-inject = {group = "io.fabric8", name = 
"kube-api-test-client-inject", version.ref = "fabric8"}
 crd-generator-apt = { group = "io.fabric8", name = "crd-generator-apt", 
version.ref = "fabric8" }
diff --git a/spark-operator/build.gradle b/spark-operator/build.gradle
index 7078905..466f96f 100644
--- a/spark-operator/build.gradle
+++ b/spark-operator/build.gradle
@@ -26,10 +26,7 @@ dependencies {
     exclude group: 'com.squareup.okio'
     exclude group: 'io.fabric8'
   }
-  implementation(libs.kubernetes.httpclient.okhttp) {
-    exclude group: 'com.squareup.okhttp3'
-  }
-  implementation(libs.okhttp)
+  implementation(libs.kubernetes.httpclient.vertx)
   implementation(libs.spotbugs.annotations)
   // logging
   implementation(libs.log4j.api)
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
index 4334567..e4b6a5b 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
@@ -33,12 +33,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.http.Interceptor;
 import io.javaoperatorsdk.operator.Operator;
 import io.javaoperatorsdk.operator.RegisteredController;
 import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
 import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
 import lombok.extern.slf4j.Slf4j;
-import okhttp3.Interceptor;
 
 import org.apache.spark.k8s.operator.client.KubernetesClientFactory;
 import org.apache.spark.k8s.operator.config.SparkOperatorConf;
@@ -249,8 +249,8 @@ public class SparkOperator {
   }
 
   /**
-   * Returns a list of OkHttp interceptors for the Kubernetes client, 
including metrics interceptors
-   * if enabled.
+   * Returns a list of interceptors for the Kubernetes client, including 
metrics interceptors if
+   * enabled.
    *
    * @param metricsSystem The MetricsSystem to register interceptors with.
    * @return A List of Interceptor objects.
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java
index d725b46..9b975df 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java
@@ -23,9 +23,11 @@ import java.util.List;
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClientBuilder;
-import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory;
-import okhttp3.Interceptor;
-import okhttp3.OkHttpClient;
+import io.fabric8.kubernetes.client.http.HttpClient;
+import io.fabric8.kubernetes.client.http.Interceptor;
+import io.fabric8.kubernetes.client.utils.HttpClientUtils;
+import io.fabric8.kubernetes.client.vertx.VertxHttpClientBuilder;
+import io.fabric8.kubernetes.client.vertx.VertxHttpClientFactory;
 
 /** Factory for building Kubernetes clients with metrics configured. */
 public final class KubernetesClientFactory {
@@ -35,7 +37,7 @@ public final class KubernetesClientFactory {
   /**
    * Builds a KubernetesClient with the given interceptors.
    *
-   * @param interceptors A list of OkHttp interceptors to add to the client.
+   * @param interceptors A list of interceptors to add to the client.
    * @return A new KubernetesClient instance.
    */
   public static KubernetesClient buildKubernetesClient(final List<Interceptor> 
interceptors) {
@@ -45,7 +47,7 @@ public final class KubernetesClientFactory {
   /**
    * Builds a KubernetesClient with the given interceptors and configuration.
    *
-   * @param interceptors A list of OkHttp interceptors to add to the client.
+   * @param interceptors A list of interceptors to add to the client.
    * @param kubernetesClientConfig The Kubernetes client configuration.
    * @return A new KubernetesClient instance.
    */
@@ -54,12 +56,15 @@ public final class KubernetesClientFactory {
     return new KubernetesClientBuilder()
         .withConfig(kubernetesClientConfig)
         .withHttpClientFactory(
-            new OkHttpClientFactory() {
+            new VertxHttpClientFactory() {
               @Override
-              protected void additionalConfig(OkHttpClient.Builder builder) {
+              public HttpClient.Builder newBuilder(Config config) {
+                VertxHttpClientBuilder builder = super.newBuilder();
+                HttpClientUtils.applyCommonConfiguration(config, builder, 
this);
                 for (Interceptor interceptor : interceptors) {
-                  builder.addInterceptor(interceptor);
+                  
builder.addOrReplaceInterceptor(interceptor.getClass().getName(), interceptor);
                 }
+                return builder;
               }
             })
         .build();
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
index 1e3e949..0ce8fa8 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java
@@ -377,7 +377,7 @@ public final class SparkOperatorConf {
 
   /**
    * Enable KubernetesClient metrics for measuring the HTTP traffic to the 
Kubernetes API Server.
-   * Since the metrics is collected via Okhttp interceptors, can be disabled 
when opt in customized
+   * Since the metrics is collected via interceptors, can be disabled when opt 
in customized
    * interceptors.
    */
   public static final ConfigOption<Boolean> KUBERNETES_CLIENT_METRICS_ENABLED =
@@ -387,7 +387,7 @@ public final class SparkOperatorConf {
           .description(
               "Enable KubernetesClient metrics for measuring the HTTP traffic 
to "
                   + "the Kubernetes API Server. Since the metrics is collected 
"
-                  + "via Okhttp interceptors, can be disabled when opt in "
+                  + "via interceptors, can be disabled when opt in "
                   + "customized interceptors.")
           .typeParameterClass(Boolean.class)
           .defaultValue(true)
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptor.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptor.java
index 685a6d9..83b01ed 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptor.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptor.java
@@ -21,22 +21,20 @@ package org.apache.spark.k8s.operator.metrics.source;
 
 import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.KUBERNETES_CLIENT_METRICS_GROUP_BY_RESPONSE_CODE_GROUP_ENABLED;
 
-import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
+import io.fabric8.kubernetes.client.http.*;
 import lombok.extern.slf4j.Slf4j;
-import okhttp3.Interceptor;
-import okhttp3.Request;
-import okhttp3.Response;
 import org.apache.commons.lang3.tuple.Pair;
-import org.jetbrains.annotations.NotNull;
 
 import org.apache.spark.metrics.source.Source;
 
@@ -90,25 +88,62 @@ public class KubernetesMetricsInterceptor implements 
Interceptor, Source {
   }
 
   /**
-   * Intercepts an HTTP request and updates Kubernetes client metrics.
+   * Called before a request to allow for the manipulation of the request
    *
-   * @param chain The Interceptor.Chain for the current request.
-   * @return The Response from the intercepted request.
-   * @throws IOException if an I/O error occurs during the request.
+   * @param builder used to modify the request
+   * @param request the current request
    */
-  @NotNull
   @Override
-  public Response intercept(@NotNull Chain chain) throws IOException {
-    Request request = chain.request();
+  public void before(BasicBuilder builder, HttpRequest request, RequestTags 
tags) {
     updateRequestMetrics(request);
-    Response response = null;
-    final long startTime = System.nanoTime();
-    try {
-      response = chain.proceed(request);
-      return response;
-    } finally {
-      updateResponseMetrics(response, startTime);
-    }
+  }
+
+  /**
+   * Called after a non-WebSocket HTTP response is received. The body might or 
might not be already
+   * consumed.
+   *
+   * <p>Should be used to analyze response codes and headers, original 
response shouldn't be
+   * altered.
+   *
+   * @param request the original request sent to the server.
+   * @param response the response received from the server.
+   */
+  @Override
+  public void after(
+      HttpRequest request,
+      HttpResponse<?> response,
+      AsyncBody.Consumer<List<ByteBuffer>> consumer) {
+    updateResponseMetrics(response, System.nanoTime());
+  }
+
+  /**
+   * Called after a websocket failure or by default from a normal request.
+   *
+   * <p>Failure is determined by HTTP status code and will be invoked in 
addition to {@link
+   * Interceptor#after(HttpRequest, HttpResponse, AsyncBody.Consumer)}
+   *
+   * @param builder used to modify the request
+   * @param response the failed response
+   * @return true if the builder should be used to execute a new request
+   */
+  @Override
+  public CompletableFuture<Boolean> afterFailure(
+      BasicBuilder builder, HttpResponse<?> response, RequestTags tags) {
+    updateResponseMetrics(null, System.nanoTime());
+    return CompletableFuture.completedFuture(false);
+  }
+
+  /**
+   * Called after a connection attempt fails.
+   *
+   * <p>This method will be invoked on each failed connection attempt.
+   *
+   * @param request the HTTP request.
+   * @param failure the Java exception that caused the failure.
+   */
+  @Override
+  public void afterConnectionFailure(HttpRequest request, Throwable failure) {
+    updateResponseMetrics(null, System.nanoTime());
   }
 
   /**
@@ -131,11 +166,11 @@ public class KubernetesMetricsInterceptor implements 
Interceptor, Source {
     return this.metricRegistry;
   }
 
-  private void updateRequestMetrics(Request request) {
+  private void updateRequestMetrics(HttpRequest request) {
     this.requestRateMeter.mark();
     getMeterByRequestMethod(request.method()).mark();
     Optional<Pair<String, String>> resourceNamePairOptional =
-        parseNamespaceScopedResource(request.url().uri().getPath());
+        parseNamespaceScopedResource(request.uri().getPath());
     resourceNamePairOptional.ifPresent(
         pair -> {
           getMeterByRequestMethodAndResourceName(pair.getValue(), 
request.method()).mark();
@@ -145,7 +180,7 @@ public class KubernetesMetricsInterceptor implements 
Interceptor, Source {
         });
   }
 
-  private void updateResponseMetrics(Response response, long startTimeNanos) {
+  private void updateResponseMetrics(HttpResponse response, long 
startTimeNanos) {
     final long latency = System.nanoTime() - startTimeNanos;
     if (response != null) {
       this.responseRateMeter.mark();
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptorTest.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptorTest.java
index 3ad89fc..54d5214 100644
--- 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptorTest.java
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptorTest.java
@@ -33,9 +33,9 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.http.Interceptor;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
-import okhttp3.Interceptor;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.MethodOrderer;
@@ -122,7 +122,7 @@ class KubernetesMetricsInterceptorTest {
       Map<String, Metric> map = 
metricsInterceptor.metricRegistry().getMetrics();
       Assertions.assertEquals(12, map.size());
       Meter metric = (Meter) map.get("failed");
-      Assertions.assertEquals(metric.getCount(), retry + 1);
+      Assertions.assertEquals(metric.getCount(), retry);
       Assertions.assertEquals(((Meter) map.get("http.request")).getCount(), 
retry + 1);
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to