This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 6f48ce8322e6e46df107564c7788e25bcd3abde0
Author: chuxiao <chux...@didichuxing.com>
AuthorDate: Wed Jul 22 23:53:35 2020 +0800

    KYLIN-4653 Make the capacity for the LinkedBlockingQueue of BlockingReservoir configurable
    
    (cherry picked from commit 8e44f573be5905fe85f10241f0f7a7844e48f5f0)
---
 .../kylin/metrics/lib/impl/BlockingReservoir.java  | 27 +++++++++++++++++++---
 server/src/main/resources/kylinMetrics.xml         |  7 +++++-
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
index 6158096..afa34a9 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.kylin.metrics.lib.ActiveReservoirListener;
 import org.apache.kylin.metrics.lib.Record;
 import org.slf4j.Logger;
@@ -58,11 +60,19 @@ public class BlockingReservoir extends AbstractActiveReservoir {
     }
 
     public BlockingReservoir(int minReportSize, int maxReportSize, int maxReportTime) {
+        this(minReportSize, maxReportSize, maxReportSize, MAX_QUEUE_SIZE);
+    }
+
+    public BlockingReservoir(int minReportSize, int maxReportSize, int maxReportTime, int maxQueueSize) {
+        Preconditions.checkArgument(minReportSize > 0, "minReportSize should be larger than 0");
+        Preconditions.checkArgument(maxReportSize >= minReportSize,
+                "maxReportSize should not be less than minBatchSize");
+        Preconditions.checkArgument(maxReportTime > 0, "maxReportTime should be larger than 0");
         this.minReportSize = minReportSize;
         this.maxReportSize = maxReportSize;
         this.maxReportTime = maxReportTime * 60 * 1000L;
 
-        this.recordsQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
+        this.recordsQueue = new LinkedBlockingQueue<>(maxQueueSize);
         this.listeners = Lists.newArrayList();
 
         this.records = Lists.newArrayListWithExpectedSize(this.maxReportSize);
@@ -95,9 +105,11 @@ public class BlockingReservoir extends AbstractActiveReservoir {
         if (ifAll) {
             records = Lists.newArrayList();
             recordsQueue.drainTo(records);
+            logger.info("Will report {} metrics records", records.size());
         } else {
             records.clear();
             recordsQueue.drainTo(records, maxReportSize);
+            logger.info("Will report {} metrics records, remaining {} records", records.size(), size());
         }
 
         boolean ifSucceed = true;
@@ -127,9 +139,18 @@ public class BlockingReservoir extends AbstractActiveReservoir {
         return true;
     }
 
-    @Override
-    public void start() {
+    @VisibleForTesting
+    void notifyUpdate() {
+        onRecordUpdate(false);
+    }
+
+    @VisibleForTesting
+    void setReady() {
         super.start();
+    }
+
+    public void start() {
+        setReady();
         scheduledReporter.start();
     }
 
diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml
index 85c879f..a9d907a 100644
--- a/server/src/main/resources/kylinMetrics.xml
+++ b/server/src/main/resources/kylinMetrics.xml
@@ -34,10 +34,15 @@
             <value>500</value>
         </constructor-arg>
 
-        <!-- minReportTime, min duration(in minute) between two report action-->
+        <!-- maxReportTime, max duration(in minute) between two report action-->
         <constructor-arg index="2">
             <value>10</value>
         </constructor-arg>
+
+        <!-- maxQueueSize, max queue size of LinkedBlockingQueue-->
+        <constructor-arg index="3">
+            <value>50000</value>
+        </constructor-arg>
     </bean>
 
     <bean id="hiveSink" class="org.apache.kylin.metrics.lib.impl.hive.HiveSink"/>

Reply via email to