This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new ebda65ca12 Add weighted handler support to BatchQueue adaptive 
partitioning (#13801)
ebda65ca12 is described below

commit ebda65ca12b64ebfb2eb8e10a6348288f9b5c4c3
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Tue Apr 7 09:52:21 2026 +0800

    Add weighted handler support to BatchQueue adaptive partitioning (#13801)
    
    `PartitionPolicy.resolve()` now accepts a weighted handler count instead of 
raw handler count.
    `BatchQueue.addHandler(type, handler, weight)` overload allows callers to 
specify partition weight per handler type.
    
    **L1 (MetricsAggregateWorker):** MAL metrics use weight 0.05 (vs 1.0 for 
OAL). Rationale: MAL emits ~500 items/type per scrape interval. With 
20,000-slot buffers, ~40 MAL types can safely share one partition (20,000 / 500 
= 40). Weight 0.05 = 1/20 means 20 MAL types count as one handler, which gives 
2x headroom below that sharing limit.
    
    **L2 (MetricsPersistentMinWorker):** No weight differentiation. After L1 
pre-aggregation, both OAL and MAL have similar per-minute burst patterns.
    
    **Impact (8-core, 642 OAL + 1,247 MAL types):**
    
    | | L1 Before | L1 After | Reduction |
    |---|---|---|---|
    | Partitions | 1,044 | 452 | 57% |
    | Array overhead | 167 MB | 72 MB | 57% |
---
 docs/en/changes/changes.md                         |  1 +
 .../analysis/worker/MetricsAggregateWorker.java    | 12 +++++-
 .../worker/MetricsPersistentMinWorker.java         |  6 +++
 .../server-library/library-batch-queue/CLAUDE.md   | 18 +++++++++
 .../oap/server/library/batchqueue/BatchQueue.java  | 43 ++++++++++++++++++----
 .../server/library/batchqueue/PartitionPolicy.java | 33 +++++++++++------
 .../library/batchqueue/PartitionPolicyTest.java    | 23 ++++++++++++
 7 files changed, 115 insertions(+), 21 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 08992c1600..edc2abaca9 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -12,6 +12,7 @@
 * Fix missing `and` keyword in `JDBCEBPFProfilingTaskDAO.getTaskRecord()` SQL 
query, which caused a syntax error on every invocation.
 * Fix duplicate `TABLE_COLUMN` condition in 
`JDBCMetadataQueryDAO.findEndpoint()`, which was binding the same parameter 
twice due to a copy-paste error.
 * Support MCP (Model Context Protocol) observability for Envoy AI Gateway: MCP 
metrics (request CPM/latency, method breakdown, backend breakdown, 
initialization latency, capabilities), MCP access log sampling (errors only), 
`ai_route_type` searchable log tag, and MCP dashboard tabs.
+* Add weighted handler support to `BatchQueue` adaptive partitioning. MAL 
metrics use weight 0.05 at L1 (vs 1.0 for OAL), reducing partition count and 
memory overhead when many MAL metric types are registered.
 
 #### UI
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index 0361e0711f..4af51a9cd1 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
+import org.apache.skywalking.oap.server.core.analysis.meter.Meter;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
@@ -121,7 +122,16 @@ public class MetricsAggregateWorker extends 
AbstractWorker<Metrics> {
             QUEUE_USAGE_GAUGE = gauge;
         }
 
-        l1Queue.addHandler(metricsClass, new L1Handler());
+        // OAL metrics receive items on every incoming request (high, 
continuous throughput),
+        // so each type benefits from a dedicated partition — weight 1.0.
+        // MAL metrics receive items only once per scrape interval (typically 
1 emit/min),
+        // producing at most ~500 items per type per burst. With a 20,000-slot 
buffer,
+        // ~40 MAL types can safely share one partition (20,000 / 500 = 40). 
We use
+        // weight 0.05 (≈ 1/20) to give 2x headroom over the theoretical 
sharing limit.
+        // This significantly reduces partition count and memory overhead when 
many MAL
+        // metric types are registered (e.g., from otel-rules).
+        final double weight = Meter.class.isAssignableFrom(metricsClass) ? 
0.05 : 1.0;
+        l1Queue.addHandler(metricsClass, new L1Handler(), weight);
     }
 
     @Override
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
index 197cdab9fa..1e0725adb7 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
@@ -118,6 +118,12 @@ public class MetricsPersistentMinWorker extends 
MetricsPersistentWorker {
             QUEUE_USAGE_GAUGE = gauge;
         }
 
+        // No weight differentiation at L2. After L1 pre-aggregation, both OAL 
and MAL
+        // produce one item per (metric_type × entity) per minute — similar 
burst patterns
+        // and throughput. The OAL per-request amplification is absorbed by 
L1, so at L2
+        // there is no meaningful throughput difference to justify partition 
weight tuning.
+        // L2 buffer is also much smaller (2,000 vs 20,000), so the memory 
overhead of
+        // extra partitions is modest (~16 MB total vs L1's ~167 MB).
         l2Queue.addHandler(metricsClass, new L2Handler());
     }
 
