This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch fix/k8s-coordinator-self-endpoint-race in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 3127f1df2e7f6392494a42f96d07784841fb925f
Author: Wu Sheng <[email protected]>
AuthorDate: Wed Mar 11 17:50:56 2026 +0800

    Fix K8s coordinator self-endpoint race condition (#13739)

    Include self pod in the DynamicEndpointGroup endpoint list so that when
    the informer syncs self after initial startup, the list actually changes
    and the listener re-fires with the correct self IP. Previously self was
    excluded from the endpoint list, so DynamicEndpointGroup deduplicated and
    never re-notified the listener, leaving self permanently stuck at 127.0.0.1.
---
 docs/en/changes/changes.md                         |   1 +
 .../plugin/kubernetes/KubernetesCoordinator.java   |  32 ++--
 .../KubernetesLabelSelectorEndpointGroup.java      |  10 +-
 .../plugin/kubernetes/SelfEndpointAccessor.java    |  29 +++
 .../kubernetes/KubernetesCoordinatorTest.java      | 210 +++++++++++++++++++++
 5 files changed, 268 insertions(+), 14 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 50ba551a53..a8465dc26a 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -95,6 +95,7 @@
 #### OAP Server
 
 * KubernetesCoordinator: make self instance return real pod IP address instead of `127.0.0.1`.
+* Fix KubernetesCoordinator self-endpoint race condition: include self in the endpoint list so DynamicEndpointGroup re-fires the listener when the self pod appears in the informer after initial sync.
 * Enhance the alarm kernel with recovered status notification capability
 * Fix BrowserWebVitalsPerfData `clsTime` to `cls` and make it double type.
 * Init `log-mal-rules` at module provider start stage to avoid re-init for every LAL.
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java index e2d9ec6b29..d3871deb61 100644 --- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java @@ -68,7 +68,7 @@ public class KubernetesCoordinator extends ClusterCoordinator { } } - private EndpointGroup createEndpointGroup() { + EndpointGroup createEndpointGroup() { if (port == -1) { port = manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort(); } @@ -148,22 +148,28 @@ public class KubernetesCoordinator extends ClusterCoordinator { log.debug("[kubernetes cluster endpoints]: {}", endpoints); } + // The endpoint group now includes self in the list. + // Identify self by matching against getSelfEndpoint(). + Endpoint selfEndpoint = null; + if (endpointGroup instanceof SelfEndpointAccessor) { + selfEndpoint = ((SelfEndpointAccessor) endpointGroup).getSelfEndpoint(); + } + + final Endpoint finalSelfEndpoint = selfEndpoint; final var instances = endpoints.stream() - .map(endpoint -> new RemoteInstance(new Address(endpoint.host(), endpoint.port(), false))) + .map(endpoint -> { + final boolean isSelf = finalSelfEndpoint != null + && endpoint.host().equals(finalSelfEndpoint.host()) + && endpoint.port() == finalSelfEndpoint.port(); + return new RemoteInstance(new Address(endpoint.host(), endpoint.port(), isSelf)); + }) .collect(Collectors.toList()); - // The endpoint group will never include itself, add it. 
- Endpoint selfEndpoint = null; - if (endpointGroup instanceof KubernetesLabelSelectorEndpointGroup) { - selfEndpoint = ((KubernetesLabelSelectorEndpointGroup) endpointGroup).getSelfEndpoint(); - } - final RemoteInstance selfInstance; - if (selfEndpoint == null) { - selfInstance = new RemoteInstance(new Address("127.0.0.1", port, true)); - } else { - selfInstance = new RemoteInstance(new Address(selfEndpoint.host(), selfEndpoint.port(), true)); + // If self was not found in the endpoint list (informer hasn't synced self yet), + // add a fallback self instance so the cluster always has a self node. + if (instances.stream().noneMatch(i -> i.getAddress().isSelf())) { + instances.add(new RemoteInstance(new Address("127.0.0.1", port, true))); } - instances.add(selfInstance); if (log.isDebugEnabled()) { instances.forEach(instance -> log.debug("kubernetes cluster instance: {}", instance)); diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java index 63f6f9f411..5744bbd009 100644 --- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java @@ -41,7 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @Slf4j -public class KubernetesLabelSelectorEndpointGroup extends DynamicEndpointGroup { +public class KubernetesLabelSelectorEndpointGroup extends DynamicEndpointGroup implements SelfEndpointAccessor { private final 
KubernetesClient kubernetesClient; private final String namespace; @@ -108,6 +108,14 @@ public class KubernetesLabelSelectorEndpointGroup extends DynamicEndpointGroup { Endpoint endpoint = createEndpoint(pod); if (endpoint != null) { selfEndpoint = endpoint; + // Include self in the endpoint list so that when self pod appears + // in the informer (after initial sync), the endpoint list changes + // and DynamicEndpointGroup fires the listener again. + // Previously self was excluded here, so the endpoint list stayed + // the same when self appeared — DynamicEndpointGroup deduplicated + // and skipped listener notification, leaving the coordinator stuck + // with the 127.0.0.1 fallback forever. See #13739. + newEndpoints.add(endpoint); } continue; } diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/SelfEndpointAccessor.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/SelfEndpointAccessor.java new file mode 100644 index 0000000000..8f421d2a19 --- /dev/null +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/SelfEndpointAccessor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.cluster.plugin.kubernetes; + +import com.linecorp.armeria.client.Endpoint; + +/** + * Provides access to the self endpoint for the Kubernetes coordinator. + * Implemented by {@link KubernetesLabelSelectorEndpointGroup} and test doubles. + */ +interface SelfEndpointAccessor { + Endpoint getSelfEndpoint(); +} diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java new file mode 100644 index 0000000000..3601c4b18e --- /dev/null +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.cluster.plugin.kubernetes; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.DynamicEndpointGroup; +import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; +import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics; +import org.apache.skywalking.oap.server.testing.util.ReflectUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +/** + * Tests for {@link KubernetesCoordinator} listener behavior, especially the race condition + * where the K8s informer hasn't synced the self pod when the first endpoint update fires. + * See <a href="https://github.com/apache/skywalking/issues/13739">#13739</a>. 
+ */ +@ExtendWith(MockitoExtension.class) +public class KubernetesCoordinatorTest { + + private static final int GRPC_PORT = 11800; + private static final String REMOTE_POD_IP = "10.116.2.203"; + private static final String SELF_POD_IP = "10.116.2.100"; + + @Mock + private HealthCheckMetrics healthChecker; + + private KubernetesCoordinator coordinator; + private TestEndpointGroup testEndpointGroup; + + @BeforeEach + public void setUp() throws Exception { + final var config = new ClusterModuleKubernetesConfig(); + config.setLabelSelector("app=oap"); + + testEndpointGroup = new TestEndpointGroup(); + + coordinator = spy(new KubernetesCoordinator(null, config)); + ReflectUtil.setInternalState(coordinator, "port", GRPC_PORT); + ReflectUtil.setInternalState(coordinator, "healthChecker", healthChecker); + doReturn(testEndpointGroup).when(coordinator).createEndpointGroup(); + + coordinator.start(); + } + + /** + * Simulates the race condition: informer has NOT synced the self pod yet. + * Only the remote pod is in the endpoint list, and getSelfEndpoint() returns null. + * The coordinator should fall back to 127.0.0.1 for self. + */ + @Test + public void shouldFallbackTo127WhenSelfNotSynced() { + // selfEndpoint is null (not synced), only remote pod in the list + testEndpointGroup.fireEndpoints( + List.of(Endpoint.of(REMOTE_POD_IP, GRPC_PORT))); + + final List<RemoteInstance> instances = coordinator.queryRemoteNodes(); + assertEquals(2, instances.size()); + + final RemoteInstance remote = findByHost(instances, REMOTE_POD_IP); + assertFalse(remote.getAddress().isSelf()); + + final RemoteInstance self = findByHost(instances, "127.0.0.1"); + assertTrue(self.getAddress().isSelf()); + } + + /** + * Simulates the fix for #13739: after the informer syncs the self pod, + * the endpoint list now includes self (list changes from 1 to 2 entries), + * so DynamicEndpointGroup fires the listener again. This time + * getSelfEndpoint() returns the real IP and self is correctly identified. 
+ * + * Previously, self was excluded from the endpoint list, so the list + * didn't change when self appeared — DynamicEndpointGroup deduplicated + * and never re-fired the listener, leaving self stuck at 127.0.0.1. + */ + @Test + public void shouldResolveSelfAfterInformerSync() { + // Phase 1: informer hasn't synced self yet + testEndpointGroup.fireEndpoints( + List.of(Endpoint.of(REMOTE_POD_IP, GRPC_PORT))); + + List<RemoteInstance> instances = coordinator.queryRemoteNodes(); + assertEquals(2, instances.size()); + assertTrue(findByHost(instances, "127.0.0.1").getAddress().isSelf(), + "Before sync, self should be 127.0.0.1 fallback"); + + // Phase 2: informer syncs self pod — endpoint list now includes both pods + final Endpoint selfEndpoint = Endpoint.of(SELF_POD_IP, GRPC_PORT); + testEndpointGroup.setSelfEndpoint(selfEndpoint); + testEndpointGroup.fireEndpoints(List.of( + Endpoint.of(REMOTE_POD_IP, GRPC_PORT), + selfEndpoint + )); + + instances = coordinator.queryRemoteNodes(); + assertEquals(2, instances.size()); + + final RemoteInstance self = findByHost(instances, SELF_POD_IP); + assertTrue(self.getAddress().isSelf(), + "After sync, self should use real pod IP " + SELF_POD_IP); + + final RemoteInstance remote = findByHost(instances, REMOTE_POD_IP); + assertFalse(remote.getAddress().isSelf()); + + assertTrue(instances.stream().noneMatch( + i -> i.getAddress().getHost().equals("127.0.0.1")), + "127.0.0.1 fallback should be gone after self is resolved"); + } + + /** + * Verifies that TTL leader election works correctly after self is resolved. + * Sorted instance list should have deterministic ordering with real IPs, + * and exactly one node should be self. 
+ */ + @Test + public void shouldElectTTLLeaderCorrectlyWithRealIPs() { + final Endpoint selfEndpoint = Endpoint.of(SELF_POD_IP, GRPC_PORT); + testEndpointGroup.setSelfEndpoint(selfEndpoint); + + testEndpointGroup.fireEndpoints(List.of( + Endpoint.of(REMOTE_POD_IP, GRPC_PORT), + selfEndpoint + )); + + final List<RemoteInstance> instances = coordinator.queryRemoteNodes(); + Collections.sort(instances); + + assertEquals(2, instances.size()); + + final long selfCount = instances.stream() + .filter(i -> i.getAddress().isSelf()) + .count(); + assertEquals(1, selfCount, "Exactly one instance should be self"); + + final RemoteInstance selfInstance = instances.stream() + .filter(i -> i.getAddress().isSelf()) + .findFirst() + .orElseThrow(); + assertEquals(SELF_POD_IP, selfInstance.getAddress().getHost()); + } + + private RemoteInstance findByHost(List<RemoteInstance> instances, String host) { + return instances.stream() + .filter(i -> i.getAddress().getHost().equals(host)) + .findFirst() + .orElseThrow(() -> new AssertionError( + "No instance with host: " + host + ", found: " + instances)); + } + + /** + * A controllable endpoint group for testing. Implements {@link SelfEndpointAccessor} + * so the coordinator can resolve self endpoint without requiring a real K8s client. 
+ * + * Simulates the K8s informer behavior: + * - Initially selfEndpoint is null (informer hasn't synced self pod yet) + * - After calling {@link #setSelfEndpoint}, the next {@link #fireEndpoints} call + * simulates the informer adding the self pod to its cache + */ + static class TestEndpointGroup extends DynamicEndpointGroup implements SelfEndpointAccessor { + private volatile Endpoint selfEndpoint; + + @Override + public Endpoint getSelfEndpoint() { + return selfEndpoint; + } + + void setSelfEndpoint(Endpoint selfEndpoint) { + this.selfEndpoint = selfEndpoint; + } + + void fireEndpoints(List<Endpoint> endpoints) { + setEndpoints(endpoints); + } + + @Override + protected void doCloseAsync(CompletableFuture<?> future) { + future.complete(null); + } + } +}
