This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 540853d  Improvements to RealtimeProvisioningHelper command (#5737)
540853d is described below

commit 540853d786c9e4b4bb27bf4bbc6fa73837ff1bc3
Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
AuthorDate: Fri Jul 24 20:18:37 2020 -0700

    Improvements to RealtimeProvisioningHelper command (#5737)
    
    * Improvements to RealtimeProvisioningHelper command
    
    - Fixed a bug where we were looking for the push frequency setting in the table config.
      Push frequency is not usually set in a realtime table, so changed it to be an
      optional argument instead.
    - Instead of push frequency, the user may specify retentionHours, meaning the number
      of hours of most recent data that will be queried most often (see the sketch
      after this list).
    - Added the amount of total mapped memory to the output
    - Added the number of segments queried per host to the output.
    - Added a link to the documentation to the output.
    - Instead of accepting the time taken to consume the segment, introduced an argument
      to accept the ingestion rate per partition. This is easier to determine by looking
      at metrics for the topic we need to ingest, and is more convenient when
      an offline segment is being provided as a sample.
    - Added the command-line arguments to the printed output so that it is easy for us
      to debug when we get questions from the community.
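    
    To make the retention handling concrete, below is a condensed sketch of how the
    command now resolves retentionHours (an approximation of the logic in
    RealtimeProvisioningHelperCommand.execute() further down, with the default
    constants inlined; not the exact patch code):
    
        // -retentionHours wins if specified; otherwise -pushFrequency defaults
        // (hybrid tables); otherwise the table's own retention setting.
        static int resolveRetentionHours(int retentionHoursArg, String pushFrequency, int tableRetentionHours) {
          if (retentionHoursArg > 0) {
            return retentionHoursArg;
          }
          if (pushFrequency == null) {
            return tableRetentionHours;           // realtime-only table
          }
          switch (pushFrequency.toLowerCase()) {
            case "hourly":  return 24;            // DEFAULT_RETENTION_FOR_HOURLY_PUSH
            case "daily":   return 72;            // DEFAULT_RETENTION_FOR_DAILY_PUSH
            case "weekly":  return 24 * 7 + 72;   // DEFAULT_RETENTION_FOR_WEEKLY_PUSH
            case "monthly": return 24 * 31 + 72;  // DEFAULT_RETENTION_FOR_MONTHLY_PUSH
            default:
              throw new IllegalArgumentException("Illegal value for pushFrequency: '" + pushFrequency + "'");
          }
        }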
    
    TODO Next:
    
    Can we compute a score and just recommend a configuration? We know we want to minimize
    num segments scanned per host, num hosts, segment size (to fit memory).
    
    Issue #5588
    
    Here is a sample output:
    ============================================================
    RealtimeProvisioningHelperCommand -tableConfigFile /Users/ssubrama/tmp/samza/realtimeTableConfig.json -numPartitions 16 -pushFrequency null -numHosts 8,6,10 -numHours 6,12,18,24 -sampleCompletedSegmentDir /Users/ssubrama/tmp/samza/TestSamzaAnalyticsFeatures_1593411480000_1593500340000_0/ -ingestionRate 100 -maxUsableHostMemory 48G -retentionHours 72
    
    Note:
    
    * Table retention and push frequency ignored for determining retentionHours
    * See https://docs.pinot.apache.org/operators/operating-pinot/tuning/realtime
    
    Memory used per host (Active/Mapped)
    
    numHosts --> 6               |8               |10              |
    numHours
     6 --------> 5.05G/19.49G    |3.37G/12.99G    |3.37G/12.99G    |
    12 --------> 5.89G/20.33G    |3.93G/13.55G    |3.93G/13.55G    |
    18 --------> 6.73G/21.49G    |4.48G/14.33G    |4.48G/14.33G    |
    24 --------> 7.56G/22G       |5.04G/14.66G    |5.04G/14.66G    |
    
    Optimal segment size
    
    numHosts --> 6               |8               |10              |
    numHours
     6 --------> 111.98M         |111.98M         |111.98M         |
    12 --------> 223.96M         |223.96M         |223.96M         |
    18 --------> 335.94M         |335.94M         |335.94M         |
    24 --------> 447.92M         |447.92M         |447.92M         |
    
    Consuming memory
    
    numHosts --> 6               |8               |10              |
    numHours
     6 --------> 1.45G           |987.17M         |987.17M         |
    12 --------> 2.61G           |1.74G           |1.74G           |
    18 --------> 3.77G           |2.52G           |2.52G           |
    24 --------> 4.94G           |3.29G           |3.29G           |
    
    Number of segments queried per host
    
    numHosts --> 6               |8               |10              |
    numHours
     6 --------> 12              |12              |12              |
    12 --------> 6               |6               |6               |
    18 --------> 4               |4               |4               |
    24 --------> 3               |3               |3               |
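    
    The last grid is just the integer ceiling ceil(retentionHours / numHoursToConsume)
    as computed in MemoryEstimator below; with -retentionHours 72 as in this run:
    
        ceil(72/6)  = 12
        ceil(72/12) =  6
        ceil(72/18) =  4
        ceil(72/24) =  3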
    
    * Addressed review comments
---
 .../routing/timeboundary/TimeBoundaryManager.java  |   2 +-
 .../apache/pinot/common/utils/CommonConstants.java |   7 +
 .../controller/util/SegmentIntervalUtils.java      |   9 +-
 .../name/NormalizedDateSegmentNameGenerator.java   |   3 +-
 .../command/RealtimeProvisioningHelperCommand.java | 100 ++++++++++----
 .../realtime/provisioning/MemoryEstimator.java     | 144 ++++++++++++---------
 6 files changed, 168 insertions(+), 97 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index 5e4b019..7462913 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -82,7 +82,7 @@ public class TimeBoundaryManager {
 
     // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use
     // (maxEndTime - 1 DAY)
-    _isHourlyTable = "HOURLY".equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency())
+    _isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency())
         && _timeUnit != TimeUnit.DAYS;
 
     LOGGER.info("Constructed TimeBoundaryManager with timeColumn: {}, timeUnit: {}, isHourlyTable: {} for table: {}",
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 3e25512..85113e7 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -27,6 +27,13 @@ public class CommonConstants {
   public static final String HTTP_PROTOCOL = "http";
   public static final String HTTPS_PROTOCOL = "https";
 
+  public static class Table {
+    public static final String PUSH_FREQUENCY_HOURLY = "hourly";
+    public static final String PUSH_FREQUENCY_DAILY = "daily";
+    public static final String PUSH_FREQUENCY_WEEKLY = "weekly";
+    public static final String PUSH_FREQUENCY_MONTHLY = "monthly";
+  }
+
   public static class Helix {
     public static final String IS_SHUTDOWN_IN_PROGRESS = "shutdownInProgress";
     public static final String QUERIES_DISABLED = "queriesDisabled";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentIntervalUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentIntervalUtils.java
index e08246e..cdd3291 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentIntervalUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentIntervalUtils.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.util;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.joda.time.Duration;
 
@@ -49,16 +50,16 @@ public class SegmentIntervalUtils {
   * Converts push frequency into duration. For invalid or less than 'hourly' push frequency, treats it as 'daily'.
    */
   public static Duration convertToDuration(String pushFrequency) {
-    if ("hourly".equalsIgnoreCase(pushFrequency)) {
+    if 
(CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(pushFrequency)) {
       return Duration.standardHours(1L);
     }
-    if ("daily".equalsIgnoreCase(pushFrequency)) {
+    if 
(CommonConstants.Table.PUSH_FREQUENCY_DAILY.equalsIgnoreCase(pushFrequency)) {
       return Duration.standardDays(1L);
     }
-    if ("weekly".equalsIgnoreCase(pushFrequency)) {
+    if 
(CommonConstants.Table.PUSH_FREQUENCY_WEEKLY.equalsIgnoreCase(pushFrequency)) {
       return Duration.standardDays(7L);
     }
-    if ("monthly".equalsIgnoreCase(pushFrequency)) {
+    if 
(CommonConstants.Table.PUSH_FREQUENCY_MONTHLY.equalsIgnoreCase(pushFrequency)) {
       return Duration.standardDays(30L);
     }
     return Duration.standardDays(1L);
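
For reference, the fallback contract of convertToDuration (invalid or finer-than-hourly
frequencies are treated as daily) can be exercised with a self-contained snippet. This is
a hypothetical demo class, not part of the patch; it uses the same Joda-Time factory
methods as the code above:

    import org.joda.time.Duration;

    public class PushFrequencyDemo {
      // Mirrors convertToDuration's contract: unknown or finer-than-hourly
      // push frequencies fall back to daily.
      static Duration convert(String pushFrequency) {
        if ("hourly".equalsIgnoreCase(pushFrequency)) return Duration.standardHours(1L);
        if ("daily".equalsIgnoreCase(pushFrequency)) return Duration.standardDays(1L);
        if ("weekly".equalsIgnoreCase(pushFrequency)) return Duration.standardDays(7L);
        if ("monthly".equalsIgnoreCase(pushFrequency)) return Duration.standardDays(30L);
        return Duration.standardDays(1L);
      }

      public static void main(String[] args) {
        System.out.println(convert("weekly").getStandardDays()); // 7
        System.out.println(convert("bogus").getStandardDays());  // 1 (daily fallback)
      }
    }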
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
index 75b619d..033e5c5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
@@ -25,6 +25,7 @@ import java.util.Date;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.spi.data.DateTimeFieldSpec.TimeFormat;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 
@@ -54,7 +55,7 @@ public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator
     // Include time info for APPEND push type
     if (_appendPushType) {
       // For HOURLY push frequency, include hours into output format
-      if ("HOURLY".equalsIgnoreCase(pushFrequency)) {
+      if (CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(pushFrequency)) {
         _outputSDF = new SimpleDateFormat("yyyy-MM-dd-HH");
       } else {
         _outputSDF = new SimpleDateFormat("yyyy-MM-dd");
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
index 1976443..7c145b1 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java
@@ -28,7 +28,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.tools.Command;
 import org.apache.pinot.tools.realtime.provisioning.MemoryEstimator;
 import org.kohsuke.args4j.Option;
@@ -44,10 +43,12 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
 
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeProvisioningHelperCommand.class);
 
-  private static final int MEMORY_STR_LEN = 9;
+  private static final int MEMORY_STR_LEN = 16;
   private static final String COMMA_SEPARATOR = ",";
   private static final int DEFAULT_RETENTION_FOR_HOURLY_PUSH = 24;
   private static final int DEFAULT_RETENTION_FOR_DAILY_PUSH = 72;
+  private static final int DEFAULT_RETENTION_FOR_WEEKLY_PUSH = 24*7 + 72;
+  private static final int DEFAULT_RETENTION_FOR_MONTHLY_PUSH = 24*31 + 72;
 
   @Option(name = "-tableConfigFile", required = true, metaVar = "<String>")
   private String _tableConfigFile;
@@ -55,13 +56,15 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
   @Option(name = "-numPartitions", required = true, metaVar = "<int>", usage = "number of stream partitions for the table")
   private int _numPartitions;
 
-  @Option(name = "-retentionHours", metaVar = "<int>", usage =
-      "Number of hours the segments will need to be retained in memory. "
-          + "\nThe realtime segments will need to be in memory only until the 
offline segments are available and used for queries"
-          + "\nThis will be picked from the table config  by looking at the 
segmentPushFrequency (72h if daily, 24h if hourly, buffer added as 
TimeBoundaryService doesn't query the last offline timestamp), "
-          + "\nIt can be overridden using this option")
+  @Option(name = "-retentionHours", metaVar = "<int>", usage = "Number of 
recent hours queried most often"
+      + "\n\t(-pushFrequency is ignored)")
   private int _retentionHours;
 
+  @Option(name = "-pushFrequency", metaVar = "<String>", usage =
+      "Frequency with which offline table pushes happen, if this is a hybrid 
table"
+          + "\n\t(hourly,daily,weekly,monthly). Do not specify if 
realtime-only table")
+  private String _pushFrequency;
+
   @Option(name = "-numHosts", metaVar = "<String>", usage = "number of hosts 
as comma separated values (default 2,4,6,8,10,12,14,16)")
   private String _numHosts = "2,4,6,8,10,12,14,16";
 
@@ -71,8 +74,8 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
   @Option(name = "-sampleCompletedSegmentDir", required = true, metaVar = "<String>", usage = "Consume from the topic for n hours and provide the path of the segment dir after it completes")
   private String _sampleCompletedSegmentDir;
 
-  @Option(name = "-periodSampleSegmentConsumed", required = true, metaVar = 
"<String>", usage = "Period for which the sample segment was consuming in 
format 4h, 5h30m, 40m etc")
-  private String _periodSampleSegmentConsumed;
+  @Option(name = "-ingestionRate", required = true, metaVar = "<String>", 
usage = "Avg number of messages per second ingested on any one stream partition 
(assumed all partitions are uniform)")
+  private int _ingestionRate;
 
   @Option(name = "-maxUsableHostMemory", required = false, metaVar = 
"<String>", usage = "Maximum memory per host that can be used for pinot data 
(e.g. 250G, 100M). Default 48g")
   private String _maxUsableHostMemory = "48G";
@@ -90,6 +93,11 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
     return this;
   }
 
+  public RealtimeProvisioningHelperCommand setPushFrequency(String pushFrequency) {
+    _pushFrequency = pushFrequency;
+    return this;
+  }
+
   public RealtimeProvisioningHelperCommand setRetentionHours(int retentionHours) {
     _retentionHours = retentionHours;
     return this;
@@ -115,17 +123,17 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
     return this;
   }
 
-  public RealtimeProvisioningHelperCommand setPeriodSampleSegmentConsumed(String periodSampleSegmentConsumed) {
-    _periodSampleSegmentConsumed = periodSampleSegmentConsumed;
+  public RealtimeProvisioningHelperCommand setIngestionRate(int ingestionRate) {
+    _ingestionRate = ingestionRate;
     return this;
   }
 
   @Override
   public String toString() {
     return ("RealtimeProvisioningHelperCommand -tableConfigFile " + 
_tableConfigFile + " -numPartitions "
-        + _numPartitions + " -retentionHours " + _retentionHours + " -numHosts 
" + _numHosts + " -numHours " + _numHours
-        + " -sampleCompletedSegmentDir " + _sampleCompletedSegmentDir + " 
-periodSampleSegmentConsumed "
-        + _periodSampleSegmentConsumed + "-maxUsableMemory " + 
_maxUsableHostMemory);
+        + _numPartitions + " -pushFrequency " + _pushFrequency + " -numHosts " 
+ _numHosts + " -numHours " + _numHours
+        + " -sampleCompletedSegmentDir " + _sampleCompletedSegmentDir + " 
-ingestionRate "
+        + _ingestionRate + " -maxUsableHostMemory " + _maxUsableHostMemory + " 
-retentionHours " + _retentionHours);
   }
 
   @Override
@@ -146,6 +154,20 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
   }
 
   @Override
+  public void printExamples() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("\n\nThis command allows you to estimate the capacity 
needed for provisioning realtime hosts")
+        .append("It assumes that there is no upper limit to the amount of 
memory you can mmap")
+        .append("\nIf you have a hybrid table, then consult the push frequency 
setting in your offline table specify it in the -pushFrequency argument")
+        .append("\nIf you have a realtime-only table, then the default 
behavior is to assume that your queries need all data in memory all the time")
+        .append("\nHowever, if most of your queries are going to be for (say) 
the last 96 hours, then you can specify that in -retentionHours")
+        .append("\nDoing so will let this program assume that you are willing 
to take a page hit when querying older data")
+        .append("\nand optimize memory and number of hosts accordingly.")
+        ;
+    System.out.println(builder.toString());
+  }
+
+  @Override
   public boolean execute()
       throws IOException {
     LOGGER.info("Executing command: {}", toString());
@@ -158,17 +180,36 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
       throw new RuntimeException("Exception in reading table config from file " + _tableConfigFile, e);
     }
 
+    StringBuilder note = new StringBuilder();
+    note.append("\nNote:\n");
     int numReplicas = tableConfig.getValidationConfig().getReplicasPerPartitionNumber();
-    if (_retentionHours == 0) {
-      if (tableConfig.getValidationConfig().getSegmentPushFrequency().equalsIgnoreCase("hourly")) {
-        _retentionHours = DEFAULT_RETENTION_FOR_HOURLY_PUSH;
+    int tableRetentionHours = (int) TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit())
+            .toHours(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+    if (_retentionHours > 0) {
+      note.append("\n* Table retention and push frequency ignored for 
determining retentionHours since it is specified in command");
+    } else {
+      if (_pushFrequency == null) {
+        // This is a realtime-only table. Pick up the retention time
+        _retentionHours = tableRetentionHours;
+        note.append("\n* Retention hours taken from tableConfig");
       } else {
-        _retentionHours = DEFAULT_RETENTION_FOR_DAILY_PUSH;
+        if ("hourly".equalsIgnoreCase(_pushFrequency)) {
+          _retentionHours = DEFAULT_RETENTION_FOR_HOURLY_PUSH;
+        } else if ("daily".equalsIgnoreCase(_pushFrequency)) {
+          _retentionHours = DEFAULT_RETENTION_FOR_DAILY_PUSH;
+        } else if ("weekly".equalsIgnoreCase(_pushFrequency)) {
+          _retentionHours = DEFAULT_RETENTION_FOR_WEEKLY_PUSH;
+        } else if ("monthly".equalsIgnoreCase(_pushFrequency)) {
+          _retentionHours = DEFAULT_RETENTION_FOR_MONTHLY_PUSH;
+        } else {
+          throw new IllegalArgumentException("Illegal value for pushFrequency: 
'" + _pushFrequency + "'");
+        }
+        note.append("\n* Retention hours taken from pushFrequency");
       }
     }
 
-    int[] numHosts = Arrays.stream(_numHosts.split(COMMA_SEPARATOR)).mapToInt(Integer::parseInt).toArray();
-    int[] numHours = Arrays.stream(_numHours.split(COMMA_SEPARATOR)).mapToInt(Integer::parseInt).toArray();
+    int[] numHosts = Arrays.stream(_numHosts.split(COMMA_SEPARATOR)).mapToInt(Integer::parseInt).sorted().toArray();
+    int[] numHours = Arrays.stream(_numHours.split(COMMA_SEPARATOR)).mapToInt(Integer::parseInt).sorted().toArray();
 
     int totalConsumingPartitions = _numPartitions * numReplicas;
 
@@ -177,27 +218,32 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand
     // Completed: Use multiple (completedSize,numHours) data points to calculate completed size for our numHours
     File sampleCompletedSegmentFile = new File(_sampleCompletedSegmentDir);
 
-    long sampleSegmentConsumedSeconds =
-        TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(_periodSampleSegmentConsumed), TimeUnit.MILLISECONDS);
-
     long maxUsableHostMemBytes = DataSizeUtils.toBytes(_maxUsableHostMemory);
 
     MemoryEstimator memoryEstimator =
-        new MemoryEstimator(tableConfig, sampleCompletedSegmentFile, sampleSegmentConsumedSeconds,
-            maxUsableHostMemBytes);
+        new MemoryEstimator(tableConfig, sampleCompletedSegmentFile, _ingestionRate, maxUsableHostMemBytes, tableRetentionHours);
     File sampleStatsHistory = memoryEstimator.initializeStatsHistory();
     memoryEstimator
         .estimateMemoryUsed(sampleStatsHistory, numHosts, numHours, totalConsumingPartitions, _retentionHours);
 
+    note.append("\n* See 
https://docs.pinot.apache.org/operators/operating-pinot/tuning/realtime";);
     // TODO: Make a recommendation of what config to choose by considering more inputs such as qps
-    LOGGER.info("\nMemory used per host");
-    displayResults(memoryEstimator.getTotalMemoryPerHost(), numHosts, 
numHours);
+    displayOutputHeader(note);
+    LOGGER.info("\nMemory used per host (Active/Mapped)");
+    displayResults(memoryEstimator.getActiveMemoryPerHost(), numHosts, 
numHours);
     LOGGER.info("\nOptimal segment size");
     displayResults(memoryEstimator.getOptimalSegmentSize(), numHosts, 
numHours);
     LOGGER.info("\nConsuming memory");
     displayResults(memoryEstimator.getConsumingMemoryPerHost(), numHosts, 
numHours);
+    LOGGER.info("\nNumber of segments queried per host");
+    displayResults(memoryEstimator.getNumSegmentsQueriedPerHost(), numHosts, 
numHours);
     return true;
   }
+  
+  private void displayOutputHeader(StringBuilder note) {
+    System.out.println("\n============================================================\n" + toString());
+    System.out.println(note.toString());
+  }
 
   /**
   * Displays the output values as a grid of numHoursToConsume vs numHostsToProvision
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index cbf6b61..1117b36 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -53,7 +53,9 @@ public class MemoryEstimator {
   private final String _tableNameWithType;
   private final File _sampleCompletedSegment;
   private final long _sampleSegmentConsumedSeconds;
+  private final int _totalDocsInSampleSegment;
   private final long _maxUsableHostMemory;
+  private final int _tableRetentionHours;
 
   private SegmentMetadataImpl _segmentMetadata;
   private long _sampleCompletedSegmentSizeBytes;
@@ -63,17 +65,18 @@ public class MemoryEstimator {
   int _avgMultiValues;
   private File _tableDataDir;
 
-  private String[][] _totalMemoryPerHost;
+  private String[][] _activeMemoryPerHost;
   private String[][] _optimalSegmentSize;
   private String[][] _consumingMemoryPerHost;
+  private String[][] _numSegmentsQueriedPerHost;
 
-  public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, long sampleSegmentConsumedSeconds,
-      long maxUsableHostMemory) {
+  public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, int ingestionRate,
+      long maxUsableHostMemory, int tableRetentionHours) {
     _maxUsableHostMemory = maxUsableHostMemory;
     _tableConfig = tableConfig;
     _tableNameWithType = tableConfig.getTableName();
     _sampleCompletedSegment = sampleCompletedSegment;
-    _sampleSegmentConsumedSeconds = sampleSegmentConsumedSeconds;
+    _tableRetentionHours = tableRetentionHours;
 
     _sampleCompletedSegmentSizeBytes = FileUtils.sizeOfDirectory(_sampleCompletedSegment);
     try {
@@ -81,6 +84,8 @@ public class MemoryEstimator {
     } catch (Exception e) {
       throw new RuntimeException("Caught exception when reading segment index 
dir", e);
     }
+    _totalDocsInSampleSegment = _segmentMetadata.getTotalDocs();
+    _sampleSegmentConsumedSeconds = 
(int)(_totalDocsInSampleSegment/ingestionRate);
 
     if 
(CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getNoDictionaryColumns()))
 {
       
_noDictionaryColumns.addAll(_tableConfig.getIndexingConfig().getNoDictionaryColumns());
@@ -182,88 +187,103 @@ public class MemoryEstimator {
    * @param numHosts list of number of hosts that are to be provisioned
    * @param numHours list of number of hours to be consumed
   * @param totalConsumingPartitions total consuming partitions we are provisioning for
-   * @param retentionHours what is the amount of retention in memory expected for completed segments
+   * @param retentionHours number of most recent hours to be retained in memory for queries.
    * @throws IOException
    */
-  public void estimateMemoryUsed(File statsFile, int[] numHosts, int[] numHours, int totalConsumingPartitions,
-      int retentionHours)
+  public void estimateMemoryUsed(File statsFile, int[] numHosts, int[] numHours, final int totalConsumingPartitions,
+      final int retentionHours)
       throws IOException {
-    _totalMemoryPerHost = new String[numHours.length][numHosts.length];
+    _activeMemoryPerHost = new String[numHours.length][numHosts.length];
     _optimalSegmentSize = new String[numHours.length][numHosts.length];
     _consumingMemoryPerHost = new String[numHours.length][numHosts.length];
+    _numSegmentsQueriedPerHost = new String[numHours.length][numHosts.length];
+    for (int i = 0; i < numHours.length; i++) {
+      for (int j = 0; j < numHosts.length; j++) {
+        _activeMemoryPerHost[i][j] = NOT_APPLICABLE;
+        _consumingMemoryPerHost[i][j] = NOT_APPLICABLE;
+        _optimalSegmentSize[i][j] = NOT_APPLICABLE;
+        _numSegmentsQueriedPerHost[i][j] = NOT_APPLICABLE;
+      }
+    }
 
     for (int i = 0; i < numHours.length; i++) {
       int numHoursToConsume = numHours[i];
+      if (numHoursToConsume > retentionHours) {
+        continue;
+      }
       long secondsToConsume = numHoursToConsume * 3600;
       // consuming for _numHoursSampleSegmentConsumed, gives size sampleCompletedSegmentSizeBytes
       // hence, consuming for numHoursToConsume would give:
       long completedSegmentSizeBytes =
           (long) (((double) secondsToConsume / _sampleSegmentConsumedSeconds) * _sampleCompletedSegmentSizeBytes);
 
-      long totalMemoryForCompletedSegmentsPerPartition =
-          calculateMemoryForCompletedSegmentsPerPartition(completedSegmentSizeBytes, numHoursToConsume, retentionHours);
-
-      int totalDocsInSampleSegment = _segmentMetadata.getTotalDocs();
       // numHoursSampleSegmentConsumed created totalDocsInSampleSegment num rows
       // numHoursToConsume will create ? rows
-      int totalDocs = (int) (((double) secondsToConsume / _sampleSegmentConsumedSeconds) * totalDocsInSampleSegment);
-
-      // We don't want the stats history to get updated from all our dummy runs
-      // So we copy over the original stats history every time we start
-      File statsFileCopy = new File(_tableDataDir, STATS_FILE_COPY_NAME);
-      FileUtils.copyFile(statsFile, statsFileCopy);
-      RealtimeSegmentStatsHistory statsHistory;
-      try {
-        statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFileCopy);
-      } catch (IOException | ClassNotFoundException e) {
-        throw new RuntimeException(
-            "Exception when deserializing stats history from stats file " + statsFileCopy.getAbsolutePath(), e);
-      }
-      RealtimeIndexOffHeapMemoryManager memoryManager = new DirectMemoryManager(_segmentMetadata.getName());
-      RealtimeSegmentZKMetadata segmentZKMetadata = getRealtimeSegmentZKMetadata(_segmentMetadata, totalDocs);
-
-      RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
-          new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName())
-              .setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema()).setCapacity(totalDocs)
-              .setAvgNumMultiValues(_avgMultiValues).setNoDictionaryColumns(_noDictionaryColumns)
-              .setVarLengthDictionaryColumns(_varLengthDictionaryColumns).setInvertedIndexColumns(_invertedIndexColumns)
-              .setRealtimeSegmentZKMetadata(segmentZKMetadata).setOffHeap(true).setMemoryManager(memoryManager)
-              .setStatsHistory(statsHistory);
-
-      // create mutable segment impl
-      MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
-      long memoryForConsumingSegmentPerPartition = memoryManager.getTotalAllocatedBytes();
-      mutableSegmentImpl.destroy();
-      FileUtils.deleteQuietly(statsFileCopy);
+      int totalDocs = (int) (((double) secondsToConsume / _sampleSegmentConsumedSeconds) * _totalDocsInSampleSegment);
+      long memoryForConsumingSegmentPerPartition = getMemoryForConsumingSegmentPerPartition(statsFile, totalDocs);
 
       memoryForConsumingSegmentPerPartition += getMemoryForInvertedIndex(memoryForConsumingSegmentPerPartition);
 
-      for (int j = 0; j < numHosts.length; j++) {
+      int numActiveSegmentsPerPartition = (retentionHours  + numHoursToConsume - 1)/numHoursToConsume;
+      long activeMemoryForCompletedSegmentsPerPartition = completedSegmentSizeBytes * (numActiveSegmentsPerPartition - 1);
+      int numCompletedSegmentsPerPartition = (_tableRetentionHours + numHoursToConsume - 1)/numHoursToConsume - 1;
 
+      for (int j = 0; j < numHosts.length; j++) {
         int numHostsToProvision = numHosts[j];
         // adjustment because we want ceiling of division and not floor, as some hosts will have an extra partition due to the remainder of the division
         int totalConsumingPartitionsPerHost =
             (totalConsumingPartitions + numHostsToProvision - 1) / numHostsToProvision;
 
-        long totalMemoryForCompletedSegmentsPerHost =
-            totalMemoryForCompletedSegmentsPerPartition * totalConsumingPartitionsPerHost;
+        long activeMemoryForCompletedSegmentsPerHost =
+            activeMemoryForCompletedSegmentsPerPartition * totalConsumingPartitionsPerHost;
         long totalMemoryForConsumingSegmentsPerHost =
             memoryForConsumingSegmentPerPartition * totalConsumingPartitionsPerHost;
-        long totalMemoryPerHostBytes = totalMemoryForCompletedSegmentsPerHost + totalMemoryForConsumingSegmentsPerHost;
-
-        if (totalMemoryPerHostBytes > _maxUsableHostMemory) {
-          _totalMemoryPerHost[i][j] = NOT_APPLICABLE;
-          _consumingMemoryPerHost[i][j] = NOT_APPLICABLE;
-          _optimalSegmentSize[i][j] = NOT_APPLICABLE;
-        } else {
-          _totalMemoryPerHost[i][j] = DataSizeUtils.fromBytes(totalMemoryPerHostBytes);
+        long activeMemoryPerHostBytes = activeMemoryForCompletedSegmentsPerHost + totalMemoryForConsumingSegmentsPerHost;
+        long mappedMemoryPerHost = totalMemoryForConsumingSegmentsPerHost +
+            (numCompletedSegmentsPerPartition * totalConsumingPartitionsPerHost * completedSegmentSizeBytes);
+
+        if (activeMemoryPerHostBytes <= _maxUsableHostMemory) {
+          _activeMemoryPerHost[i][j] = DataSizeUtils.fromBytes(activeMemoryPerHostBytes)
+              + "/" + DataSizeUtils.fromBytes(mappedMemoryPerHost);
           _consumingMemoryPerHost[i][j] = DataSizeUtils.fromBytes(totalMemoryForConsumingSegmentsPerHost);
           _optimalSegmentSize[i][j] = DataSizeUtils.fromBytes(completedSegmentSizeBytes);
+          _numSegmentsQueriedPerHost[i][j] = String.valueOf(numActiveSegmentsPerPartition);
         }
       }
     }
   }
 
+  private long getMemoryForConsumingSegmentPerPartition(File statsFile, int totalDocs) throws IOException {
+    // We don't want the stats history to get updated from all our dummy runs
+    // So we copy over the original stats history every time we start
+    File statsFileCopy = new File(_tableDataDir, STATS_FILE_COPY_NAME);
+    FileUtils.copyFile(statsFile, statsFileCopy);
+    RealtimeSegmentStatsHistory statsHistory;
+    try {
+      statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFileCopy);
+    } catch (IOException | ClassNotFoundException e) {
+      throw new RuntimeException(
+          "Exception when deserializing stats history from stats file " + statsFileCopy.getAbsolutePath(), e);
+    }
+    RealtimeIndexOffHeapMemoryManager memoryManager = new DirectMemoryManager(_segmentMetadata.getName());
+    RealtimeSegmentZKMetadata segmentZKMetadata = getRealtimeSegmentZKMetadata(_segmentMetadata, totalDocs);
+
+    RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+        new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName())
+            .setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema()).setCapacity(totalDocs)
+            .setAvgNumMultiValues(_avgMultiValues).setNoDictionaryColumns(_noDictionaryColumns)
+            .setVarLengthDictionaryColumns(_varLengthDictionaryColumns).setInvertedIndexColumns(_invertedIndexColumns)
+            .setRealtimeSegmentZKMetadata(segmentZKMetadata).setOffHeap(true).setMemoryManager(memoryManager)
+            .setStatsHistory(statsHistory);
+
+    // create mutable segment impl
+    MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
+    long memoryForConsumingSegmentPerPartition = memoryManager.getTotalAllocatedBytes();
+    mutableSegmentImpl.destroy();
+    FileUtils.deleteQuietly(statsFileCopy);
+    return memoryForConsumingSegmentPerPartition;
+  }
+
   /**
    * Gets the average num multivalues across all multi value columns in the data
    * @return
@@ -342,20 +362,12 @@ public class MemoryEstimator {
   private long calculateMemoryForCompletedSegmentsPerPartition(long completedSegmentSizeBytes, int numHoursToConsume,
       int retentionHours) {
 
-    // if retention is set to x hours, of which we would be consuming for numHoursToConsume hours,
-    // the actual number of hours we need to keep completed segment in memory is (x-numHoursToConsume)
-    int numHoursToKeepSegmentInMemory = retentionHours - numHoursToConsume;
-
-    // in numHoursToKeepSegmentInMemory hours, a new segment will be created every numHoursToConsume hours
-    // hence we need to account for multiple segments which are completed and in memory at the same time
-    int numCompletedSegmentsToKeepInMemory =
-        (numHoursToKeepSegmentInMemory + numHoursToConsume - 1) / numHoursToConsume; // adjustment for ceil
-
-    return numCompletedSegmentsToKeepInMemory * completedSegmentSizeBytes;
+    int numSegmentsInMemory = (retentionHours + numHoursToConsume - 1)/numHoursToConsume;
+    return completedSegmentSizeBytes * (numSegmentsInMemory - 1);
   }
 
-  public String[][] getTotalMemoryPerHost() {
-    return _totalMemoryPerHost;
+  public String[][] getActiveMemoryPerHost() {
+    return _activeMemoryPerHost;
   }
 
   public String[][] getOptimalSegmentSize() {
@@ -365,4 +377,8 @@ public class MemoryEstimator {
   public String[][] getConsumingMemoryPerHost() {
     return _consumingMemoryPerHost;
   }
+
+  public String[][] getNumSegmentsQueriedPerHost() {
+    return _numSegmentsQueriedPerHost;
+  }
 }
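
For callers invoking the helper programmatically rather than via the pinot-admin script,
the setters added in this commit chain builder-style. A minimal sketch (only
setPushFrequency, setRetentionHours, setIngestionRate and execute() appear in this patch;
the remaining setters are assumed to follow the same pattern):

    // Hypothetical wiring; execute() throws IOException.
    RealtimeProvisioningHelperCommand cmd = new RealtimeProvisioningHelperCommand()
        .setPushFrequency("daily")   // hybrid table; or setRetentionHours(72) for realtime-only
        .setIngestionRate(100);      // avg msgs/sec per stream partition
    // ... set the table config file, partitions, hosts, hours, and sample segment dir ...
    boolean ok = cmd.execute();      // prints the header, notes, and the four grids shown above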

