This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch feature/batch-queue-weighted-partitions
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 3c3757ed970cd5caa13ed41824c9330a7094a561
Author: Wu Sheng <[email protected]>
AuthorDate: Tue Apr 7 08:54:21 2026 +0800

    Add weighted handler support to BatchQueue adaptive partitioning
    
    MAL metrics use weight 0.05 at L1 (vs 1.0 for OAL), reducing partition
    count and memory overhead when many MAL metric types are registered.
---
 docs/en/changes/changes.md                         |  1 +
 .../analysis/worker/MetricsAggregateWorker.java    | 12 +++++-
 .../worker/MetricsPersistentMinWorker.java         |  6 +++
 .../server-library/library-batch-queue/CLAUDE.md   | 18 +++++++++
 .../oap/server/library/batchqueue/BatchQueue.java  | 43 ++++++++++++++++++----
 .../server/library/batchqueue/PartitionPolicy.java | 33 +++++++++++------
 .../library/batchqueue/PartitionPolicyTest.java    | 23 ++++++++++++
 7 files changed, 115 insertions(+), 21 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 08992c1600..edc2abaca9 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -12,6 +12,7 @@
 * Fix missing `and` keyword in `JDBCEBPFProfilingTaskDAO.getTaskRecord()` SQL 
query, which caused a syntax error on every invocation.
 * Fix duplicate `TABLE_COLUMN` condition in 
`JDBCMetadataQueryDAO.findEndpoint()`, which was binding the same parameter 
twice due to a copy-paste error.
 * Support MCP (Model Context Protocol) observability for Envoy AI Gateway: MCP 
metrics (request CPM/latency, method breakdown, backend breakdown, 
initialization latency, capabilities), MCP access log sampling (errors only), 
`ai_route_type` searchable log tag, and MCP dashboard tabs.
+* Add weighted handler support to `BatchQueue` adaptive partitioning. MAL 
metrics use weight 0.05 at L1 (vs 1.0 for OAL), reducing partition count and 
memory overhead when many MAL metric types are registered.
 
 #### UI
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index 0361e0711f..4af51a9cd1 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
+import org.apache.skywalking.oap.server.core.analysis.meter.Meter;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
@@ -121,7 +122,16 @@ public class MetricsAggregateWorker extends 
AbstractWorker<Metrics> {
             QUEUE_USAGE_GAUGE = gauge;
         }
 
-        l1Queue.addHandler(metricsClass, new L1Handler());
+        // OAL metrics receive items on every incoming request (high, 
continuous throughput),
+        // so each type benefits from a dedicated partition — weight 1.0.
+        // MAL metrics receive items only once per scrape interval (typically 
1 emit/min),
+        // producing at most ~500 items per type per burst. With a 20,000-slot 
buffer,
+        // ~40 MAL types can safely share one partition (20,000 / 500 = 40). 
We use
+        // weight 0.05 (≈ 1/20) to give 2x headroom over the theoretical 
sharing limit.
+        // This significantly reduces partition count and memory overhead when 
many MAL
+        // metric types are registered (e.g., from otel-rules).
+        final double weight = Meter.class.isAssignableFrom(metricsClass) ? 
0.05 : 1.0;
+        l1Queue.addHandler(metricsClass, new L1Handler(), weight);
     }
 
     @Override
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
index 197cdab9fa..1e0725adb7 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
@@ -118,6 +118,12 @@ public class MetricsPersistentMinWorker extends 
MetricsPersistentWorker {
             QUEUE_USAGE_GAUGE = gauge;
         }
 
+        // No weight differentiation at L2. After L1 pre-aggregation, both OAL 
and MAL
+        // produce one item per (metric_type × entity) per minute — similar 
burst patterns
+        // and throughput. The OAL per-request amplification is absorbed by 
L1, so at L2
+        // there is no meaningful throughput difference to justify partition 
weight tuning.
+        // L2 buffer is also much smaller (2,000 vs 20,000), so the memory 
overhead of
+        // extra partitions is modest (~16 MB total vs L1's ~167 MB).
         l2Queue.addHandler(metricsClass, new L2Handler());
     }
 
diff --git a/oap-server/server-library/library-batch-queue/CLAUDE.md 
b/oap-server/server-library/library-batch-queue/CLAUDE.md
index 09e7278752..ad88ca6ba3 100644
--- a/oap-server/server-library/library-batch-queue/CLAUDE.md
+++ b/oap-server/server-library/library-batch-queue/CLAUDE.md
@@ -80,6 +80,24 @@ Adaptive growth (default multiplier 25, with 8 threads -> 
threshold 200):
 - 100 handlers -> 100 partitions (1:1)
 - 500 handlers -> 350 partitions (200 + 300/2)
 
