This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 73a0258276fdf6bc67571ec8d32eeb3e5845c313
Author: iantowey <[email protected]>
AuthorDate: Wed Jun 3 21:00:08 2026 +0100

    [improve][functions] Allow customizing Kubernetes service domain suffix in Function Worker (#25872)
    
    Co-authored-by: Ian <[email protected]>
    (cherry picked from commit cc9fddcd02939ba6c735d88a2bfe702a22fb2ff4)
---
 conf/functions_worker.yml                          |  3 +++
 .../runtime/kubernetes/KubernetesRuntime.java      | 12 ++++++-----
 .../kubernetes/KubernetesRuntimeFactory.java       |  4 +++-
 .../kubernetes/KubernetesRuntimeFactoryConfig.java |  7 ++++++-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  | 23 ++++++++++++++++++++++
 5 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 6f995576ebd..794cfba1d51 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -198,6 +198,9 @@ functionRuntimeFactoryConfigs:
 #    # The Kubernetes pod name to run the function instances. It is set to
 #    # `pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>` if this 
setting is left to be empty
 #    jobName:
+#    # Optional domain suffix to use when the Function Worker constructs the gRPC address to connect to function instances.
+#    # If left blank, it defaults to `svc.cluster.local`. Specify the suffix without a leading dot; the separator is added automatically. Set this if your Function Worker is outside the cluster and connects via an external Gateway/Ingress.
+#    kubernetesServiceDomainSuffix:
+#    kubernetesServiceDomainSuffix:
 #    # the docker image to run function instance. by default it is 
`apachepulsar/pulsar`
 #    pulsarDockerImageName:
 #    # the docker image to run function instance according to different 
configurations provided by users.
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index 7a69b822cbd..3e460f7b97a 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -153,11 +153,13 @@ public class KubernetesRuntime implements Runtime {
     private final Optional<KubernetesManifestCustomizer> manifestCustomizer;
     private String functionInstanceClassPath;
     private String downloadDirectory;
+    private final String kubernetesServiceDomainSuffix;
 
     KubernetesRuntime(AppsV1Api appsClient,
                       CoreV1Api coreClient,
                       String jobNamespace,
                       String jobName,
+                      String kubernetesServiceDomainSuffix,
                       Map<String, String> customLabels,
                       Boolean installUserCodeDependencies,
                       String pythonDependencyRepository,
@@ -196,6 +198,7 @@ public class KubernetesRuntime implements Runtime {
         this.instanceConfig = instanceConfig;
         this.jobNamespace = jobNamespace;
         this.jobName = jobName;
+        this.kubernetesServiceDomainSuffix = kubernetesServiceDomainSuffix;
         this.customLabels = customLabels;
         this.functionDockerImages = functionDockerImages;
         this.pulsarDockerImageName = pulsarDockerImageName;
@@ -320,7 +323,6 @@ public class KubernetesRuntime implements Runtime {
             channel = new 
ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
             stub = new 
InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails()
                     .getParallelism()];
-
             String jobName = 
createJobName(instanceConfig.getFunctionDetails(), this.jobName);
             for (int i = 0; i < 
instanceConfig.getFunctionDetails().getParallelism(); ++i) {
                 String address = getServiceUrl(jobName, jobNamespace, i);
@@ -1194,11 +1196,11 @@ public class KubernetesRuntime implements Runtime {
         final String shortHash = 
DigestUtils.sha1Hex(jobNameBase).toLowerCase().substring(0, 8);
         return convertedJobName + "-" + shortHash;
     }
-
-    private static String getServiceUrl(String jobName, String jobNamespace, 
int instanceId) {
-        return String.format("%s-%d.%s.%s.svc.cluster.local", jobName, 
instanceId, jobName, jobNamespace);
+    @VisibleForTesting
+    String getServiceUrl(String jobName, String jobNamespace, int instanceId) {
+        String suffix = isNotBlank(kubernetesServiceDomainSuffix) ? 
kubernetesServiceDomainSuffix : "svc.cluster.local";
+        return String.format("%s-%d.%s.%s.%s", jobName, instanceId, jobName, 
jobNamespace, suffix);
     }
-
     public static void doChecks(Function.FunctionDetails functionDetails, 
String overridenJobName) {
         final String jobName = createJobName(functionDetails, 
overridenJobName);
         if (!jobName.equals(jobName.toLowerCase())) {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index bbb6e3992a0..cba13acd5b0 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -101,6 +101,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
     private String functionInstanceClassPath;
     private String downloadDirectory;
     private int gracePeriodSeconds;
+    private String kubernetesServiceDomainSuffix;
 
     @ToString.Exclude
     @EqualsAndHashCode.Exclude
@@ -178,7 +179,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
         if (!Paths.get(this.downloadDirectory).isAbsolute()) {
             this.downloadDirectory = this.pulsarRootDir + "/" + 
this.downloadDirectory;
         }
-
+        this.kubernetesServiceDomainSuffix = 
factoryConfig.getKubernetesServiceDomainSuffix();
         this.submittingInsidePod = factoryConfig.getSubmittingInsidePod();
         this.installUserCodeDependencies = 
factoryConfig.getInstallUserCodeDependencies();
         this.pythonDependencyRepository = 
factoryConfig.getPythonDependencyRepository();
@@ -318,6 +319,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
             // get the namespace for this function
             overriddenNamespace,
             overriddenName,
+            kubernetesServiceDomainSuffix,
             customLabels,
             installUserCodeDependencies,
             pythonDependencyRepository,
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
index 43cdc035076..ea196923497 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
@@ -47,7 +47,12 @@ public class KubernetesRuntimeFactoryConfig {
         doc = "The docker image used to run function instance. By default it 
is `apachepulsar/pulsar`"
     )
     protected String pulsarDockerImageName;
-
+    @FieldContext(
+        doc = "Optional domain suffix to use when the Function Worker 
constructs the gRPC address "
+            + "to connect to function instances. If left blank, it defaults to 
`.svc.cluster.local`. "
+            + "Set this if your Function Worker is outside the cluster and 
connects via an external Gateway/Ingress."
+    )
+    protected String kubernetesServiceDomainSuffix;
     @FieldContext(
             doc = "The function docker images used to run function instance 
according to different "
                     + "configurations provided by users. By default it is 
`apachepulsar/pulsar`"
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index f8069efe299..585c7e92009 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -341,6 +341,29 @@ public class KubernetesRuntimeTest {
         return config;
     }
 
+    @Test
+    public void testGetServiceUrl() throws Exception {
+        factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
+        InstanceConfig config = 
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true);
+
+        KubernetesRuntime container1 = factory.createContainer(
+            config, userJarFile, userJarFile, null, null, 30L);
+        assertEquals(container1.getServiceUrl("my-job", "my-namespace", 0),
+            "my-job-0.my-job.my-namespace.svc.cluster.local");
+
+        KubernetesRuntimeFactory factory2 = 
createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
+        java.lang.reflect.Field field = 
KubernetesRuntimeFactory.class.getDeclaredField(
+            "kubernetesServiceDomainSuffix");
+        field.setAccessible(true);
+        field.set(factory2, "custom.gateway.internal");
+
+        KubernetesRuntime container2 = factory2.createContainer(
+            config, userJarFile, userJarFile, null, null, 30L);
+        assertEquals(container2.getServiceUrl("my-job", "my-namespace", 0),
+            "my-job-0.my-job.my-namespace.custom.gateway.internal");
+    }
+
+
     @Test
     public void testRamPadding() throws Exception {
         verifyRamPadding(0, 1000, 1000);

Reply via email to