This is an automated email from the ASF dual-hosted git repository. aldettinger pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit 17c9fdcfed19b14b12cbde928cd88aad31bb50ff Author: aldettinger <aldettin...@gmail.com> AuthorDate: Wed Sep 14 15:15:27 2022 +0200 kubernetes: auto-configure kubernetes cluster service #4086 --- .../pages/reference/extensions/kubernetes.adoc | 100 +++++++++++++++ extensions/kubernetes/deployment/pom.xml | 5 + .../KubernetesClusterServiceProcessor.java | 51 ++++++++ .../KubernetesClusterServiceConfigDefaultTest.java | 74 +++++++++++ ...erServiceConfigEnabledWithRebalancingtTest.java | 76 ++++++++++++ ...terServiceConfigEnabledWithoutDefaultsTest.java | 114 +++++++++++++++++ ...ServiceConfigEnabledWithoutRebalancingTest.java | 99 +++++++++++++++ .../cluster/KubernetesClusterServiceConfig.java | 138 +++++++++++++++++++++ .../cluster/KubernetesClusterServiceRecorder.java | 60 +++++++++ 9 files changed, 717 insertions(+) diff --git a/docs/modules/ROOT/pages/reference/extensions/kubernetes.adoc b/docs/modules/ROOT/pages/reference/extensions/kubernetes.adoc index 525f4d3e34..0610850405 100644 --- a/docs/modules/ROOT/pages/reference/extensions/kubernetes.adoc +++ b/docs/modules/ROOT/pages/reference/extensions/kubernetes.adoc @@ -80,3 +80,103 @@ quarkus.kubernetes-client.namespace=my-namespace The full set of configuration options are documented in the https://quarkus.io/guides/kubernetes-client#quarkus-kubernetes-client_configuration[Quarkus Kubernetes Client guide]. + +[width="100%",cols="80,5,15",options="header"] +|=== +| Configuration property | Type | Default + + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.enabled]]`link:#quarkus.camel.cluster.kubernetes.enabled[quarkus.camel.cluster.kubernetes.enabled]` + +Whether a Kubernetes Cluster Service should be automatically configured according to 'quarkus.camel.cluster.kubernetes.++*++' configurations. +| `boolean` +| `false` + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.id]]`link:#quarkus.camel.cluster.kubernetes.id[quarkus.camel.cluster.kubernetes.id]` + +The cluster service ID (defaults to null). +| `string` +| + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.master-url]]`link:#quarkus.camel.cluster.kubernetes.master-url[quarkus.camel.cluster.kubernetes.master-url]` + +The URL of the Kubernetes master (read from Kubernetes client properties by default). +| `string` +| + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.connection-timeout-millis]]`link:#quarkus.camel.cluster.kubernetes.connection-timeout-millis[quarkus.camel.cluster.kubernetes.connection-timeout-millis]` + +The connection timeout in milliseconds to use when making requests to the Kubernetes API server. +| `java.lang.Integer` +| + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.namespace]]`link:#quarkus.camel.cluster.kubernetes.namespace[quarkus.camel.cluster.kubernetes.namespace]` + +The name of the Kubernetes namespace containing the pods and the configmap (autodetected by default). +| `string` +| + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.pod-name]]`link:#quarkus.camel.cluster.kubernetes.pod-name[quarkus.camel.cluster.kubernetes.pod-name]` + +The name of the current pod (autodetected from container host name by default). +| `string` +| + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.jitter-factor]]`link:#quarkus.camel.cluster.kubernetes.jitter-factor[quarkus.camel.cluster.kubernetes.jitter-factor]` + +The jitter factor to apply in order to prevent all pods to call Kubernetes APIs in the same instant (defaults to 1.2). +| `java.lang.Double` +| + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.lease-duration-millis]]`link:#quarkus.camel.cluster.kubernetes.lease-duration-millis[quarkus.camel.cluster.kubernetes.lease-duration-millis]` + +The default duration of the lease for the current leader (defaults to 15000). +| `java.lang.Long` +| + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.renew-deadline-millis]]`link:#quarkus.camel.cluster.kubernetes.renew-deadline-millis[quarkus.camel.cluster.kubernetes.renew-deadline-millis]` + +The deadline after which the leader must stop its services because it may have lost the leadership (defaults to 10000). +| `java.lang.Long` +| + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.retry-period-millis]]`link:#quarkus.camel.cluster.kubernetes.retry-period-millis[quarkus.camel.cluster.kubernetes.retry-period-millis]` + +The time between two subsequent attempts to check and acquire the leadership. It is randomized using the jitter factor (defaults to 2000). +| `java.lang.Long` +| + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.order]]`link:#quarkus.camel.cluster.kubernetes.order[quarkus.camel.cluster.kubernetes.order]` + +Service lookup order/priority (defaults to 2147482647). +| `java.lang.Integer` +| + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.resource-name]]`link:#quarkus.camel.cluster.kubernetes.resource-name[quarkus.camel.cluster.kubernetes.resource-name]` + +The name of the lease resource used to do optimistic locking (defaults to 'leaders'). The resource name is used as prefix when the underlying Kubernetes resource can manage a single lock. +| `string` +| + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.lease-resource-type]]`link:#quarkus.camel.cluster.kubernetes.lease-resource-type[quarkus.camel.cluster.kubernetes.lease-resource-type]` + +The lease resource type used in Kubernetes, either 'config-map' or 'lease' (defaults to 'lease'). +| `org.apache.camel.component.kubernetes.cluster.LeaseResourceType` +| + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.rebalancing]]`link:#quarkus.camel.cluster.kubernetes.rebalancing[quarkus.camel.cluster.kubernetes.rebalancing]` + +Whether the camel master namespace leaders should be distributed evenly across all the camel contexts in the cluster. +| `boolean` +| `true` + +|icon:lock[title=Fixed at build time] [[quarkus.camel.cluster.kubernetes.labels]]`link:#quarkus.camel.cluster.kubernetes.labels[quarkus.camel.cluster.kubernetes.labels]` + +The labels key/value used to identify the pods composing the cluster, defaults to empty map. +| ``Map<String,String>`` +| +|=== + +[.configuration-legend] +{doc-link-icon-lock}[title=Fixed at build time] Configuration property fixed at build time. All other configuration properties are overridable at runtime. + diff --git a/extensions/kubernetes/deployment/pom.xml b/extensions/kubernetes/deployment/pom.xml index d84473bfa0..0244440169 100644 --- a/extensions/kubernetes/deployment/pom.xml +++ b/extensions/kubernetes/deployment/pom.xml @@ -50,6 +50,11 @@ <groupId>org.apache.camel.quarkus</groupId> <artifactId>camel-quarkus-kubernetes</artifactId> </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-junit5-internal</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/extensions/kubernetes/deployment/src/main/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceProcessor.java b/extensions/kubernetes/deployment/src/main/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceProcessor.java new file mode 100644 index 0000000000..ff3962e3f5 --- /dev/null +++ b/extensions/kubernetes/deployment/src/main/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceProcessor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kubernetes.cluster.deployment; + +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.Consume; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Record; +import io.quarkus.runtime.RuntimeValue; +import org.apache.camel.component.kubernetes.cluster.KubernetesClusterService; +import org.apache.camel.quarkus.component.kubernetes.cluster.KubernetesClusterServiceConfig; +import org.apache.camel.quarkus.component.kubernetes.cluster.KubernetesClusterServiceRecorder; +import org.apache.camel.quarkus.core.deployment.spi.CamelBeanBuildItem; +import org.apache.camel.quarkus.core.deployment.spi.CamelContextBuildItem; +import org.apache.camel.support.cluster.RebalancingCamelClusterService; + +class KubernetesClusterServiceProcessor { + + @Record(ExecutionTime.STATIC_INIT) + @BuildStep(onlyIf = KubernetesClusterServiceConfig.Enabled.class) + @Consume(CamelContextBuildItem.class) + CamelBeanBuildItem setupKubernetesClusterService( + KubernetesClusterServiceConfig config, + KubernetesClusterServiceRecorder recorder) { + + if (config.rebalancing) { + final RuntimeValue<RebalancingCamelClusterService> krcs = recorder + .createKubernetesRebalancingClusterService(config); + return new CamelBeanBuildItem("kubernetesRebalancingClusterService", + RebalancingCamelClusterService.class.getName(), krcs); + } else { + final RuntimeValue<KubernetesClusterService> kcs = recorder.createKubernetesClusterService(config); + return new CamelBeanBuildItem("kubernetesClusterService", KubernetesClusterService.class.getName(), kcs); + } + } + +} diff --git a/extensions/kubernetes/deployment/src/test/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceConfigDefaultTest.java b/extensions/kubernetes/deployment/src/test/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceConfigDefaultTest.java new file mode 100644 index 0000000000..21b77b529f --- /dev/null +++ b/extensions/kubernetes/deployment/src/test/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceConfigDefaultTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kubernetes.cluster.deployment; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.util.Properties; + +import javax.inject.Inject; + +import io.quarkus.test.QuarkusUnitTest; +import org.apache.camel.CamelContext; +import org.apache.camel.component.kubernetes.cluster.KubernetesClusterService; +import org.apache.camel.impl.DefaultCamelContext; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.Asset; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class KubernetesClusterServiceConfigDefaultTest { + @RegisterExtension + static final QuarkusUnitTest CONFIG = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addAsResource(applicationProperties(), + "application.properties")); + + public static final Asset applicationProperties() { + Writer writer = new StringWriter(); + + Properties props = new Properties(); + + try { + props.store(writer, ""); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new StringAsset(writer.toString()); + } + + @Inject + CamelContext camelContext; + + @Test + public void defaultConfigShouldNotAutoConfigure() { + + DefaultCamelContext dcc = camelContext.adapt(DefaultCamelContext.class); + assertNotNull(dcc); + + KubernetesClusterService[] kcss = dcc.getServices().stream().filter(s -> s instanceof KubernetesClusterService) + .toArray(KubernetesClusterService[]::new); + assertEquals(0, kcss.length); + } + +} diff --git a/extensions/kubernetes/deployment/src/test/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceConfigEnabledWithRebalancingtTest.java b/extensions/kubernetes/deployment/src/test/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceConfigEnabledWithRebalancingtTest.java new file mode 100644 index 0000000000..bc3eb5f3f0 --- /dev/null +++ b/extensions/kubernetes/deployment/src/test/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceConfigEnabledWithRebalancingtTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kubernetes.cluster.deployment; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.util.Properties; + +import javax.inject.Inject; + +import io.quarkus.test.QuarkusUnitTest; +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.cluster.RebalancingCamelClusterService; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.Asset; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class KubernetesClusterServiceConfigEnabledWithRebalancingtTest { + @RegisterExtension + static final QuarkusUnitTest CONFIG = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addAsResource(applicationProperties(), + "application.properties")); + + public static final Asset applicationProperties() { + Writer writer = new StringWriter(); + + Properties props = new Properties(); + props.setProperty("quarkus.camel.cluster.kubernetes.enabled", "true"); + + try { + props.store(writer, ""); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new StringAsset(writer.toString()); + } + + @Inject + CamelContext camelContext; + + @Test + public void enabledConfigWithRebalancingShouldAutoConfigure() { + + DefaultCamelContext dcc = camelContext.adapt(DefaultCamelContext.class); + assertNotNull(dcc); + + RebalancingCamelClusterService[] rcss = dcc.getServices().stream() + .filter(s -> s instanceof RebalancingCamelClusterService) + .toArray(RebalancingCamelClusterService[]::new); + assertEquals(1, rcss.length); + } + +} diff --git a/extensions/kubernetes/deployment/src/test/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceConfigEnabledWithoutDefaultsTest.java b/extensions/kubernetes/deployment/src/test/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceConfigEnabledWithoutDefaultsTest.java new file mode 100644 index 0000000000..ec2ca08e80 --- /dev/null +++ b/extensions/kubernetes/deployment/src/test/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceConfigEnabledWithoutDefaultsTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kubernetes.cluster.deployment; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.util.Properties; + +import javax.inject.Inject; + +import io.quarkus.test.QuarkusUnitTest; +import org.apache.camel.CamelContext; +import org.apache.camel.component.kubernetes.cluster.KubernetesClusterService; +import org.apache.camel.component.kubernetes.cluster.LeaseResourceType; +import org.apache.camel.impl.DefaultCamelContext; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.Asset; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KubernetesClusterServiceConfigEnabledWithoutDefaultsTest { + @RegisterExtension + static final QuarkusUnitTest CONFIG = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addAsResource(applicationProperties(), + "application.properties")); + + public static final Asset applicationProperties() { + Writer writer = new StringWriter(); + + Properties props = new Properties(); + props.setProperty("quarkus.camel.cluster.kubernetes.enabled", "true"); + props.setProperty("quarkus.camel.cluster.kubernetes.id", "kcs-id"); + props.setProperty("quarkus.camel.cluster.kubernetes.master-url", "kcs-master-url"); + props.setProperty("quarkus.camel.cluster.kubernetes.connection-timeout-millis", "5033"); + props.setProperty("quarkus.camel.cluster.kubernetes.namespace", "kcs-namespace"); + props.setProperty("quarkus.camel.cluster.kubernetes.pod-name", "kcs-pod-name"); + props.setProperty("quarkus.camel.cluster.kubernetes.jitter-factor", "1.5034"); + props.setProperty("quarkus.camel.cluster.kubernetes.lease-duration-millis", "5036"); + props.setProperty("quarkus.camel.cluster.kubernetes.renew-deadline-millis", "5037"); + props.setProperty("quarkus.camel.cluster.kubernetes.retry-period-millis", "5038"); + props.setProperty("quarkus.camel.cluster.kubernetes.order", "5039"); + props.setProperty("quarkus.camel.cluster.kubernetes.resource-name", "kcs-resource-name"); + props.setProperty("quarkus.camel.cluster.kubernetes.lease-resource-type", "config-map"); + props.setProperty("quarkus.camel.cluster.kubernetes.rebalancing", "false"); + props.setProperty("quarkus.camel.cluster.kubernetes.labels.key1", "value1"); + props.setProperty("quarkus.camel.cluster.kubernetes.labels.key2", "value2"); + + try { + props.store(writer, ""); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new StringAsset(writer.toString()); + } + + @Inject + CamelContext camelContext; + + @Test + public void enabledConfigWithoutDefaultsShouldAutoConfigure() { + + DefaultCamelContext dcc = camelContext.adapt(DefaultCamelContext.class); + assertNotNull(dcc); + + KubernetesClusterService[] kcss = dcc.getServices().stream().filter(s -> s instanceof KubernetesClusterService) + .toArray(KubernetesClusterService[]::new); + assertEquals(1, kcss.length); + + KubernetesClusterService kcs = kcss[0]; + assertNotNull(kcs); + + assertEquals("kcs-id", kcs.getId()); + assertEquals("kcs-master-url", kcs.getMasterUrl()); + assertEquals(5033, kcs.getConnectionTimeoutMillis()); + assertEquals("kcs-namespace", kcs.getKubernetesNamespace()); + assertEquals("kcs-pod-name", kcs.getPodName()); + assertEquals(1.5034, kcs.getJitterFactor()); + assertEquals(5036, kcs.getLeaseDurationMillis()); + assertEquals(5037, kcs.getRenewDeadlineMillis()); + assertEquals(5038, kcs.getRetryPeriodMillis()); + assertEquals(5039, kcs.getOrder()); + assertEquals("kcs-resource-name", kcs.getKubernetesResourceName()); + assertEquals(LeaseResourceType.ConfigMap, kcs.getLeaseResourceType()); + + assertNotNull(kcs.getClusterLabels()); + assertTrue(kcs.getClusterLabels().containsKey("key1")); + assertEquals("value1", kcs.getClusterLabels().get("key1")); + assertTrue(kcs.getClusterLabels().containsKey("key2")); + assertEquals("value2", kcs.getClusterLabels().get("key2")); + } + +} diff --git a/extensions/kubernetes/deployment/src/test/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceConfigEnabledWithoutRebalancingTest.java b/extensions/kubernetes/deployment/src/test/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceConfigEnabledWithoutRebalancingTest.java new file mode 100644 index 0000000000..7b4dfaf7c1 --- /dev/null +++ b/extensions/kubernetes/deployment/src/test/java/org/apache/camel/quarkus/component/kubernetes/cluster/deployment/KubernetesClusterServiceConfigEnabledWithoutRebalancingTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kubernetes.cluster.deployment; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.util.Properties; + +import javax.inject.Inject; + +import io.quarkus.test.QuarkusUnitTest; +import org.apache.camel.CamelContext; +import org.apache.camel.Ordered; +import org.apache.camel.component.kubernetes.cluster.KubernetesClusterService; +import org.apache.camel.component.kubernetes.cluster.LeaseResourceType; +import org.apache.camel.impl.DefaultCamelContext; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.Asset; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KubernetesClusterServiceConfigEnabledWithoutRebalancingTest { + @RegisterExtension + static final QuarkusUnitTest CONFIG = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addAsResource(applicationProperties(), + "application.properties")); + + public static final Asset applicationProperties() { + Writer writer = new StringWriter(); + + Properties props = new Properties(); + props.setProperty("quarkus.camel.cluster.kubernetes.enabled", "true"); + props.setProperty("quarkus.camel.cluster.kubernetes.rebalancing", "false"); + + try { + props.store(writer, ""); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new StringAsset(writer.toString()); + } + + @Inject + CamelContext camelContext; + + @Test + public void enabledConfigWithoutRebalancingAndDefaultsShouldAutoConfigure() { + + DefaultCamelContext dcc = camelContext.adapt(DefaultCamelContext.class); + assertNotNull(dcc); + + KubernetesClusterService[] kcss = dcc.getServices().stream().filter(s -> s instanceof KubernetesClusterService) + .toArray(KubernetesClusterService[]::new); + assertEquals(1, kcss.length); + + KubernetesClusterService kcs = kcss[0]; + assertNotNull(kcs); + + assertNull(kcs.getId()); + assertNull(kcs.getMasterUrl()); + assertNull(kcs.getConnectionTimeoutMillis()); + assertNull(kcs.getKubernetesNamespace()); + assertNull(kcs.getPodName()); + assertEquals(1.2, kcs.getJitterFactor()); + assertEquals(15000L, kcs.getLeaseDurationMillis()); + assertEquals(10000L, kcs.getRenewDeadlineMillis()); + assertEquals(2000L, kcs.getRetryPeriodMillis()); + assertEquals(Ordered.LOWEST, kcs.getOrder()); + assertEquals("leaders", kcs.getKubernetesResourceName()); + assertEquals(LeaseResourceType.Lease, kcs.getLeaseResourceType()); + + assertNotNull(kcs.getClusterLabels()); + assertTrue(kcs.getClusterLabels().isEmpty()); + } + +} diff --git a/extensions/kubernetes/runtime/src/main/java/org/apache/camel/quarkus/component/kubernetes/cluster/KubernetesClusterServiceConfig.java b/extensions/kubernetes/runtime/src/main/java/org/apache/camel/quarkus/component/kubernetes/cluster/KubernetesClusterServiceConfig.java new file mode 100644 index 0000000000..c95c42f801 --- /dev/null +++ b/extensions/kubernetes/runtime/src/main/java/org/apache/camel/quarkus/component/kubernetes/cluster/KubernetesClusterServiceConfig.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kubernetes.cluster; + +import java.util.Map; +import java.util.Optional; +import java.util.function.BooleanSupplier; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigRoot; +import org.apache.camel.component.kubernetes.cluster.LeaseResourceType; +import org.eclipse.microprofile.config.ConfigProvider; + +@ConfigRoot(name = "camel.cluster.kubernetes") +public class KubernetesClusterServiceConfig { + + /** + * Whether a Kubernetes Cluster Service should be automatically configured + * according to 'quarkus.camel.cluster.kubernetes.*' configurations. + */ + @ConfigItem(defaultValue = "false") + public boolean enabled; + + /** + * The cluster service ID (defaults to null). + */ + @ConfigItem + public Optional<String> id; + + /** + * The URL of the Kubernetes master (read from Kubernetes client properties by default). + */ + @ConfigItem + public Optional<String> masterUrl; + + /** + * The connection timeout in milliseconds to use when making requests to the Kubernetes API server. + */ + @ConfigItem + public Optional<Integer> connectionTimeoutMillis; + + /** + * The name of the Kubernetes namespace containing the pods and the configmap (autodetected by default). + */ + @ConfigItem + public Optional<String> namespace; + + /** + * The name of the current pod (autodetected from container host name by default). + */ + @ConfigItem + public Optional<String> podName; + + /** + * The jitter factor to apply in order to prevent all pods to call Kubernetes APIs in the same instant (defaults to + * 1.2). + */ + @ConfigItem + public Optional<Double> jitterFactor; + + /** + * The default duration of the lease for the current leader (defaults to 15000). + */ + @ConfigItem + public Optional<Long> leaseDurationMillis; + + /** + * The deadline after which the leader must stop its services because it may have lost the leadership (defaults to + * 10000). + */ + @ConfigItem + public Optional<Long> renewDeadlineMillis; + + /** + * The time between two subsequent attempts to check and acquire the leadership. It is randomized using the jitter + * factor (defaults to 2000). + */ + @ConfigItem + public Optional<Long> retryPeriodMillis; + + /** + * Service lookup order/priority (defaults to 2147482647). + */ + @ConfigItem + public Optional<Integer> order; + + /** + * The name of the lease resource used to do optimistic locking (defaults to + * 'leaders'). The resource name is used as prefix when the underlying + * Kubernetes resource can manage a single lock. + */ + @ConfigItem + public Optional<String> resourceName; + + /** + * The lease resource type used in Kubernetes, either 'config-map' or + * 'lease' (defaults to 'lease'). + */ + @ConfigItem + public Optional<LeaseResourceType> leaseResourceType; + + /** + * Whether the camel master namespace leaders should be distributed evenly + * across all the camel contexts in the cluster. + */ + @ConfigItem(defaultValue = "true") + public boolean rebalancing; + + /** + * The labels key/value used to identify the pods composing the cluster, + * defaults to empty map. + */ + @ConfigItem + public Map<String, String> labels; + + public static final class Enabled implements BooleanSupplier { + @Override + public boolean getAsBoolean() { + return ConfigProvider.getConfig().getOptionalValue("quarkus.camel.cluster.kubernetes.enabled", Boolean.class) + .orElse(Boolean.FALSE); + } + } + +} diff --git a/extensions/kubernetes/runtime/src/main/java/org/apache/camel/quarkus/component/kubernetes/cluster/KubernetesClusterServiceRecorder.java b/extensions/kubernetes/runtime/src/main/java/org/apache/camel/quarkus/component/kubernetes/cluster/KubernetesClusterServiceRecorder.java new file mode 100644 index 0000000000..611abe87f8 --- /dev/null +++ b/extensions/kubernetes/runtime/src/main/java/org/apache/camel/quarkus/component/kubernetes/cluster/KubernetesClusterServiceRecorder.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kubernetes.cluster; + +import io.quarkus.runtime.RuntimeValue; +import io.quarkus.runtime.annotations.Recorder; +import org.apache.camel.component.kubernetes.cluster.KubernetesClusterService; +import org.apache.camel.support.cluster.RebalancingCamelClusterService; + +@Recorder +public class KubernetesClusterServiceRecorder { + + public RuntimeValue<KubernetesClusterService> createKubernetesClusterService(KubernetesClusterServiceConfig config) { + KubernetesClusterService kcs = setupKubernetesClusterServiceFromConfig(config); + return new RuntimeValue<KubernetesClusterService>(kcs); + } + + public RuntimeValue<RebalancingCamelClusterService> createKubernetesRebalancingClusterService( + KubernetesClusterServiceConfig config) { + KubernetesClusterService kcs = setupKubernetesClusterServiceFromConfig(config); + RebalancingCamelClusterService rebalancingService = new RebalancingCamelClusterService(kcs, + kcs.getRenewDeadlineMillis()); + return new RuntimeValue<RebalancingCamelClusterService>(rebalancingService); + } + + private KubernetesClusterService setupKubernetesClusterServiceFromConfig(KubernetesClusterServiceConfig config) { + KubernetesClusterService clusterService = new KubernetesClusterService(); + + config.id.ifPresent(id -> clusterService.setId(id)); + config.masterUrl.ifPresent(url -> clusterService.setMasterUrl(url)); + config.connectionTimeoutMillis.ifPresent(ctm -> clusterService.setConnectionTimeoutMillis(ctm)); + config.namespace.ifPresent(ns -> clusterService.setKubernetesNamespace(ns)); + config.podName.ifPresent(pn -> clusterService.setPodName(pn)); + config.jitterFactor.ifPresent(jf -> clusterService.setJitterFactor(jf)); + config.leaseDurationMillis.ifPresent(ldm -> clusterService.setLeaseDurationMillis(ldm)); + config.renewDeadlineMillis.ifPresent(rdm -> clusterService.setRenewDeadlineMillis(rdm)); + config.retryPeriodMillis.ifPresent(rpm -> clusterService.setRetryPeriodMillis(rpm)); + config.order.ifPresent(o -> clusterService.setOrder(o)); + config.resourceName.ifPresent(krn -> clusterService.setKubernetesResourceName(krn)); + config.leaseResourceType.ifPresent(lrt -> clusterService.setLeaseResourceType(lrt)); + + clusterService.setClusterLabels(config.labels); + + return clusterService; + } +}