This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch fix/k8s-coordinator-self-endpoint-race
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 3127f1df2e7f6392494a42f96d07784841fb925f
Author: Wu Sheng <[email protected]>
AuthorDate: Wed Mar 11 17:50:56 2026 +0800

    Fix K8s coordinator self-endpoint race condition (#13739)
    
    Include self pod in the DynamicEndpointGroup endpoint list so that
    when the informer syncs self after initial startup, the list actually
    changes and the listener re-fires with the correct self IP.
    
    Previously self was excluded from the endpoint list, so DynamicEndpointGroup
    deduplicated and never re-notified the listener, leaving self permanently
    stuck at 127.0.0.1.
---
 docs/en/changes/changes.md                         |   1 +
 .../plugin/kubernetes/KubernetesCoordinator.java   |  32 ++--
 .../KubernetesLabelSelectorEndpointGroup.java      |  10 +-
 .../plugin/kubernetes/SelfEndpointAccessor.java    |  29 +++
 .../kubernetes/KubernetesCoordinatorTest.java      | 210 +++++++++++++++++++++
 5 files changed, 268 insertions(+), 14 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 50ba551a53..a8465dc26a 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -95,6 +95,7 @@
 #### OAP Server
 
 * KubernetesCoordinator: make self instance return real pod IP address instead of `127.0.0.1`.
+* Fix KubernetesCoordinator self-endpoint race condition: include self in the endpoint list so that `DynamicEndpointGroup` re-fires the listener when the self pod appears in the informer after the initial sync.
 * Enhance the alarm kernel with recovered status notification capability
 * Fix BrowserWebVitalsPerfData `clsTime` to `cls` and make it double type.
 * Init `log-mal-rules` at module provider start stage to avoid re-init for every LAL.
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index e2d9ec6b29..d3871deb61 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -68,7 +68,7 @@ public class KubernetesCoordinator extends ClusterCoordinator {
         }
     }
 