diff --git a/oap-server/server-library/library-batch-queue/CLAUDE.md 
b/oap-server/server-library/library-batch-queue/CLAUDE.md
index 09e7278752..ad88ca6ba3 100644
--- a/oap-server/server-library/library-batch-queue/CLAUDE.md
+++ b/oap-server/server-library/library-batch-queue/CLAUDE.md
@@ -80,6 +80,24 @@ Adaptive growth (default multiplier 25, with 8 threads -> 
threshold 200):
 - 100 handlers -> 100 partitions (1:1)
 - 500 handlers -> 350 partitions (200 + 300/2)
 
+### Weighted handlers
+
+`addHandler(type, handler, weight)` allows different handler types to 
contribute different
+amounts to the partition count. The adaptive formula uses the weighted sum 
instead of raw
+handler count. Partition assignment remains hash-based (`typeHash()`) — weight 
only affects
+how many partitions exist, not which partition a type lands on.
+
+L1 uses weight 0.05 for MAL metrics (vs 1.0 for OAL). Rationale: MAL emits 
~500 items/type
+per scrape interval. With 20,000-slot buffers, ~40 MAL types can safely share 
one partition
+(20,000 / 500 = 40). Weight 0.05 = 1/20 (20 MAL types count as one handler) gives 2x headroom.
+
+Example (8 threads, 642 OAL + 1,247 MAL):
+- Without weight: 1,889 handlers -> 1,044 partitions (200 + 1,689/2; 167 MB 
array overhead at L1)
+- With weight: 642*1.0 + 1,247*0.05 = 704.35, rounded up to 705 effective -> 
452 partitions (72 MB)
+
+L2 uses default weight 1.0 for all types because after L1 pre-aggregation both 
OAL and MAL
+have similar per-minute burst patterns.
+
 ## Drain Rebalancing
 
 Static round-robin partition assignment creates thread imbalance when metric 
