This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch feature/batch-queue-weighted-partitions in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 3c3757ed970cd5caa13ed41824c9330a7094a561 Author: Wu Sheng <[email protected]> AuthorDate: Tue Apr 7 08:54:21 2026 +0800 Add weighted handler support to BatchQueue adaptive partitioning MAL metrics use weight 0.05 at L1 (vs 1.0 for OAL), reducing partition count and memory overhead when many MAL metric types are registered. --- docs/en/changes/changes.md | 1 + .../analysis/worker/MetricsAggregateWorker.java | 12 +++++- .../worker/MetricsPersistentMinWorker.java | 6 +++ .../server-library/library-batch-queue/CLAUDE.md | 18 +++++++++ .../oap/server/library/batchqueue/BatchQueue.java | 43 ++++++++++++++++++---- .../server/library/batchqueue/PartitionPolicy.java | 33 +++++++++++------ .../library/batchqueue/PartitionPolicyTest.java | 23 ++++++++++++ 7 files changed, 115 insertions(+), 21 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 08992c1600..edc2abaca9 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -12,6 +12,7 @@ * Fix missing `and` keyword in `JDBCEBPFProfilingTaskDAO.getTaskRecord()` SQL query, which caused a syntax error on every invocation. * Fix duplicate `TABLE_COLUMN` condition in `JDBCMetadataQueryDAO.findEndpoint()`, which was binding the same parameter twice due to a copy-paste error. * Support MCP (Model Context Protocol) observability for Envoy AI Gateway: MCP metrics (request CPM/latency, method breakdown, backend breakdown, initialization latency, capabilities), MCP access log sampling (errors only), `ai_route_type` searchable log tag, and MCP dashboard tabs. +* Add weighted handler support to `BatchQueue` adaptive partitioning. MAL metrics use weight 0.05 at L1 (vs 1.0 for OAL), reducing partition count and memory overhead when many MAL metric types are registered. 
#### UI diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java index 0361e0711f..4af51a9cd1 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData; +import org.apache.skywalking.oap.server.core.analysis.meter.Meter; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue; @@ -121,7 +122,16 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> { QUEUE_USAGE_GAUGE = gauge; } - l1Queue.addHandler(metricsClass, new L1Handler()); + // OAL metrics receive items on every incoming request (high, continuous throughput), + // so each type benefits from a dedicated partition — weight 1.0. + // MAL metrics receive items only once per scrape interval (typically 1 emit/min), + // producing at most ~500 items per type per burst. With a 20,000-slot buffer, + // ~40 MAL types can safely share one partition (20,000 / 500 = 40). We use + // weight 0.05 (≈ 1/20) to give 2x headroom over the theoretical sharing limit. + // This significantly reduces partition count and memory overhead when many MAL + // metric types are registered (e.g., from otel-rules). + final double weight = Meter.class.isAssignableFrom(metricsClass) ? 
0.05 : 1.0; + l1Queue.addHandler(metricsClass, new L1Handler(), weight); } @Override diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java index 197cdab9fa..1e0725adb7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java @@ -118,6 +118,12 @@ public class MetricsPersistentMinWorker extends MetricsPersistentWorker { QUEUE_USAGE_GAUGE = gauge; } + // No weight differentiation at L2. After L1 pre-aggregation, both OAL and MAL + // produce one item per (metric_type × entity) per minute — similar burst patterns + // and throughput. The OAL per-request amplification is absorbed by L1, so at L2 + // there is no meaningful throughput difference to justify partition weight tuning. + // L2 buffer is also much smaller (2,000 vs 20,000), so the memory overhead of + // extra partitions is modest (~16 MB total vs L1's ~167 MB). l2Queue.addHandler(metricsClass, new L2Handler()); } diff --git a/oap-server/server-library/library-batch-queue/CLAUDE.md b/oap-server/server-library/library-batch-queue/CLAUDE.md index 09e7278752..ad88ca6ba3 100644 --- a/oap-server/server-library/library-batch-queue/CLAUDE.md +++ b/oap-server/server-library/library-batch-queue/CLAUDE.md @@ -80,6 +80,24 @@ Adaptive growth (default multiplier 25, with 8 threads -> threshold 200): - 100 handlers -> 100 partitions (1:1) - 500 handlers -> 350 partitions (200 + 300/2) +### Weighted handlers + +`addHandler(type, handler, weight)` allows different handler types to contribute different +amounts to the partition count. The adaptive formula uses the weighted sum instead of raw +handler count. 
Partition assignment remains hash-based (`typeHash()`) — weight only affects +how many partitions exist, not which partition a type lands on. + +L1 uses weight 0.05 for MAL metrics (vs 1.0 for OAL). Rationale: MAL emits ~500 items/type +per scrape interval. With 20,000-slot buffers, ~40 MAL types can safely share one partition +(20,000 / 500 = 40). Weight 0.05 ≈ 1/20 gives 2x headroom. + +Example (8 threads, 642 OAL + 1,247 MAL): +- Without weight: 1,889 handlers -> 1,044 partitions (200 + (1,889 - 200) / 2, integer division; ~167 MB array overhead at L1) +- With weight: 642*1.0 + 1,247*0.05 = 705 effective -> 452 partitions (72 MB) + +L2 uses default weight 1.0 for all types because after L1 pre-aggregation both OAL and MAL +have similar per-minute burst patterns. + ## Drain Rebalancing Static round-robin partition assignment creates thread imbalance when metric types have varying diff --git a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java index 372d9fc2ed..597ca4d6ba 100644 --- a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java +++ b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java @@ -128,6 +128,13 @@ public class BatchQueue<T> { */ private final ConcurrentHashMap<Class<?>, HandlerConsumer<T>> handlerMap; + /** + * Running weighted sum of registered handlers, used by adaptive partition policy. + * Each handler contributes its weight (default 1.0) when registered via + * {@link #addHandler(Class, HandlerConsumer, double)}. + */ + private double weightedHandlerCount; + /** * Tracks unregistered types that have already been warned about, * to avoid flooding the log with repeated errors. 
@@ -382,23 +389,43 @@ public class BatchQueue<T> { } /** - * Register a type-based handler. Items whose {@code getClass()} matches the given - * type will be batched together and dispatched to this handler. + * Register a type-based handler with default weight 1.0. + * + * @param type the class of items to route to this handler + * @param handler the consumer that processes batches of the given type + * @see #addHandler(Class, HandlerConsumer, double) + */ + public void addHandler(final Class<? extends T> type, final HandlerConsumer<T> handler) { + addHandler(type, handler, 1.0); + } + + /** + * Register a type-based handler with an explicit weight for adaptive partition sizing. * - * <p>For adaptive partition policies, adding a handler recalculates the partition - * count and grows the partition array if needed. For non-adaptive policies the - * resolved count never changes, so this is a no-op beyond the registration. - * Drain loop threads pick up new partitions on their next cycle via volatile reads. + * <p>The weight controls how much this handler contributes to partition growth. + * A weight of 1.0 means one handler ≈ one partition (below the adaptive threshold). + * A lower weight (e.g., 0.05) means many handlers share a partition, suitable for + * low-throughput types. The weighted sum of all handlers replaces the raw handler + * count in the adaptive partition formula. + * + * <p>For non-adaptive partition policies the weight is ignored and this behaves + * the same as {@link #addHandler(Class, HandlerConsumer)}. * * @param type the class of items to route to this handler * @param handler the consumer that processes batches of the given type + * @param weight partition weight for this handler (default 1.0). Must be > 0. */ @SuppressWarnings("unchecked") - public void addHandler(final Class<? extends T> type, final HandlerConsumer<T> handler) { + public void addHandler(final Class<? 
extends T> type, final HandlerConsumer<T> handler, + final double weight) { + if (weight <= 0) { + throw new IllegalArgumentException("Handler weight must be > 0, got: " + weight); + } handlerMap.put(type, handler); + weightedHandlerCount += weight; final int newPartitionCount = config.getPartitions() - .resolve(resolvedThreadCount, handlerMap.size()); + .resolve(resolvedThreadCount, weightedHandlerCount); final ArrayBlockingQueue<T>[] currentPartitions = this.partitions; if (newPartitionCount > currentPartitions.length) { final int oldCount = currentPartitions.length; diff --git a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java index 8ec0a0c4cf..8f916c2c2b 100644 --- a/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java +++ b/oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java @@ -31,8 +31,8 @@ package org.apache.skywalking.oap.server.library.batchqueue; * excess handlers share partitions at 1:2 ratio.</li> * </ul> * - * <p>All policies are resolved via {@link #resolve(int, int)}. For non-adaptive - * policies the handlerCount parameter is ignored. At queue creation time, if the + * <p>All policies are resolved via {@link #resolve(int, double)}. For non-adaptive + * policies the weightedHandlerCount parameter is ignored. At queue creation time, if the * resolved partition count is less than the thread count, the thread count is * reduced to match and a warning is logged. 
*/ @@ -130,29 +130,38 @@ public class PartitionPolicy { * <ul> * <li>fixed: returns the pre-set count (both parameters ignored).</li> * <li>threadMultiply: returns multiplier * resolvedThreadCount (handlerCount ignored).</li> - * <li>adaptive: when handlerCount is 0, returns resolvedThreadCount as a sensible - * initial count. Otherwise, threshold = threadCount * multiplier; if handlerCount - * <= threshold, returns handlerCount (1:1). If above, returns - * threshold + (handlerCount - threshold) / 2.</li> + * <li>adaptive: when weightedHandlerCount is 0, returns resolvedThreadCount as a sensible + * initial count. Otherwise, threshold = threadCount * multiplier; if weightedHandlerCount + * <= threshold, returns weightedHandlerCount (1:1). If above, returns + * threshold + (weightedHandlerCount - threshold) / 2.</li> * </ul> * * @param resolvedThreadCount the resolved number of drain threads - * @param handlerCount the current number of registered type handlers + * @param weightedHandlerCount the weighted sum of registered type handlers. Each handler + * contributes its weight (default 1.0) to this sum. + * High-weight handlers grow the partition count faster, + * reducing the chance of hash collisions for those types. + * Low-weight handlers grow the count slowly, so they are + * more likely to share partitions with other types via + * {@code typeHash()} routing. Note that partition assignment + * is hash-based, not weight-based — there is no guarantee + * that any type gets a dedicated partition. 
* @return the resolved partition count, always >= 1 */ - public int resolve(final int resolvedThreadCount, final int handlerCount) { + public int resolve(final int resolvedThreadCount, final double weightedHandlerCount) { if (fixedCount > 0) { return fixedCount; } if (adaptive) { - if (handlerCount == 0) { + final int effectiveCount = (int) Math.ceil(weightedHandlerCount); + if (effectiveCount == 0) { return Math.max(1, resolvedThreadCount); } final int threshold = Math.max(1, multiplier * resolvedThreadCount); - if (handlerCount <= threshold) { - return handlerCount; + if (effectiveCount <= threshold) { + return effectiveCount; } - return threshold + (handlerCount - threshold) / 2; + return threshold + (effectiveCount - threshold) / 2; } return Math.max(1, multiplier * resolvedThreadCount); } diff --git a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java index 9cdb5d883b..0dab65854b 100644 --- a/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java +++ b/oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java @@ -109,6 +109,29 @@ public class PartitionPolicyTest { () -> PartitionPolicy.adaptive(0)); } + @Test + public void testAdaptiveWithWeightedHandlers() { + // Simulate 642 OAL (weight 1.0) + 1247 MAL (weight 0.05) + // Weighted count = 642 + 1247 * 0.05 = 642 + 62.35 = 704.35, ceil = 705 + // 8 threads * 25 = 200 threshold, 705 > 200 + // Result = 200 + (705 - 200) / 2 = 200 + 252 = 452 + final double weightedCount = 642 * 1.0 + 1247 * 0.05; + assertEquals(452, PartitionPolicy.adaptive().resolve(8, weightedCount)); + } + + @Test + public void testAdaptiveWithLowWeightOnly() { + 
// 100 MAL-only handlers at weight 0.05 = effective 5, ceil = 5 + // 8 threads, threshold = 200, 5 < 200 -> 1:1 -> 5 partitions + assertEquals(5, PartitionPolicy.adaptive().resolve(8, 100 * 0.05)); + } + + @Test + public void testAdaptiveWithZeroWeightedCount() { + // weightedCount = 0.0 should return threadCount + assertEquals(8, PartitionPolicy.adaptive().resolve(8, 0.0)); + } + @Test public void testToString() { assertEquals("fixed(4)", PartitionPolicy.fixed(4).toString());