-    private EndpointGroup createEndpointGroup() {
+    EndpointGroup createEndpointGroup() {
         if (port == -1) {
             port = 
manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
         }
@@ -148,22 +148,28 @@ public class KubernetesCoordinator extends ClusterCoordinator {
                 log.debug("[kubernetes cluster endpoints]: {}", endpoints);
             }
 
+            // The endpoint group now includes self in the list.
+            // Identify self by matching against getSelfEndpoint().
+            Endpoint selfEndpoint = null;
+            if (endpointGroup instanceof SelfEndpointAccessor) {
+                selfEndpoint = ((SelfEndpointAccessor) 
endpointGroup).getSelfEndpoint();
+            }
+
+            final Endpoint finalSelfEndpoint = selfEndpoint;
             final var instances = endpoints.stream()
-                    .map(endpoint -> new RemoteInstance(new 
Address(endpoint.host(), endpoint.port(), false)))
+                    .map(endpoint -> {
+                        final boolean isSelf = finalSelfEndpoint != null
+                            && endpoint.host().equals(finalSelfEndpoint.host())
+                            && endpoint.port() == finalSelfEndpoint.port();
+                        return new RemoteInstance(new Address(endpoint.host(), 
endpoint.port(), isSelf));
+                    })
                     .collect(Collectors.toList());
 
-            // The endpoint group will never include itself, add it.
-            Endpoint selfEndpoint = null;
-            if (endpointGroup instanceof KubernetesLabelSelectorEndpointGroup) 
{
-                selfEndpoint = ((KubernetesLabelSelectorEndpointGroup) 
endpointGroup).getSelfEndpoint();
-            }
-            final RemoteInstance selfInstance;
-            if (selfEndpoint == null) {
-                selfInstance = new RemoteInstance(new Address("127.0.0.1", 
port, true));
-            } else {
-                selfInstance = new RemoteInstance(new 
Address(selfEndpoint.host(), selfEndpoint.port(), true));
+            // If self was not found in the endpoint list (informer hasn't 
synced self yet),
+            // add a fallback self instance so the cluster always has a self 
node.
+            if (instances.stream().noneMatch(i -> i.getAddress().isSelf())) {
+                instances.add(new RemoteInstance(new Address("127.0.0.1", 
port, true)));
             }
-            instances.add(selfInstance);
 
             if (log.isDebugEnabled()) {
                 instances.forEach(instance -> log.debug("kubernetes cluster 
instance: {}", instance));
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java
index 63f6f9f411..5744bbd009 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java
@@ -41,7 +41,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
 @Slf4j
-public class KubernetesLabelSelectorEndpointGroup extends DynamicEndpointGroup 
{
+public class KubernetesLabelSelectorEndpointGroup extends DynamicEndpointGroup 
implements SelfEndpointAccessor {
 
     private final KubernetesClient kubernetesClient;
     private final String namespace;
@@ -108,6 +108,14 @@ public class KubernetesLabelSelectorEndpointGroup extends DynamicEndpointGroup {
                     Endpoint endpoint = createEndpoint(pod);
                     if (endpoint != null) {
                         selfEndpoint = endpoint;
+                        // Include self in the endpoint list so that when self 
pod appears
+                        // in the informer (after initial sync), the endpoint 
list changes
+                        // and DynamicEndpointGroup fires the listener again.
+                        // Previously self was excluded here, so the endpoint 
list stayed
+                        // the same when self appeared — DynamicEndpointGroup 
deduplicated
+                        // and skipped listener notification, leaving the 
coordinator stuck
+                        // with the 127.0.0.1 fallback forever. See #13739.
+                        newEndpoints.add(endpoint);
                     }
                     continue;
                 }
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/SelfEndpointAccessor.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/SelfEndpointAccessor.java
new file mode 100644
index 0000000000..8f421d2a19
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/SelfEndpointAccessor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import com.linecorp.armeria.client.Endpoint;
+
+/**
+ * Provides access to the self endpoint for the Kubernetes coordinator.
+ * Implemented by {@link KubernetesLabelSelectorEndpointGroup} and test doubles.
+ *
+ * <p>When the coordinator's endpoint group implements this interface, the listed endpoint
+ * whose host and port equal {@link #getSelfEndpoint()} is marked as the self instance.
+ */
+interface SelfEndpointAccessor {
+    /**
+     * Returns the endpoint of the current (self) pod.
+     *
+     * @return the self endpoint, or {@code null} if it has not been resolved yet (e.g. the
+     *     informer has not synced the self pod); the coordinator then falls back to
+     *     {@code 127.0.0.1} for the self instance
+     */
+    Endpoint getSelfEndpoint();
+}
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
new file mode 100644
index 0000000000..3601c4b18e
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import com.linecorp.armeria.client.Endpoint;
+import com.linecorp.armeria.client.endpoint.DynamicEndpointGroup;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
+import org.apache.skywalking.oap.server.testing.util.ReflectUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Tests for {@link KubernetesCoordinator} listener behavior, especially the race condition
+ * where the K8s informer hasn't synced the self pod when the first endpoint update fires.
+ * See <a href="https://github.com/apache/skywalking/issues/13739">#13739</a>.
+ */
+@ExtendWith(MockitoExtension.class)
+public class KubernetesCoordinatorTest {
+
+    private static final int GRPC_PORT = 11800;
+    private static final String REMOTE_POD_IP = "10.116.2.203";
+    private static final String SELF_POD_IP = "10.116.2.100";
+    // Host the coordinator uses for self until the real pod IP is known.
+    private static final String FALLBACK_SELF_IP = "127.0.0.1";
+
+    @Mock
+    private HealthCheckMetrics healthChecker;
+
+    private KubernetesCoordinator coordinator;
+    private TestEndpointGroup testEndpointGroup;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        final var config = new ClusterModuleKubernetesConfig();
+        config.setLabelSelector("app=oap");
+
+        testEndpointGroup = new TestEndpointGroup();
+
+        coordinator = spy(new KubernetesCoordinator(null, config));
+        ReflectUtil.setInternalState(coordinator, "port", GRPC_PORT);
+        ReflectUtil.setInternalState(coordinator, "healthChecker", healthChecker);
+        // Replace the real K8s-backed endpoint group with a controllable test double.
+        doReturn(testEndpointGroup).when(coordinator).createEndpointGroup();
+
+        coordinator.start();
+    }
+
+    /**
+     * Simulates the race condition: informer has NOT synced the self pod yet.
+     * Only the remote pod is in the endpoint list, and getSelfEndpoint() returns null.
+     * The coordinator should fall back to 127.0.0.1 (on the configured gRPC port) for self.
+     */
+    @Test
+    public void shouldFallbackTo127WhenSelfNotSynced() {
+        // selfEndpoint is null (not synced), only remote pod in the list
+        testEndpointGroup.fireEndpoints(
+            List.of(Endpoint.of(REMOTE_POD_IP, GRPC_PORT)));
+
+        final List<RemoteInstance> instances = coordinator.queryRemoteNodes();
+        assertEquals(2, instances.size());
+
+        final RemoteInstance remote = findByHost(instances, REMOTE_POD_IP);
+        assertFalse(remote.getAddress().isSelf());
+
+        final RemoteInstance self = findByHost(instances, FALLBACK_SELF_IP);
+        assertTrue(self.getAddress().isSelf());
+        assertEquals(GRPC_PORT, self.getAddress().getPort(),
+            "Fallback self should use the configured gRPC port");
+    }
+
+    /**
+     * Simulates the fix for #13739: after the informer syncs the self pod,
+     * the endpoint list now includes self (list changes from 1 to 2 entries),
+     * so DynamicEndpointGroup fires the listener again. This time
+     * getSelfEndpoint() returns the real IP and self is correctly identified.
+     *
+     * <p>Previously, self was excluded from the endpoint list, so the list
+     * didn't change when self appeared — DynamicEndpointGroup deduplicated
+     * and never re-fired the listener, leaving self stuck at 127.0.0.1.
+     */
+    @Test
+    public void shouldResolveSelfAfterInformerSync() {
+        // Phase 1: informer hasn't synced self yet
+        testEndpointGroup.fireEndpoints(
+            List.of(Endpoint.of(REMOTE_POD_IP, GRPC_PORT)));
+
+        List<RemoteInstance> instances = coordinator.queryRemoteNodes();
+        assertEquals(2, instances.size());
+        assertTrue(findByHost(instances, FALLBACK_SELF_IP).getAddress().isSelf(),
+            "Before sync, self should be 127.0.0.1 fallback");
+
+        // Phase 2: informer syncs self pod — endpoint list now includes both pods
+        final Endpoint selfEndpoint = Endpoint.of(SELF_POD_IP, GRPC_PORT);
+        testEndpointGroup.setSelfEndpoint(selfEndpoint);
+        testEndpointGroup.fireEndpoints(List.of(
+            Endpoint.of(REMOTE_POD_IP, GRPC_PORT),
+            selfEndpoint
+        ));
+
+        instances = coordinator.queryRemoteNodes();
+        assertEquals(2, instances.size());
+
+        final RemoteInstance self = findByHost(instances, SELF_POD_IP);
+        assertTrue(self.getAddress().isSelf(),
+            "After sync, self should use real pod IP " + SELF_POD_IP);
+
+        final RemoteInstance remote = findByHost(instances, REMOTE_POD_IP);
+        assertFalse(remote.getAddress().isSelf());
+
+        assertTrue(instances.stream().noneMatch(
+            i -> i.getAddress().getHost().equals(FALLBACK_SELF_IP)),
+            "127.0.0.1 fallback should be gone after self is resolved");
+    }
+
+    /**
+     * Verifies that TTL leader election works correctly after self is resolved.
+     * Sorted instance list should have deterministic ordering with real IPs,
+     * and exactly one node should be self.
+     */
+    @Test
+    public void shouldElectTTLLeaderCorrectlyWithRealIPs() {
+        final Endpoint selfEndpoint = Endpoint.of(SELF_POD_IP, GRPC_PORT);
+        testEndpointGroup.setSelfEndpoint(selfEndpoint);
+
+        testEndpointGroup.fireEndpoints(List.of(
+            Endpoint.of(REMOTE_POD_IP, GRPC_PORT),
+            selfEndpoint
+        ));
+
+        final List<RemoteInstance> instances = coordinator.queryRemoteNodes();
+        Collections.sort(instances);
+
+        assertEquals(2, instances.size());
+
+        final long selfCount = instances.stream()
+            .filter(i -> i.getAddress().isSelf())
+            .count();
+        assertEquals(1, selfCount, "Exactly one instance should be self");
+
+        final RemoteInstance selfInstance = instances.stream()
+            .filter(i -> i.getAddress().isSelf())
+            .findFirst()
+            .orElseThrow();
+        assertEquals(SELF_POD_IP, selfInstance.getAddress().getHost());
+    }
+
+    /**
+     * Returns the first instance whose host equals {@code host}, failing the test if none does.
+     */
+    private RemoteInstance findByHost(List<RemoteInstance> instances, String host) {
+        return instances.stream()
+            .filter(i -> i.getAddress().getHost().equals(host))
+            .findFirst()
+            .orElseThrow(() -> new AssertionError(
+                "No instance with host: " + host + ", found: " + instances));
+    }
+
+    /**
+     * A controllable endpoint group for testing. Implements {@link SelfEndpointAccessor}
+     * so the coordinator can resolve self endpoint without requiring a real K8s client.
+     *
+     * <p>Simulates the K8s informer behavior:
+     * <ul>
+     *   <li>Initially selfEndpoint is null (informer hasn't synced self pod yet).</li>
+     *   <li>After calling {@link #setSelfEndpoint}, the next {@link #fireEndpoints} call
+     *   simulates the informer adding the self pod to its cache.</li>
+     * </ul>
+     */
+    static class TestEndpointGroup extends DynamicEndpointGroup implements SelfEndpointAccessor {
+        private volatile Endpoint selfEndpoint;
+
+        @Override
+        public Endpoint getSelfEndpoint() {
+            return selfEndpoint;
+        }
+
+        void setSelfEndpoint(Endpoint selfEndpoint) {
+            this.selfEndpoint = selfEndpoint;
+        }
+
+        // Feeds a new endpoint list through DynamicEndpointGroup#setEndpoints, as the informer would.
+        void fireEndpoints(List<Endpoint> endpoints) {
+            setEndpoints(endpoints);
+        }
+
+        @Override
+        protected void doCloseAsync(CompletableFuture<?> future) {
+            future.complete(null);
+        }
+    }
+}

Reply via email to