types have varying
diff --git 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
index 372d9fc2ed..597ca4d6ba 100644
--- 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
+++ 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
@@ -128,6 +128,13 @@ public class BatchQueue<T> {
      */
     private final ConcurrentHashMap<Class<?>, HandlerConsumer<T>> handlerMap;
 
+    /**
+     * Running weighted sum of registered handlers, used by adaptive partition 
policy.
+     * Each handler contributes its weight (default 1.0) when registered via
+     * {@link #addHandler(Class, HandlerConsumer, double)}.
+     */
+    private double weightedHandlerCount;
+
     /**
      * Tracks unregistered types that have already been warned about,
      * to avoid flooding the log with repeated errors.
@@ -382,23 +389,43 @@ public class BatchQueue<T> {
     }
 
     /**
-     * Register a type-based handler. Items whose {@code getClass()} matches 
the given
-     * type will be batched together and dispatched to this handler.
+     * Register a type-based handler with default weight 1.0.
+     *
+     * @param type the class of items to route to this handler
+     * @param handler the consumer that processes batches of the given type
+     * @see #addHandler(Class, HandlerConsumer, double)
+     */
+    public void addHandler(final Class<? extends T> type, final 
HandlerConsumer<T> handler) {
+        addHandler(type, handler, 1.0);
+    }
+
+    /**
+     * Register a type-based handler with an explicit weight for adaptive 
partition sizing.
      *
-     * <p>For adaptive partition policies, adding a handler recalculates the 
partition
-     * count and grows the partition array if needed. For non-adaptive 
policies the
-     * resolved count never changes, so this is a no-op beyond the 
registration.
-     * Drain loop threads pick up new partitions on their next cycle via 
volatile reads.
+     * <p>The weight controls how much this handler contributes to partition 
growth.
+     * A weight of 1.0 means one handler ≈ one partition (below the adaptive 
threshold).
+     * A lower weight (e.g., 0.05) means many handlers share a partition, 
suitable for
+     * low-throughput types. The weighted sum of all handlers replaces the raw 
handler
+     * count in the adaptive partition formula.
+     *
+     * <p>For non-adaptive partition policies the weight is ignored and this 
behaves
+     * the same as {@link #addHandler(Class, HandlerConsumer)}.
      *
      * @param type the class of items to route to this handler
      * @param handler the consumer that processes batches of the given type
+     * @param weight partition weight for this handler (default 1.0). Must be 
&gt; 0.
      */
     @SuppressWarnings("unchecked")
-    public void addHandler(final Class<? extends T> type, final 
HandlerConsumer<T> handler) {
+    public void addHandler(final Class<? extends T> type, final 
HandlerConsumer<T> handler,
+                           final double weight) {
+        if (weight <= 0) {
+            throw new IllegalArgumentException("Handler weight must be > 0, 
got: " + weight);
+        }
         handlerMap.put(type, handler);
+        weightedHandlerCount += weight;
 
         final int newPartitionCount = config.getPartitions()
-            .resolve(resolvedThreadCount, handlerMap.size());
+            .resolve(resolvedThreadCount, weightedHandlerCount);
         final ArrayBlockingQueue<T>[] currentPartitions = this.partitions;
         if (newPartitionCount > currentPartitions.length) {
             final int oldCount = currentPartitions.length;
diff --git 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
index 8ec0a0c4cf..8f916c2c2b 100644
--- 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
+++ 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
@@ -31,8 +31,8 @@ package org.apache.skywalking.oap.server.library.batchqueue;
  *       excess handlers share partitions at 1:2 ratio.</li>
  * </ul>
  *
- * <p>All policies are resolved via {@link #resolve(int, int)}. For 
non-adaptive
- * policies the handlerCount parameter is ignored. At queue creation time, if 
the
+ * <p>All policies are resolved via {@link #resolve(int, double)}. For 
non-adaptive
+ * policies the weightedHandlerCount parameter is ignored. At queue creation 
time, if the
  * resolved partition count is less than the thread count, the thread count is
  * reduced to match and a warning is logged.
  */
@@ -130,29 +130,38 @@ public class PartitionPolicy {
      * <ul>
      *   <li>fixed: returns the pre-set count (both parameters ignored).</li>
      *   <li>threadMultiply: returns multiplier * resolvedThreadCount 
(handlerCount ignored).</li>
-     *   <li>adaptive: when handlerCount is 0, returns resolvedThreadCount as 
a sensible
-     *       initial count. Otherwise, threshold = threadCount * multiplier; 
if handlerCount
-     *       &lt;= threshold, returns handlerCount (1:1). If above, returns
-     *       threshold + (handlerCount - threshold) / 2.</li>
+     *   <li>adaptive: weightedHandlerCount is first rounded up to an integer n
+     *       ({@code Math.ceil}). When n is 0, returns resolvedThreadCount (at 
least 1) as a
+     *       sensible initial count. Otherwise, threshold = threadCount * 
multiplier; if n
+     *       &lt;= threshold, returns n (1:1). If above, returns
+     *       threshold + (n - threshold) / 2 (integer division).</li>
      * </ul>
      *
      * @param resolvedThreadCount the resolved number of drain threads
-     * @param handlerCount the current number of registered type handlers
+     * @param weightedHandlerCount the weighted sum of registered type 
handlers. Each handler
+     *                             contributes its weight (default 1.0) to 
this sum.
+     *                             High-weight handlers grow the partition 
count faster,
+     *                             reducing the chance of hash collisions for 
those types.
+     *                             Low-weight handlers grow the count slowly, 
so they are
+     *                             more likely to share partitions with other 
types via
+     *                             {@code typeHash()} routing. Note that 
partition assignment
+     *                             is hash-based, not weight-based — there is 
no guarantee
+     *                             that any type gets a dedicated partition.
      * @return the resolved partition count, always &gt;= 1
      */
-    public int resolve(final int resolvedThreadCount, final int handlerCount) {
+    public int resolve(final int resolvedThreadCount, final double 
weightedHandlerCount) {
         if (fixedCount > 0) {
             return fixedCount;
         }
         if (adaptive) {
-            if (handlerCount == 0) {
+            final int effectiveCount = (int) Math.ceil(weightedHandlerCount);
+            if (effectiveCount == 0) {
                 return Math.max(1, resolvedThreadCount);
             }
             final int threshold = Math.max(1, multiplier * 
resolvedThreadCount);
-            if (handlerCount <= threshold) {
-                return handlerCount;
+            if (effectiveCount <= threshold) {
+                return effectiveCount;
             }
-            return threshold + (handlerCount - threshold) / 2;
+            return threshold + (effectiveCount - threshold) / 2;
         }
         return Math.max(1, multiplier * resolvedThreadCount);
     }
diff --git 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
index 9cdb5d883b..0dab65854b 100644
--- 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
+++ 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
@@ -109,6 +109,29 @@ public class PartitionPolicyTest {
             () -> PartitionPolicy.adaptive(0));
     }
 
+    @Test
+    public void testAdaptiveWithWeightedHandlers() {
+        // Simulate 642 OAL (weight 1.0) + 1247 MAL (weight 0.05)
+        // Weighted count = 642 + 1247 * 0.05 = 642 + 62.35 = 704.35, ceil = 
705
+        // 8 threads * 25 = 200 threshold, 705 > 200
+        // Result = 200 + (705 - 200) / 2 = 200 + 252 = 452
+        final double weightedCount = 642 * 1.0 + 1247 * 0.05;
+        assertEquals(452, PartitionPolicy.adaptive().resolve(8, 
weightedCount));
+    }
+
+    @Test
+    public void testAdaptiveWithLowWeightOnly() {
+        // 100 MAL-only handlers at weight 0.05 = effective 5, ceil = 5
+        // 8 threads, threshold = 200, 5 < 200 -> 1:1 -> 5 partitions
+        assertEquals(5, PartitionPolicy.adaptive().resolve(8, 100 * 0.05));
+    }
+
+    @Test
+    public void testAdaptiveWithZeroWeightedCount() {
+        // weightedCount = 0.0 should return threadCount
+        assertEquals(8, PartitionPolicy.adaptive().resolve(8, 0.0));
+    }
+
     @Test
     public void testToString() {
         assertEquals("fixed(4)", PartitionPolicy.fixed(4).toString());

Reply via email to