This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 6f48ce8322e6e46df107564c7788e25bcd3abde0 Author: chuxiao <chux...@didichuxing.com> AuthorDate: Wed Jul 22 23:53:35 2020 +0800 KYLIN-4653 Make the capacity for the LinkedBlockingQueue of BlockingReservoir configurable (cherry picked from commit 8e44f573be5905fe85f10241f0f7a7844e48f5f0) --- .../kylin/metrics/lib/impl/BlockingReservoir.java | 27 +++++++++++++++++++--- server/src/main/resources/kylinMetrics.xml | 7 +++++- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java index 6158096..afa34a9 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.kylin.metrics.lib.ActiveReservoirListener; import org.apache.kylin.metrics.lib.Record; import org.slf4j.Logger; @@ -58,11 +60,19 @@ public class BlockingReservoir extends AbstractActiveReservoir { } public BlockingReservoir(int minReportSize, int maxReportSize, int maxReportTime) { + this(minReportSize, maxReportSize, maxReportSize, MAX_QUEUE_SIZE); + } + + public BlockingReservoir(int minReportSize, int maxReportSize, int maxReportTime, int maxQueueSize) { + Preconditions.checkArgument(minReportSize > 0, "minReportSize should be larger than 0"); + Preconditions.checkArgument(maxReportSize >= minReportSize, + "maxReportSize should not be less than minBatchSize"); + Preconditions.checkArgument(maxReportTime > 0, "maxReportTime should be larger than 0"); this.minReportSize = minReportSize; this.maxReportSize = maxReportSize; this.maxReportTime = maxReportTime * 60 * 1000L; - this.recordsQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE); + this.recordsQueue = new LinkedBlockingQueue<>(maxQueueSize); this.listeners = Lists.newArrayList(); this.records = Lists.newArrayListWithExpectedSize(this.maxReportSize); @@ -95,9 +105,11 @@ public class BlockingReservoir extends AbstractActiveReservoir { if (ifAll) { records = Lists.newArrayList(); recordsQueue.drainTo(records); + logger.info("Will report {} metrics records", records.size()); } else { records.clear(); recordsQueue.drainTo(records, maxReportSize); + logger.info("Will report {} metrics records, remaining {} records", records.size(), size()); } boolean ifSucceed = true; @@ -127,9 +139,18 @@ public class BlockingReservoir extends AbstractActiveReservoir { return true; } - @Override - public void start() { + @VisibleForTesting + void notifyUpdate() { + onRecordUpdate(false); + } + + @VisibleForTesting + void setReady() { super.start(); + } + + public void start() { + setReady(); scheduledReporter.start(); } diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml index 85c879f..a9d907a 100644 --- a/server/src/main/resources/kylinMetrics.xml +++ b/server/src/main/resources/kylinMetrics.xml @@ -34,10 +34,15 @@ <value>500</value> </constructor-arg> - <!-- minReportTime, min duration(in minute) between two report action--> + <!-- maxReportTime, max duration(in minute) between two report action--> <constructor-arg index="2"> <value>10</value> </constructor-arg> + + <!-- maxQueueSize, max queue size of LinkedBlockingQueue--> + <constructor-arg index="3"> + <value>50000</value> + </constructor-arg> </bean> <bean id="hiveSink" class="org.apache.kylin.metrics.lib.impl.hive.HiveSink"/>