This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new ebda65ca12 Add weighted handler support to BatchQueue adaptive
partitioning (#13801)
ebda65ca12 is described below
commit ebda65ca12b64ebfb2eb8e10a6348288f9b5c4c3
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Tue Apr 7 09:52:21 2026 +0800
Add weighted handler support to BatchQueue adaptive partitioning (#13801)
`PartitionPolicy.resolve()` now accepts a weighted handler count instead of
raw handler count.
`BatchQueue.addHandler(type, handler, weight)` overload allows callers to
specify partition weight per handler type.
**L1 (MetricsAggregateWorker):** MAL metrics use weight 0.05 (vs 1.0 for
OAL). Rationale: MAL emits ~500 items/type per scrape interval. With
20,000-slot buffers, ~40 MAL types can safely share one partition (20,000 / 500
= 40). Weight 0.05 ≈ 1/20 gives 2x headroom.
**L2 (MetricsPersistentMinWorker):** No weight differentiation. After L1
pre-aggregation, both OAL and MAL have similar per-minute burst patterns.
**Impact (8-core, 642 OAL + 1,247 MAL types):**
| | L1 Before | L1 After | Reduction |
|---|---|---|---|
| Partitions | 1,045 | 452 | 57% |
| Array overhead | 167 MB | 72 MB | 57% |
---
docs/en/changes/changes.md | 1 +
.../analysis/worker/MetricsAggregateWorker.java | 12 +++++-
.../worker/MetricsPersistentMinWorker.java | 6 +++
.../server-library/library-batch-queue/CLAUDE.md | 18 +++++++++
.../oap/server/library/batchqueue/BatchQueue.java | 43 ++++++++++++++++++----
.../server/library/batchqueue/PartitionPolicy.java | 33 +++++++++++------
.../library/batchqueue/PartitionPolicyTest.java | 23 ++++++++++++
7 files changed, 115 insertions(+), 21 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 08992c1600..edc2abaca9 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -12,6 +12,7 @@
* Fix missing `and` keyword in `JDBCEBPFProfilingTaskDAO.getTaskRecord()` SQL
query, which caused a syntax error on every invocation.
* Fix duplicate `TABLE_COLUMN` condition in
`JDBCMetadataQueryDAO.findEndpoint()`, which was binding the same parameter
twice due to a copy-paste error.
* Support MCP (Model Context Protocol) observability for Envoy AI Gateway: MCP
metrics (request CPM/latency, method breakdown, backend breakdown,
initialization latency, capabilities), MCP access log sampling (errors only),
`ai_route_type` searchable log tag, and MCP dashboard tabs.
+* Add weighted handler support to `BatchQueue` adaptive partitioning. MAL
metrics use weight 0.05 at L1 (vs 1.0 for OAL), reducing partition count and
memory overhead when many MAL metric types are registered.
#### UI
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
index 0361e0711f..4af51a9cd1 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import
org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
+import org.apache.skywalking.oap.server.core.analysis.meter.Meter;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
@@ -121,7 +122,16 @@ public class MetricsAggregateWorker extends
AbstractWorker<Metrics> {
QUEUE_USAGE_GAUGE = gauge;
}
- l1Queue.addHandler(metricsClass, new L1Handler());
+ // OAL metrics receive items on every incoming request (high,
continuous throughput),
+ // so each type benefits from a dedicated partition — weight 1.0.
+ // MAL metrics receive items only once per scrape interval (typically
1 emit/min),
+ // producing at most ~500 items per type per burst. With a 20,000-slot
buffer,
+ // ~40 MAL types can safely share one partition (20,000 / 500 = 40).
We use
+ // weight 0.05 (≈ 1/20) to give 2x headroom over the theoretical
sharing limit.
+ // This significantly reduces partition count and memory overhead when
many MAL
+ // metric types are registered (e.g., from otel-rules).
+ final double weight = Meter.class.isAssignableFrom(metricsClass) ?
0.05 : 1.0;
+ l1Queue.addHandler(metricsClass, new L1Handler(), weight);
}
@Override
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
index 197cdab9fa..1e0725adb7 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java
@@ -118,6 +118,12 @@ public class MetricsPersistentMinWorker extends
MetricsPersistentWorker {
QUEUE_USAGE_GAUGE = gauge;
}
+ // No weight differentiation at L2. After L1 pre-aggregation, both OAL
and MAL
+ // produce one item per (metric_type × entity) per minute — similar
burst patterns
+ // and throughput. The OAL per-request amplification is absorbed by
L1, so at L2
+ // there is no meaningful throughput difference to justify partition
weight tuning.
+ // L2 buffer is also much smaller (2,000 vs 20,000), so the memory
overhead of
+ // extra partitions is modest (~16 MB total vs L1's ~167 MB).
l2Queue.addHandler(metricsClass, new L2Handler());
}
diff --git a/oap-server/server-library/library-batch-queue/CLAUDE.md
b/oap-server/server-library/library-batch-queue/CLAUDE.md
index 09e7278752..ad88ca6ba3 100644
--- a/oap-server/server-library/library-batch-queue/CLAUDE.md
+++ b/oap-server/server-library/library-batch-queue/CLAUDE.md
@@ -80,6 +80,24 @@ Adaptive growth (default multiplier 25, with 8 threads ->
threshold 200):
- 100 handlers -> 100 partitions (1:1)
- 500 handlers -> 350 partitions (200 + 300/2)
+### Weighted handlers
+
+`addHandler(type, handler, weight)` allows different handler types to
contribute different
+amounts to the partition count. The adaptive formula uses the weighted sum
instead of raw
+handler count. Partition assignment remains hash-based (`typeHash()`) — weight
only affects
+how many partitions exist, not which partition a type lands on.
+
+L1 uses weight 0.05 for MAL metrics (vs 1.0 for OAL). Rationale: MAL emits
~500 items/type
+per scrape interval. With 20,000-slot buffers, ~40 MAL types can safely share
one partition
+(20,000 / 500 = 40). Weight 0.05 ≈ 1/20 gives 2x headroom.
+
+Example (8 threads, 642 OAL + 1,247 MAL):
+- Without weight: 1,889 handlers -> 1,045 partitions (167 MB array overhead at
L1)
+With weight: 642*1.0 + 1,247*0.05 = 704.35, ceil -> 705 effective -> 452 partitions (72 MB)
+
+L2 uses default weight 1.0 for all types because after L1 pre-aggregation both
OAL and MAL
+have similar per-minute burst patterns.
+
## Drain Rebalancing
Static round-robin partition assignment creates thread imbalance when metric
types have varying
diff --git
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
index 372d9fc2ed..597ca4d6ba 100644
---
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
+++
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java
@@ -128,6 +128,13 @@ public class BatchQueue<T> {
*/
private final ConcurrentHashMap<Class<?>, HandlerConsumer<T>> handlerMap;
+ /**
+ * Running weighted sum of registered handlers, used by adaptive partition
policy.
+ * Each handler contributes its weight (default 1.0) when registered via
+ * {@link #addHandler(Class, HandlerConsumer, double)}.
+ */
+ private double weightedHandlerCount;
+
/**
* Tracks unregistered types that have already been warned about,
* to avoid flooding the log with repeated errors.
@@ -382,23 +389,43 @@ public class BatchQueue<T> {
}
/**
- * Register a type-based handler. Items whose {@code getClass()} matches
the given
- * type will be batched together and dispatched to this handler.
+ * Register a type-based handler with default weight 1.0.
+ *
+ * @param type the class of items to route to this handler
+ * @param handler the consumer that processes batches of the given type
+ * @see #addHandler(Class, HandlerConsumer, double)
+ */
+ public void addHandler(final Class<? extends T> type, final
HandlerConsumer<T> handler) {
+ addHandler(type, handler, 1.0);
+ }
+
+ /**
+ * Register a type-based handler with an explicit weight for adaptive
partition sizing.
*
- * <p>For adaptive partition policies, adding a handler recalculates the
partition
- * count and grows the partition array if needed. For non-adaptive
policies the
- * resolved count never changes, so this is a no-op beyond the
registration.
- * Drain loop threads pick up new partitions on their next cycle via
volatile reads.
+ * <p>The weight controls how much this handler contributes to partition
growth.
+ * A weight of 1.0 means one handler ≈ one partition (below the adaptive
threshold).
+ * A lower weight (e.g., 0.05) means many handlers share a partition,
suitable for
+ * low-throughput types. The weighted sum of all handlers replaces the raw
handler
+ * count in the adaptive partition formula.
+ *
+ * <p>For non-adaptive partition policies the weight is ignored and this
behaves
+ * the same as {@link #addHandler(Class, HandlerConsumer)}.
*
* @param type the class of items to route to this handler
* @param handler the consumer that processes batches of the given type
+ * @param weight partition weight for this handler (default 1.0). Must be
> 0.
*/
@SuppressWarnings("unchecked")
- public void addHandler(final Class<? extends T> type, final
HandlerConsumer<T> handler) {
+ public void addHandler(final Class<? extends T> type, final
HandlerConsumer<T> handler,
+ final double weight) {
+ if (weight <= 0) {
+ throw new IllegalArgumentException("Handler weight must be > 0,
got: " + weight);
+ }
handlerMap.put(type, handler);
+ weightedHandlerCount += weight;
final int newPartitionCount = config.getPartitions()
- .resolve(resolvedThreadCount, handlerMap.size());
+ .resolve(resolvedThreadCount, weightedHandlerCount);
final ArrayBlockingQueue<T>[] currentPartitions = this.partitions;
if (newPartitionCount > currentPartitions.length) {
final int oldCount = currentPartitions.length;
diff --git
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
index 8ec0a0c4cf..8f916c2c2b 100644
---
a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
+++
b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java
@@ -31,8 +31,8 @@ package org.apache.skywalking.oap.server.library.batchqueue;
* excess handlers share partitions at 1:2 ratio.</li>
* </ul>
*
- * <p>All policies are resolved via {@link #resolve(int, int)}. For
non-adaptive
- * policies the handlerCount parameter is ignored. At queue creation time, if
the
+ * <p>All policies are resolved via {@link #resolve(int, double)}. For
non-adaptive
+ * policies the weightedHandlerCount parameter is ignored. At queue creation
time, if the
* resolved partition count is less than the thread count, the thread count is
* reduced to match and a warning is logged.
*/
@@ -130,29 +130,38 @@ public class PartitionPolicy {
* <ul>
* <li>fixed: returns the pre-set count (both parameters ignored).</li>
* <li>threadMultiply: returns multiplier * resolvedThreadCount
(handlerCount ignored).</li>
- * <li>adaptive: when handlerCount is 0, returns resolvedThreadCount as
a sensible
- * initial count. Otherwise, threshold = threadCount * multiplier;
if handlerCount
- * <= threshold, returns handlerCount (1:1). If above, returns
- * threshold + (handlerCount - threshold) / 2.</li>
+ * <li>adaptive: when weightedHandlerCount is 0, returns
resolvedThreadCount as a sensible
+ * initial count. Otherwise, threshold = threadCount * multiplier;
if weightedHandlerCount
+ * <= threshold, returns weightedHandlerCount (1:1). If above,
returns
+ * threshold + (weightedHandlerCount - threshold) / 2.</li>
* </ul>
*
* @param resolvedThreadCount the resolved number of drain threads
- * @param handlerCount the current number of registered type handlers
+ * @param weightedHandlerCount the weighted sum of registered type
handlers. Each handler
+ * contributes its weight (default 1.0) to
this sum.
+ * High-weight handlers grow the partition
count faster,
+ * reducing the chance of hash collisions for
those types.
+ * Low-weight handlers grow the count slowly,
so they are
+ * more likely to share partitions with other
types via
+ * {@code typeHash()} routing. Note that
partition assignment
+ * is hash-based, not weight-based — there is
no guarantee
+ * that any type gets a dedicated partition.
* @return the resolved partition count, always >= 1
*/
- public int resolve(final int resolvedThreadCount, final int handlerCount) {
+ public int resolve(final int resolvedThreadCount, final double
weightedHandlerCount) {
if (fixedCount > 0) {
return fixedCount;
}
if (adaptive) {
- if (handlerCount == 0) {
+ final int effectiveCount = (int) Math.ceil(weightedHandlerCount);
+ if (effectiveCount == 0) {
return Math.max(1, resolvedThreadCount);
}
final int threshold = Math.max(1, multiplier *
resolvedThreadCount);
- if (handlerCount <= threshold) {
- return handlerCount;
+ if (effectiveCount <= threshold) {
+ return effectiveCount;
}
- return threshold + (handlerCount - threshold) / 2;
+ return threshold + (effectiveCount - threshold) / 2;
}
return Math.max(1, multiplier * resolvedThreadCount);
}
diff --git
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
index 9cdb5d883b..0dab65854b 100644
---
a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
+++
b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java
@@ -109,6 +109,29 @@ public class PartitionPolicyTest {
() -> PartitionPolicy.adaptive(0));
}
+ @Test
+ public void testAdaptiveWithWeightedHandlers() {
+ // Simulate 642 OAL (weight 1.0) + 1247 MAL (weight 0.05)
+ // Weighted count = 642 + 1247 * 0.05 = 642 + 62.35 = 704.35, ceil =
705
+ // 8 threads * 25 = 200 threshold, 705 > 200
+ // Result = 200 + (705 - 200) / 2 = 200 + 252 = 452
+ final double weightedCount = 642 * 1.0 + 1247 * 0.05;
+ assertEquals(452, PartitionPolicy.adaptive().resolve(8,
weightedCount));
+ }
+
+ @Test
+ public void testAdaptiveWithLowWeightOnly() {
+ // 100 MAL-only handlers at weight 0.05 = effective 5, ceil = 5
+ // 8 threads, threshold = 200, 5 < 200 -> 1:1 -> 5 partitions
+ assertEquals(5, PartitionPolicy.adaptive().resolve(8, 100 * 0.05));
+ }
+
+ @Test
+ public void testAdaptiveWithZeroWeightedCount() {
+ // weightedCount = 0.0 should return threadCount
+ assertEquals(8, PartitionPolicy.adaptive().resolve(8, 0.0));
+ }
+
@Test
public void testToString() {
assertEquals("fixed(4)", PartitionPolicy.fixed(4).toString());