This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cc9fddcd029 [improve][functions] Allow customizing Kubernetes service
domain suffix in Function Worker (#25872)
cc9fddcd029 is described below
commit cc9fddcd02939ba6c735d88a2bfe702a22fb2ff4
Author: iantowey <[email protected]>
AuthorDate: Wed Jun 3 21:00:08 2026 +0100
[improve][functions] Allow customizing Kubernetes service domain suffix in
Function Worker (#25872)
Co-authored-by: Ian <[email protected]>
---
conf/functions_worker.yml | 3 +++
.../runtime/kubernetes/KubernetesRuntime.java | 15 +++++++-------
.../kubernetes/KubernetesRuntimeFactory.java | 4 +++-
.../kubernetes/KubernetesRuntimeFactoryConfig.java | 7 ++++++-
.../runtime/kubernetes/KubernetesRuntimeTest.java | 23 ++++++++++++++++++++++
5 files changed, 43 insertions(+), 9 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 6f995576ebd..794cfba1d51 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -198,6 +198,9 @@ functionRuntimeFactoryConfigs:
# # The Kubernetes pod name to run the function instances. It is set to
# # `pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>` if this
setting is left to be empty
# jobName:
+# # Optional domain suffix to use when the Function Worker constructs the
gRPC address to connect to function instances.
+# # If left blank, it defaults to `svc.cluster.local`. Specify the value without a leading dot; the worker adds the separator itself. Set this if your
Function Worker is outside the cluster and connects via an external
Gateway/Ingress.
+# kubernetesServiceDomainSuffix:
# # the docker image to run function instance. by default it is
`apachepulsar/pulsar`
# pulsarDockerImageName:
# # the docker image to run function instance according to different
configurations provided by users.
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index 2f3d3825475..f9e45ae5c19 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -150,11 +150,13 @@ public class KubernetesRuntime implements Runtime {
private final Optional<KubernetesManifestCustomizer> manifestCustomizer;
private String functionInstanceClassPath;
private String downloadDirectory;
+ private final String kubernetesServiceDomainSuffix;
KubernetesRuntime(AppsV1Api appsClient,
CoreV1Api coreClient,
String jobNamespace,
String jobName,
+ String kubernetesServiceDomainSuffix,
Map<String, String> customLabels,
Boolean installUserCodeDependencies,
String pythonDependencyRepository,
@@ -193,6 +195,7 @@ public class KubernetesRuntime implements Runtime {
this.instanceConfig = instanceConfig;
this.jobNamespace = jobNamespace;
this.jobName = jobName;
+ this.kubernetesServiceDomainSuffix = kubernetesServiceDomainSuffix;
this.customLabels = customLabels;
this.functionDockerImages = functionDockerImages;
this.pulsarDockerImageName = pulsarDockerImageName;
@@ -315,9 +318,7 @@ public class KubernetesRuntime implements Runtime {
private synchronized void setupGrpcChannelIfNeeded() {
if (channel == null || stub == null) {
channel = new
ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
- stub = new
InstanceControlGrpc.InstanceControlStub[instanceConfig.getFunctionDetails()
- .getParallelism()];
-
+ stub = new
InstanceControlGrpc.InstanceControlStub[instanceConfig.getFunctionDetails().getParallelism()];
String jobName =
createJobName(instanceConfig.getFunctionDetails(), this.jobName);
for (int i = 0; i <
instanceConfig.getFunctionDetails().getParallelism(); ++i) {
String address = getServiceUrl(jobName, jobNamespace, i);
@@ -1152,11 +1153,11 @@ public class KubernetesRuntime implements Runtime {
final String shortHash =
DigestUtils.sha1Hex(jobNameBase).toLowerCase().substring(0, 8);
return convertedJobName + "-" + shortHash;
}
-
- private static String getServiceUrl(String jobName, String jobNamespace,
int instanceId) {
- return String.format("%s-%d.%s.%s.svc.cluster.local", jobName,
instanceId, jobName, jobNamespace);
+ @VisibleForTesting
+ String getServiceUrl(String jobName, String jobNamespace, int instanceId) {
+ String suffix = isNotBlank(kubernetesServiceDomainSuffix) ?
kubernetesServiceDomainSuffix : "svc.cluster.local";
+ return String.format("%s-%d.%s.%s.%s", jobName, instanceId, jobName,
jobNamespace, suffix);
}
-
public static void doChecks(FunctionDetails functionDetails, String
overridenJobName) {
final String jobName = createJobName(functionDetails,
overridenJobName);
if (!jobName.equals(jobName.toLowerCase())) {
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index 8a44988ea87..1e846e101cd 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -101,6 +101,7 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
private String functionInstanceClassPath;
private String downloadDirectory;
private int gracePeriodSeconds;
+ private String kubernetesServiceDomainSuffix;
@ToString.Exclude
@EqualsAndHashCode.Exclude
@@ -178,7 +179,7 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
if (!Paths.get(this.downloadDirectory).isAbsolute()) {
this.downloadDirectory = this.pulsarRootDir + "/" +
this.downloadDirectory;
}
-
+ this.kubernetesServiceDomainSuffix =
factoryConfig.getKubernetesServiceDomainSuffix();
this.submittingInsidePod = factoryConfig.getSubmittingInsidePod();
this.installUserCodeDependencies =
factoryConfig.getInstallUserCodeDependencies();
this.pythonDependencyRepository =
factoryConfig.getPythonDependencyRepository();
@@ -318,6 +319,7 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
// get the namespace for this function
overriddenNamespace,
overriddenName,
+ kubernetesServiceDomainSuffix,
customLabels,
installUserCodeDependencies,
pythonDependencyRepository,
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
index 43cdc035076..ea196923497 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
@@ -47,7 +47,12 @@ public class KubernetesRuntimeFactoryConfig {
doc = "The docker image used to run function instance. By default it
is `apachepulsar/pulsar`"
)
protected String pulsarDockerImageName;
-
+ @FieldContext(
+ doc = "Optional domain suffix to use when the Function Worker
constructs the gRPC address "
+ + "to connect to function instances. If left blank, it defaults to
`.svc.cluster.local`. "
+ + "Set this if your Function Worker is outside the cluster and
connects via an external Gateway/Ingress."
+ )
+ protected String kubernetesServiceDomainSuffix;
@FieldContext(
doc = "The function docker images used to run function instance
according to different "
+ "configurations provided by users. By default it is
`apachepulsar/pulsar`"
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 252906557b6..9efa09a5093 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -343,6 +343,29 @@ public class KubernetesRuntimeTest {
return config;
}
+ @Test
+ public void testGetServiceUrl() throws Exception {
+ factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
+ InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true);
+
+ KubernetesRuntime container1 = factory.createContainer(
+ config, userJarFile, userJarFile, null, null, 30L);
+ assertEquals(container1.getServiceUrl("my-job", "my-namespace", 0),
+ "my-job-0.my-job.my-namespace.svc.cluster.local");
+
+ KubernetesRuntimeFactory factory2 =
createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
+ java.lang.reflect.Field field =
KubernetesRuntimeFactory.class.getDeclaredField(
+ "kubernetesServiceDomainSuffix");
+ field.setAccessible(true);
+ field.set(factory2, "custom.gateway.internal");
+
+ KubernetesRuntime container2 = factory2.createContainer(
+ config, userJarFile, userJarFile, null, null, 30L);
+ assertEquals(container2.getServiceUrl("my-job", "my-namespace", 0),
+ "my-job-0.my-job.my-namespace.custom.gateway.internal");
+ }
+
+
@Test
public void testRamPadding() throws Exception {
verifyRamPadding(0, 1000, 1000);