+### Weighted handlers
+
+`addHandler(type, handler, weight)` allows different handler types to 
contribute different
+amounts to the partition count. The adaptive formula uses the weighted sum 
instead of raw
+handler count. Partition assignment remains hash-based (`typeHash()`) — weight 
only affects
+how many partitions exist, not which partition a type lands on.
+
+L1 uses weight 0.05 for MAL metrics (vs 1.0 for OAL). Rationale: MAL emits 
~500 items/type
+per scrape interval. With 20,000-slot buffers, ~40 MAL types can safely share 
one partition
+(20,000 / 500 = 40). Weight 0.05 ≈ 1/20 gives 2x headroom.
+
+Example (8 threads, 642 OAL + 1,247 MAL):
+- Without weight: 1,889 handlers -> 1,044 partitions (167 MB array overhead at 
L1)
+- With weight: 642*1.0 + 1,247*0.05 = 705 effective -> 452 partitions (72 MB)
+
+L2 uses default weight 1.0 for all types because after L1 pre-aggregation both 
OAL and MAL
+have similar per-minute burst patterns.
+
 ## Drain Rebalancing
 
 Static round-robin partition assignment creates thread imbalance when metric 
types have varying
diff --git 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
index 372d9fc2ed..597ca4d6ba 100644
--- 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
+++ 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
@@ -128,6 +128,13 @@ public class BatchQueue<T> {
      */
     private final ConcurrentHashMap<Class<?>, HandlerConsumer<T>> handlerMap;
 
+    /**
+     * Running weighted sum of registered handlers, used by adaptive partition 
policy.
+     * Each handler contributes its weight (default 1.0) when registered via
+     * {@link #addHandler(Class, HandlerConsumer, double)}.
+     */
+    private double weightedHandlerCount;
+
     /**
      * Tracks unregistered types that have already been warned about,
      * to avoid flooding the log with repeated errors.
@@ -382,23 +389,43 @@ public class BatchQueue<T> {
     }
 
     /**
-     * Register a type-based handler. Items whose {@code getClass()} matches 
the given
-     * type will be batched together and dispatched to this handler.
+     * Register a type-based handler with default weight 1.0.
+     *
+     * @param type the class of items to route to this handler
+     * @param handler the consumer that processes batches of the given type
+     * @see #addHandler(Class, HandlerConsumer, double)
+     */
+    public void addHandler(final Class<? extends T> type, final 
HandlerConsumer<T> handler) {
+        addHandler(type, handler, 1.0);
+    }
+
+    /**
+     * Register a type-based handler with an explicit weight for adaptive 
partition sizing.
      *
-     * <p>For adaptive partition policies, adding a handler recalculates the 
partition
-     * count and grows the partition array if needed. For non-adaptive 
policies the
-     * resolved count never changes, so this is a no-op beyond the 
registration.
-     * Drain loop threads pick up new partitions on their next cycle via 
volatile reads.
+     * <p>The weight controls how much this handler contributes to partition 
growth.
+     * A weight of 1.0 means one handler ≈ one partition (below the adaptive 
threshold).
+     * A lower weight (e.g., 0.05) means many handlers share a partition, 
suitable for
+     * low-throughput types. The weighted sum of all handlers replaces the raw 
handler
+     * count in the adaptive partition formula.
+     *
+     * <p>For non-adaptive partition policies the weight is ignored and this 
behaves
+     * the same as {@link #addHandler(Class, HandlerConsumer)}.
      *
      * @param type the class of items to route to this handler
      * @param handler the consumer that processes batches of the given type
+     * @param weight partition weight for this handler (default 1.0). Must be 
&gt; 0.
      */
     @SuppressWarnings("unchecked")
-    public void addHandler(final Class<? extends T> type, final 
HandlerConsumer<T> handler) {
+    public void addHandler(final Class<? extends T> type, final 
HandlerConsumer<T> handler,
+                           final double weight) {
+        if (weight <= 0) {
+            throw new IllegalArgumentException("Handler weight must be > 0, 
got: " + weight);
+        }
         handlerMap.put(type, handler);
+        weightedHandlerCount += weight;
 
         final int newPartitionCount = config.getPartitions()
-            .resolve(resolvedThreadCount, handlerMap.size());
+            .resolve(resolvedThreadCount, weightedHandlerCount);
         final ArrayBlockingQueue<T>[] currentPartitions = this.partitions;
         if (newPartitionCount > currentPartitions.length) {
             final int oldCount = currentPartitions.length;
diff --git 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
index 8ec0a0c4cf..8f916c2c2b 100644
--- 
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
+++ 
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
@@ -31,8 +31,8 @@ package org.apache.skywalking.oap.server.library.batchqueue;
  *       excess handlers share partitions at 1:2 ratio.</li>
  * </ul>
  *
- * <p>All policies are resolved via {@link #resolve(int, int)}. For 
non-adaptive
- * policies the handlerCount parameter is ignored. At queue creation time, if 
the
+ * <p>All policies are resolved via {@link #resolve(int, double)}. For 
non-adaptive
+ * policies the weightedHandlerCount parameter is ignored. At queue creation 
time, if the
  * resolved partition count is less than the thread count, the thread count is
  * reduced to match and a warning is logged.
  */
@@ -130,29 +130,38 @@ public class PartitionPolicy {
      * <ul>
      *   <li>fixed: returns the pre-set count (both parameters ignored).</li>
      *   <li>threadMultiply: returns multiplier * resolvedThreadCount 
(handlerCount ignored).</li>
-     *   <li>adaptive: when handlerCount is 0, returns resolvedThreadCount as 
a sensible
-     *       initial count. Otherwise, threshold = threadCount * multiplier; 
if handlerCount
-     *       &lt;= threshold, returns handlerCount (1:1). If above, returns
-     *       threshold + (handlerCount - threshold) / 2.</li>
+     *   <li>adaptive: when weightedHandlerCount is 0, returns 
resolvedThreadCount as a sensible
+     *       initial count. Otherwise, threshold = threadCount * multiplier; 
if weightedHandlerCount
+     *       &lt;= threshold, returns weightedHandlerCount (1:1). If above, 
returns
+     *       threshold + (weightedHandlerCount - threshold) / 2.</li>
      * </ul>
      *
      * @param resolvedThreadCount the resolved number of drain threads
-     * @param handlerCount the current number of registered type handlers
+     * @param weightedHandlerCount the weighted sum of registered type 
handlers. Each handler
+     *                             contributes its weight (default 1.0) to 
this sum.
+     *                             High-weight handlers grow the partition 
count faster,
+     *                             reducing the chance of hash collisions for 
those types.
+     *                             Low-weight handlers grow the count slowly, 
so they are
+     *                             more likely to share partitions with other 
types via
+     *                             {@code typeHash()} routing. Note that 
partition assignment
+     *                             is hash-based, not weight-based — there is 
no guarantee
+     *                             that any type gets a dedicated partition.
      * @return the resolved partition count, always &gt;= 1
      */
-    public int resolve(final int resolvedThreadCount, final int handlerCount) {
+    public int resolve(final int resolvedThreadCount, final double 
weightedHandlerCount) {
         if (fixedCount > 0) {
             return fixedCount;
         }
         if (adaptive) {
-            if (handlerCount == 0) {
+            final int effectiveCount = (int) Math.ceil(weightedHandlerCount);
+            if (effectiveCount == 0) {
                 return Math.max(1, resolvedThreadCount);
             }
             final int threshold = Math.max(1, multiplier * 
resolvedThreadCount);
-            if (handlerCount <= threshold) {
-                return handlerCount;
+            if (effectiveCount <= threshold) {
+                return effectiveCount;
             }
-            return threshold + (handlerCount - threshold) / 2;
+            return threshold + (effectiveCount - threshold) / 2;
         }
         return Math.max(1, multiplier * resolvedThreadCount);
     }
diff --git 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
index 9cdb5d883b..0dab65854b 100644
--- 
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
+++ 
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
@@ -109,6 +109,29 @@ public class PartitionPolicyTest {
             () -> PartitionPolicy.adaptive(0));
     }
 
+    @Test
+    public void testAdaptiveWithWeightedHandlers() {
+        // Simulate 642 OAL (weight 1.0) + 1247 MAL (weight 0.05)
+        // Weighted count = 642 + 1247 * 0.05 = 642 + 62.35 = 704.35, ceil = 
705
+        // 8 threads * 25 = 200 threshold, 705 > 200
+        // Result = 200 + (705 - 200) / 2 = 200 + 252 = 452
+        final double weightedCount = 642 * 1.0 + 1247 * 0.05;
+        assertEquals(452, PartitionPolicy.adaptive().resolve(8, 
weightedCount));
+    }
+
+    @Test
+    public void testAdaptiveWithLowWeightOnly() {
+        // 100 MAL-only handlers at weight 0.05 = effective 5, ceil = 5
+        // 8 threads, threshold = 200, 5 < 200 -> 1:1 -> 5 partitions
+        assertEquals(5, PartitionPolicy.adaptive().resolve(8, 100 * 0.05));
+    }
+
+    @Test
+    public void testAdaptiveWithZeroWeightedCount() {
+        // weightedCount = 0.0 should return threadCount
+        assertEquals(8, PartitionPolicy.adaptive().resolve(8, 0.0));
+    }
+
     @Test
     public void testToString() {
         assertEquals("fixed(4)", PartitionPolicy.fixed(4).toString());

Reply via email to