Wubabalala commented on PR #16048:
URL: https://github.com/apache/dubbo/pull/16048#issuecomment-4087574069

   Thanks for working on this fix. I spent some time tracing through the full 
call chain on `main` to verify the bug and evaluate this PR. Sharing my 
findings below — the bug is real and more impactful than it might appear.
   
   ## Bug Verification — Code-Level Trace
   
   I traced the complete request lifecycle to confirm the root cause:
   
   **Step 1.** `AdaptiveLoadBalanceFilter.onResponse()` computes client-side RT 
and calls `setProviderMetrics()` asynchronously:
   
   ```java
   // AdaptiveLoadBalanceFilter.java L117-L121
   metricsMap.put("rt", String.valueOf(System.currentTimeMillis() - startTime));
   getExecutor().execute(() -> {
       adaptiveMetrics.setProviderMetrics(getServiceKey(invocation), metricsMap);
   });
   ```
   
   **Step 2.** `setProviderMetrics()` sets both timestamps to the same value:
   
   ```java
   // AdaptiveMetrics.java L93-L94
   metrics.currentProviderTime = serviceTime;
   metrics.currentTime = serviceTime;          // ← same value as above
   ```
   
   **Step 3.** On the next request, `getLoad()` evaluates the penalty condition:
   
   ```java
   // AdaptiveMetrics.java L58-L60
   // Always true after Step 2.
   if (metrics.currentProviderTime == metrics.currentTime) {
       // The real RT is overwritten by the penalty value.
       metrics.lastLatency = timeout * 2L;
   }
   ```
   
   Since Step 2 guarantees `currentProviderTime == currentTime`, the penalty 
branch fires on **every normal response cycle**. The real RT (set at L98 in 
`setProviderMetrics`) is overwritten by `timeout * 2L` on the next `getLoad()` 
call.
   
   **Step 4.** `chooseLowLoadInvoker()` uses the corrupted value directly:
   
   ```java
   // AdaptiveLoadBalance.java L93-L96
   long load1 = Double.doubleToLongBits(
           adaptiveMetrics.getLoad(getServiceKey(invoker1, invocation), weight1, timeout1));
   ```
   
   No clamping, no normalization, no outlier detection. The polluted EWMA feeds 
straight into the routing decision.
   
   ## Simulation Results
   
   I ported `AdaptiveMetrics.java` faithfully to Python and ran three 
scenarios. Scripts are attached at the bottom.
   
   ### Scenario 1: Normal Traffic — Penalty Overwrites Real RT
   
   3 servers (A=10ms, B=50ms, C=200ms), timeout=100ms:
   
   ```
   Round  Server       Real RT    lastLatency    EWMA       Branch
   1      A (10ms)     10         200            102.5      PENALTY
   1      B (50ms)     50         200            112.5      PENALTY
   1      C (200ms)    200        200            150.0      PENALTY
   ...
   10     A (10ms)     10         200            136.7      PENALTY
   10     B (50ms)     50         200            150.0      PENALTY
   10     C (200ms)    200        200            200.0      PENALTY
   ```
   
   Every round hits PENALTY. The 10ms server's EWMA (136.7) is only about 1.5x 
lower than the 200ms server's (200.0), although the real latencies differ by 
20x. Adaptive load balancing degrades to near-random distribution.
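
The round-10 values in the table can be cross-checked in closed form. A minimal sketch, assuming the per-round order used in the simulation (one `setProviderMetrics` update with the real RT, then one `getLoad` penalty update) and `beta = 0.5`; the helper names `simulate_round` and `steady_state_ewma` are illustrative and not part of Dubbo:

```python
BETA = 0.5  # smoothing factor used in the Python port of AdaptiveMetrics


def simulate_round(ewma: float, real_rt: float, timeout: float) -> float:
    """One round: fold in the real RT, then fold in the penalty (timeout * 2)."""
    ewma = BETA * ewma + (1 - BETA) * real_rt        # setProviderMetrics step
    ewma = BETA * ewma + (1 - BETA) * (timeout * 2)  # getLoad penalty branch
    return ewma


def steady_state_ewma(real_rt: float, timeout: float) -> float:
    """Fixed point of simulate_round: e = 0.25 * e + 0.25 * rt + timeout."""
    return (real_rt + 4 * timeout) / 3


if __name__ == "__main__":
    for rt in (10, 50, 200):
        e = 0.0
        for _ in range(10):
            e = simulate_round(e, rt, 100)
        print(f"rt={rt}ms round-10 ewma={e:.1f} "
              f"fixed point={steady_state_ewma(rt, 100):.1f}")
```

Under these assumptions the fixed point is `(rt + 4 * timeout) / 3`, so the timeout term outweighs the real RT by 4 to 1, which is why the spread between servers collapses.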
   
   ### Scenario 2: Node Degradation — Invisible to Algorithm
   
   Server A degrades from 10ms → 500ms at round 6 (GC pause, downstream 
timeout):
   
   ```
   Round  Event              Server   Real RT    lastLat    EWMA       Branch
   5                         A        10         200        136.5      PENALTY
   6      A DEGRADES→500ms   A        500        200        259.1      PENALTY
   ...
   12                        A        500        200        300.0      PENALTY
   12                        B        50         200        150.0      PENALTY
   ```
   
   A's `lastLatency` remains 200 (the penalty value), not 500, so the algorithm 
sees only a damped version of the degradation: A's EWMA settles at 300.0 
against B's 150.0, a 2x gap for a 10x difference in real RT. Traffic continues 
flowing to the degraded node, which risks cascading failure with no circuit 
breaker to catch it (`onError()` at `AdaptiveLoadBalanceFilter.java L131` only 
increments a counter).
   
   ### Scenario 3: Low QPS — Decay to Zero
   
   Server C (200ms) idle for 2 seconds, then `getLoad()` is called:
   
   ```
   multiple = 2000ms / 100ms + 1 = 21
   lastLatency = 200 >> 21 = 0
   ```
   
   The slowest server's latency decays to zero — it now appears "fastest". 
Traffic floods the worst-performing node.
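
The arithmetic above generalizes to other idle periods. A minimal sketch of the decay branch in isolation, assuming the same shift rule as the Python port (`decayed_latency` is an illustrative helper, not a Dubbo method):

```python
def decayed_latency(last_latency: int, idle_ms: int, timeout_ms: int) -> int:
    """Mirror the decay branch: shift right by (idle // timeout + 1) bits."""
    multiple = idle_ms // timeout_ms + 1
    return last_latency >> multiple


if __name__ == "__main__":
    for idle in (0, 100, 300, 700, 2000):
        print(f"idle={idle}ms -> lastLatency={decayed_latency(200, idle, 100)}")
```

For a starting value of 200 and `timeout=100`, the result already reaches zero at 700ms of idle time (`200 >> 8`), well before the 2-second case shown above.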
   
   ## Safety Net Audit
   
   I checked whether any compensating mechanism mitigates these bugs:
   
   | Layer | Present? | Effect |
   |-------|----------|--------|
   | P2C random selection (`AdaptiveLoadBalance.java L68-L78`) | Yes | Probabilistic buffer only: polluted node isn't picked every time, but still gets ~equal share |
   | pickTime forced selection (`AdaptiveMetrics.java L51-L53`) | Yes | Prevents starvation, but never triggers when all nodes have similar (polluted) EWMA |
   | Load value clamping / normalization | No | — |
   | Circuit breaker / outlier detection | No | `onError()` only increments counter |
   | Fallback to other LB strategy | No | LB strategies are mutually exclusive |
   | EWMA range validation | No | — |
   
   For comparison, `ShortestResponseLoadBalance` in the same codebase uses a 
sliding window over `RpcStatus` (real success counts and elapsed time) — a more 
robust design that doesn't suffer from this class of bug.
   
   ## Comments on This PR's Approach
   
   ### 1. `==` → `>` effectively disables the penalty path
   
   After `getLoad()` executes, it sets `currentTime = 
System.currentTimeMillis()` (L65). For `currentProviderTime > currentTime` to 
hold, the provider's reported time would need to be in the future — essentially 
impossible under normal clock behavior.
   
   This swings from "penalty always fires" to "penalty never fires". A more 
targeted fix: track whether new provider metrics have arrived since the last 
`getLoad()` call, e.g., a `boolean providerUpdated` flag that 
`setProviderMetrics()` sets to `true` and `getLoad()` resets to `false`.
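
A rough sketch of the flag idea, written against a stripped-down stand-in rather than the real class. Everything here is an assumption for illustration: the class `FlaggedMetrics`, the method `refresh` (standing in for the latency-update part of `getLoad()`), and in particular the policy that the penalty applies only when no fresh provider metrics have arrived since the last call.

```python
class FlaggedMetrics:
    """Illustrative stand-in, not Dubbo's AdaptiveMetrics."""

    def __init__(self, beta: float = 0.5):
        self.beta = beta
        self.ewma = 0.0
        self.last_latency = 0
        self.provider_updated = False  # hypothetical flag

    def set_provider_metrics(self, rt: int) -> None:
        """Record a real RT and mark the metrics as fresh."""
        self.last_latency = rt
        self.ewma = self.beta * self.ewma + (1 - self.beta) * rt
        self.provider_updated = True

    def refresh(self, timeout: int) -> str:
        """Consume the flag; penalize only when no fresh metrics arrived."""
        if self.provider_updated:
            self.provider_updated = False
            return "FRESH"  # keep the measured RT untouched
        self.last_latency = timeout * 2
        self.ewma = self.beta * self.ewma + (1 - self.beta) * self.last_latency
        return "PENALTY"


if __name__ == "__main__":
    m = FlaggedMetrics()
    m.set_provider_metrics(10)
    print(m.refresh(100), m.last_latency, m.ewma)  # fresh metrics are kept
    print(m.refresh(100), m.last_latency, m.ewma)  # no new metrics: penalty
```

In the Java code the flag would presumably need to be `volatile` or an `AtomicBoolean`, since `setProviderMetrics()` runs on an executor thread (Step 1).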
   
   ### 2. `Math.max(1L, ...)` — right idea, arbitrary floor
   
   Preventing decay-to-zero is correct. But `1L` is meaningless for a service 
with `timeout=5000ms` — a 1ms floor is effectively zero relative to the scale. 
Consider deriving the floor from timeout (e.g., `timeout / 100`) or making it 
configurable.
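
As a sketch of what a timeout-derived floor could look like: `decay_with_floor` and its `divisor` parameter are hypothetical names, and `timeout / 100` is only the example ratio mentioned above, not a recommended value.

```python
def decay_with_floor(last_latency: int, multiple: int, timeout: int,
                     divisor: int = 100) -> int:
    """Decay by right shift, but never below a floor scaled to the timeout."""
    floor = max(1, timeout // divisor)  # still at least 1ms for small timeouts
    return max(floor, last_latency >> multiple)


if __name__ == "__main__":
    print(decay_with_floor(200, 21, timeout=5000))  # floor = 50
    print(decay_with_floor(200, 21, timeout=100))   # floor = 1
    print(decay_with_floor(200, 1, timeout=5000))   # decay result above floor
```

One caveat with this shape: a floor higher than the last measured RT would inflate the latency of a fast server, so a real patch might cap the floor at that measured value.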
   
   ### 3. Test robustness
   
   - Reflection (`setAccessible`) to manipulate private fields couples the test 
to internal field names. Driving the test through the public API 
(`setProviderMetrics` + `getLoad`) with controlled inputs would be more 
resilient.
   - `Thread.sleep(150)` is a classic source of CI flakiness. A `Clock` 
abstraction or time-independent assertions would be more reliable.
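
To illustrate the clock suggestion, here is a minimal sketch of an injectable clock that lets a test advance time without sleeping. `FakeClock` and `ClockedMetrics` are hypothetical stand-ins that model only the decay branch; in the Java test, `java.time.Clock` could play the same role.

```python
from typing import Callable


class FakeClock:
    """Manually advanced millisecond clock for tests."""

    def __init__(self, start_ms: int = 1_000_000):
        self.now_ms = start_ms

    def __call__(self) -> int:
        return self.now_ms

    def advance(self, ms: int) -> None:
        self.now_ms += ms


class ClockedMetrics:
    """Stand-in that reads time only through the injected clock."""

    def __init__(self, clock: Callable[[], int]):
        self.clock = clock
        self.current_time = 0
        self.last_latency = 0

    def set_provider_metrics(self, rt: int) -> None:
        self.last_latency = rt
        self.current_time = self.clock()

    def decay(self, timeout: int) -> int:
        """Apply the decay branch using the injected clock."""
        multiple = (self.clock() - self.current_time) // timeout + 1
        self.last_latency >>= multiple
        self.current_time = self.clock()
        return self.last_latency


if __name__ == "__main__":
    clock = FakeClock()
    m = ClockedMetrics(clock)
    m.set_provider_metrics(200)
    clock.advance(150)  # takes the place of Thread.sleep(150)
    print(m.decay(timeout=100))  # multiple = 150 // 100 + 1 = 2
```

Because elapsed time is set explicitly, the assertion depends only on the inputs and not on scheduler timing in CI.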
   
   ## Verification Scripts
   
   <details>
   <summary>dubbo_adaptive_bug_sim.py — Basic penalty reproduction (click to 
expand)</summary>
   
   ```python
   """
   Dubbo AdaptiveMetrics Bug Simulation — Issue #15810
   Reproduces: penalty branch fires on every normal response,
   corrupting real latency data.
   """
   import time
   
   class AdaptiveMetrics:
       """Faithful port of Dubbo's AdaptiveMetrics.java (main branch)"""
       def __init__(self):
           self.current_provider_time = 0
           self.provider_cpu_load = 0.0
           self.last_latency = 0
           self.current_time = 0
           self.pick_time = int(time.time() * 1000)
           self.beta = 0.5
           self.ewma = 0.0
           self.consumer_req = 0
           self.consumer_success = 0
   
       def _now(self):
           return int(time.time() * 1000)
   
       def get_load(self, weight, timeout):
           if self._now() - self.pick_time > timeout * 2:
               return 0, "FORCED"
           branch = "NO_DATA"
           if self.current_time > 0:
               multiple = (self._now() - self.current_time) // timeout + 1
               if multiple > 0:
                   if self.current_provider_time == self.current_time:
                       self.last_latency = timeout * 2
                       branch = "PENALTY"
                   else:
                       self.last_latency = self.last_latency >> multiple
                       branch = "DECAY"
                   self.ewma = self.beta * self.ewma + (1 - self.beta) * self.last_latency
                   self.current_time = self._now()
           inflight = max(0, self.consumer_req - self.consumer_success)
           load = (self.provider_cpu_load * (self.ewma ** 0.5 + 1)
                   * (inflight + 1)
                   / (((self.consumer_success / (self.consumer_req + 1)) * weight) + 1))
           return load, branch
   
       def set_provider_metrics(self, service_time, rt, cpu_load):
           if self.current_provider_time > service_time:
               return
           self.current_provider_time = service_time
           self.current_time = service_time
           self.provider_cpu_load = cpu_load
           self.last_latency = rt
           self.ewma = self.beta * self.ewma + (1 - self.beta) * self.last_latency
   
       def add_req(self): self.consumer_req += 1
       def add_success(self): self.consumer_success += 1
   
   timeout = 100
   servers = {
       "A (10ms)":  {"m": AdaptiveMetrics(), "rt": 10},
       "B (50ms)":  {"m": AdaptiveMetrics(), "rt": 50},
       "C (200ms)": {"m": AdaptiveMetrics(), "rt": 200},
   }
   for r in range(1, 11):
       for name, s in servers.items():
           m = s["m"]
           now = int(time.time() * 1000)
           m.set_provider_metrics(now, s["rt"], 0.5)
           m.add_req(); m.add_success()
           time.sleep(0.001)
           load, branch = m.get_load(100, timeout)
           print(f"Round {r}: {name} real_rt={s['rt']} lastLat={m.last_latency} ewma={m.ewma:.1f} [{branch}]")
   ```
   
   </details>
   
   <details>
   <summary>dubbo_adaptive_distributed_sim.py — Node degradation + Low QPS 
decay (click to expand)</summary>
   
   ```python
   """
   Dubbo AdaptiveMetrics Bug — Distributed Scenarios
   Scenario 1: Node degradation (10ms -> 500ms), algorithm blind to change
   Scenario 2: Low QPS decay-to-zero, slowest server looks "fastest"
   """
   import time
   
   class AdaptiveMetrics:
       def __init__(self):
           self.current_provider_time = 0
           self.provider_cpu_load = 0.0
           self.last_latency = 0
           self.current_time = 0
           self.pick_time = int(time.time() * 1000)
           self.beta = 0.5
           self.ewma = 0.0
           self.consumer_req = 0
           self.consumer_success = 0
   
       def _now(self):
           return int(time.time() * 1000)
   
       def get_load(self, weight, timeout):
           if self._now() - self.pick_time > timeout * 2:
               return 0, "FORCED"
           branch = "NO_DATA"
           if self.current_time > 0:
               multiple = (self._now() - self.current_time) // timeout + 1
               if multiple > 0:
                   if self.current_provider_time == self.current_time:
                       self.last_latency = timeout * 2
                       branch = "PENALTY"
                   else:
                       self.last_latency = self.last_latency >> multiple
                       branch = f"DECAY(>>{multiple})"
                   self.ewma = self.beta * self.ewma + (1 - self.beta) * self.last_latency
                   self.current_time = self._now()
           return 0, branch
   
       def set_provider_metrics(self, service_time, rt, cpu_load):
           if self.current_provider_time > service_time:
               return
           self.current_provider_time = service_time
           self.current_time = service_time
           self.provider_cpu_load = cpu_load
           self.last_latency = rt
           self.ewma = self.beta * self.ewma + (1 - self.beta) * self.last_latency
   
       def add_req(self): self.consumer_req += 1
       def add_success(self): self.consumer_success += 1
   
   # Scenario 1: Node Degradation
   print("=== Scenario 1: Node Degradation ===")
   timeout = 100
   servers = {"A": {"m": AdaptiveMetrics(), "rt": 10},
              "B": {"m": AdaptiveMetrics(), "rt": 50},
              "C": {"m": AdaptiveMetrics(), "rt": 80}}
   for r in range(1, 13):
       if r == 6: servers["A"]["rt"] = 500
       for name in ["A", "B", "C"]:
           s = servers[name]; m = s["m"]
           now = int(time.time() * 1000)
           m.set_provider_metrics(now, s["rt"], 0.5)
           m.add_req(); m.add_success()
           time.sleep(0.001)
           _, branch = m.get_load(100, timeout)
           marker = " <- DEGRADED" if r >= 6 and name == "A" else ""
           print(f"R{r} {name}: rt={s['rt']}ms lastLat={m.last_latency} ewma={m.ewma:.1f} [{branch}]{marker}")
   
   # Scenario 2: Low QPS Decay
   print("\n=== Scenario 2: Low QPS Decay to Zero ===")
   m = AdaptiveMetrics()
   now = int(time.time() * 1000)
   m.set_provider_metrics(now, 200, 0.5)
   m.add_req(); m.add_success()
   time.sleep(0.001)
   m.get_load(100, timeout)
   print(f"Before idle: lastLat={m.last_latency}, ewma={m.ewma:.1f}")
   m.current_time = int(time.time() * 1000) - 2000
   m.current_provider_time = m.current_time - 1
   m.get_load(100, timeout)
   print(f"After 2s idle: lastLat={m.last_latency}, ewma={m.ewma:.1f}")
   print(f"200 >> 21 = {200 >> 21} -- slowest server now looks 'fastest'")
   ```
   
   </details>
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

