This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 540853d Improvements to RealtimeProvisioningHelper command (#5737) 540853d is described below commit 540853d786c9e4b4bb27bf4bbc6fa73837ff1bc3 Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com> AuthorDate: Fri Jul 24 20:18:37 2020 -0700 Improvements to RealtimeProvisioningHelper command (#5737) * Improvements to RealtimeProvisioningHelper command - Fixed a bug where we were looking for push frequency setting in the table config. Push frequency is not usually set in a realtime table, so changed that to be an optional argument instead. - Instead of push frequency, the user may specify retentionHours, meaning the number of hours of most recent data that will be queried most often. - Added the amount of total mapped memory to the output - Added the number of segments queried per host to the output. - Added a link to the documentation to the output. - Instead of accepting the time taken to consume the segment, introduced an argument to accept the ingestion rate per partition. This is easier to determine when looking at metrics for the topic that we need to ingest, and is more convenient when an offline segment is being provided as a sample. - Printed the command line arguments with the output so that it is easy for us to debug when we get questions from the community. TODO Next: Can we compute a score and just recommend a configuration? We know we want to minimize num segments scanned per host, num hosts, segment size (to fit memory). 
Issue #5588 Here is a sample output: ============================================================ RealtimeProvisioningHelperCommand -tableConfigFile /Users/ssubrama/tmp/samza/realtimeTableConfig.json -numPartitions 16 -pushFrequency null -numHosts 8,6,10 -numHours 6,12,18,24 -sampleCompletedSegmentDir /Users/ssubrama/tmp/samza/TestSamzaAnalyticsFeatures_1593411480000_1593500340000_0/ -ingestionRate 100 -maxUsableHostMemory 48G -retentionHours 72 Note: * Table retention and push frequency ignored for determining retentionHours * See https://docs.pinot.apache.org/operators/operating-pinot/tuning/realtime Memory used per host (Active/Mapped) numHosts --> 6 |8 |10 | numHours 6 --------> 5.05G/19.49G |3.37G/12.99G |3.37G/12.99G | 12 --------> 5.89G/20.33G |3.93G/13.55G |3.93G/13.55G | 18 --------> 6.73G/21.49G |4.48G/14.33G |4.48G/14.33G | 24 --------> 7.56G/22G |5.04G/14.66G |5.04G/14.66G | Optimal segment size numHosts --> 6 |8 |10 | numHours 6 --------> 111.98M |111.98M |111.98M | 12 --------> 223.96M |223.96M |223.96M | 18 --------> 335.94M |335.94M |335.94M | 24 --------> 447.92M |447.92M |447.92M | Consuming memory numHosts --> 6 |8 |10 | numHours 6 --------> 1.45G |987.17M |987.17M | 12 --------> 2.61G |1.74G |1.74G | 18 --------> 3.77G |2.52G |2.52G | 24 --------> 4.94G |3.29G |3.29G | Number of segments queried per host numHosts --> 6 |8 |10 | numHours 6 --------> 12 |12 |12 | 12 --------> 6 |6 |6 | 18 --------> 4 |4 |4 | 24 --------> 3 |3 |3 | * Addressed review comments --- .../routing/timeboundary/TimeBoundaryManager.java | 2 +- .../apache/pinot/common/utils/CommonConstants.java | 7 + .../controller/util/SegmentIntervalUtils.java | 9 +- .../name/NormalizedDateSegmentNameGenerator.java | 3 +- .../command/RealtimeProvisioningHelperCommand.java | 100 ++++++++++---- .../realtime/provisioning/MemoryEstimator.java | 144 ++++++++++++--------- 6 files changed, 168 insertions(+), 97 deletions(-) diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java index 5e4b019..7462913 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java @@ -82,7 +82,7 @@ public class TimeBoundaryManager { // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use // (maxEndTime - 1 DAY) - _isHourlyTable = "HOURLY".equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency()) + _isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency()) && _timeUnit != TimeUnit.DAYS; LOGGER.info("Constructed TimeBoundaryManager with timeColumn: {}, timeUnit: {}, isHourlyTable: {} for table: {}", diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 3e25512..85113e7 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -27,6 +27,13 @@ public class CommonConstants { public static final String HTTP_PROTOCOL = "http"; public static final String HTTPS_PROTOCOL = "https"; + public static class Table { + public static final String PUSH_FREQUENCY_HOURLY = "hourly"; + public static final String PUSH_FREQUENCY_DAILY = "daily"; + public static final String PUSH_FREQUENCY_WEEKLY = "weekly"; + public static final String PUSH_FREQUENCY_MONTHLY = "monthly"; + } + public static class Helix { public static final String IS_SHUTDOWN_IN_PROGRESS = "shutdownInProgress"; public static final String QUERIES_DISABLED = "queriesDisabled"; 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentIntervalUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentIntervalUtils.java index e08246e..cdd3291 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentIntervalUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentIntervalUtils.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller.util; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.joda.time.Duration; @@ -49,16 +50,16 @@ public class SegmentIntervalUtils { * Converts push frequency into duration. For invalid or less than 'hourly' push frequency, treats it as 'daily'. */ public static Duration convertToDuration(String pushFrequency) { - if ("hourly".equalsIgnoreCase(pushFrequency)) { + if (CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(pushFrequency)) { return Duration.standardHours(1L); } - if ("daily".equalsIgnoreCase(pushFrequency)) { + if (CommonConstants.Table.PUSH_FREQUENCY_DAILY.equalsIgnoreCase(pushFrequency)) { return Duration.standardDays(1L); } - if ("weekly".equalsIgnoreCase(pushFrequency)) { + if (CommonConstants.Table.PUSH_FREQUENCY_WEEKLY.equalsIgnoreCase(pushFrequency)) { return Duration.standardDays(7L); } - if ("monthly".equalsIgnoreCase(pushFrequency)) { + if (CommonConstants.Table.PUSH_FREQUENCY_MONTHLY.equalsIgnoreCase(pushFrequency)) { return Duration.standardDays(30L); } return Duration.standardDays(1L); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java index 75b619d..033e5c5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java @@ -25,6 +25,7 @@ import java.util.Date; import java.util.TimeZone; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.spi.data.DateTimeFieldSpec.TimeFormat; import org.apache.pinot.spi.data.DateTimeFormatSpec; @@ -54,7 +55,7 @@ public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator // Include time info for APPEND push type if (_appendPushType) { // For HOURLY push frequency, include hours into output format - if ("HOURLY".equalsIgnoreCase(pushFrequency)) { + if (CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(pushFrequency)) { _outputSDF = new SimpleDateFormat("yyyy-MM-dd-HH"); } else { _outputSDF = new SimpleDateFormat("yyyy-MM-dd"); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java index 1976443..7c145b1 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RealtimeProvisioningHelperCommand.java @@ -28,7 +28,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.utils.DataSizeUtils; import org.apache.pinot.spi.utils.JsonUtils; -import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.tools.Command; import org.apache.pinot.tools.realtime.provisioning.MemoryEstimator; import org.kohsuke.args4j.Option; @@ -44,10 +43,12 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeProvisioningHelperCommand.class); - private static final int MEMORY_STR_LEN = 9; + 
private static final int MEMORY_STR_LEN = 16; private static final String COMMA_SEPARATOR = ","; private static final int DEFAULT_RETENTION_FOR_HOURLY_PUSH = 24; private static final int DEFAULT_RETENTION_FOR_DAILY_PUSH = 72; + private static final int DEFAULT_RETENTION_FOR_WEEKLY_PUSH = 24*7 + 72; + private static final int DEFAULT_RETENTION_FOR_MONTHLY_PUSH = 24*31 + 72; @Option(name = "-tableConfigFile", required = true, metaVar = "<String>") private String _tableConfigFile; @@ -55,13 +56,15 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand @Option(name = "-numPartitions", required = true, metaVar = "<int>", usage = "number of stream partitions for the table") private int _numPartitions; - @Option(name = "-retentionHours", metaVar = "<int>", usage = - "Number of hours the segments will need to be retained in memory. " - + "\nThe realtime segments will need to be in memory only until the offline segments are available and used for queries" - + "\nThis will be picked from the table config by looking at the segmentPushFrequency (72h if daily, 24h if hourly, buffer added as TimeBoundaryService doesn't query the last offline timestamp), " - + "\nIt can be overridden using this option") + @Option(name = "-retentionHours", metaVar = "<int>", usage = "Number of recent hours queried most often" + + "\n\t(-pushFrequency is ignored)") private int _retentionHours; + @Option(name = "-pushFrequency", metaVar = "<String>", usage = + "Frequency with which offline table pushes happen, if this is a hybrid table" + + "\n\t(hourly,daily,weekly,monthly). 
Do not specify if realtime-only table") + private String _pushFrequency; + @Option(name = "-numHosts", metaVar = "<String>", usage = "number of hosts as comma separated values (default 2,4,6,8,10,12,14,16)") private String _numHosts = "2,4,6,8,10,12,14,16"; @@ -71,8 +74,8 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand @Option(name = "-sampleCompletedSegmentDir", required = true, metaVar = "<String>", usage = "Consume from the topic for n hours and provide the path of the segment dir after it completes") private String _sampleCompletedSegmentDir; - @Option(name = "-periodSampleSegmentConsumed", required = true, metaVar = "<String>", usage = "Period for which the sample segment was consuming in format 4h, 5h30m, 40m etc") - private String _periodSampleSegmentConsumed; + @Option(name = "-ingestionRate", required = true, metaVar = "<String>", usage = "Avg number of messages per second ingested on any one stream partition (assumed all partitions are uniform)") + private int _ingestionRate; @Option(name = "-maxUsableHostMemory", required = false, metaVar = "<String>", usage = "Maximum memory per host that can be used for pinot data (e.g. 250G, 100M). 
Default 48g") private String _maxUsableHostMemory = "48G"; @@ -90,6 +93,11 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand return this; } + public RealtimeProvisioningHelperCommand setPushFrequency(String pushFrequency) { + _pushFrequency = pushFrequency; + return this; + } + public RealtimeProvisioningHelperCommand setRetentionHours(int retentionHours) { _retentionHours = retentionHours; return this; @@ -115,17 +123,17 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand return this; } - public RealtimeProvisioningHelperCommand setPeriodSampleSegmentConsumed(String periodSampleSegmentConsumed) { - _periodSampleSegmentConsumed = periodSampleSegmentConsumed; + public RealtimeProvisioningHelperCommand setIngestionRate(int ingestionRate) { + _ingestionRate = ingestionRate; return this; } @Override public String toString() { return ("RealtimeProvisioningHelperCommand -tableConfigFile " + _tableConfigFile + " -numPartitions " - + _numPartitions + " -retentionHours " + _retentionHours + " -numHosts " + _numHosts + " -numHours " + _numHours - + " -sampleCompletedSegmentDir " + _sampleCompletedSegmentDir + " -periodSampleSegmentConsumed " - + _periodSampleSegmentConsumed + "-maxUsableMemory " + _maxUsableHostMemory); + + _numPartitions + " -pushFrequency " + _pushFrequency + " -numHosts " + _numHosts + " -numHours " + _numHours + + " -sampleCompletedSegmentDir " + _sampleCompletedSegmentDir + " -ingestionRate " + + _ingestionRate + " -maxUsableHostMemory " + _maxUsableHostMemory + " -retentionHours " + _retentionHours); } @Override @@ -146,6 +154,20 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand } @Override + public void printExamples() { + StringBuilder builder = new StringBuilder(); + builder.append("\n\nThis command allows you to estimate the capacity needed for provisioning realtime hosts") + .append("It assumes that there is no upper limit to the amount of memory you 
can mmap") + .append("\nIf you have a hybrid table, then consult the push frequency setting in your offline table specify it in the -pushFrequency argument") + .append("\nIf you have a realtime-only table, then the default behavior is to assume that your queries need all data in memory all the time") + .append("\nHowever, if most of your queries are going to be for (say) the last 96 hours, then you can specify that in -retentionHours") + .append("\nDoing so will let this program assume that you are willing to take a page hit when querying older data") + .append("\nand optimize memory and number of hosts accordingly.") + ; + System.out.println(builder.toString()); + } + + @Override public boolean execute() throws IOException { LOGGER.info("Executing command: {}", toString()); @@ -158,17 +180,36 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand throw new RuntimeException("Exception in reading table config from file " + _tableConfigFile, e); } + StringBuilder note = new StringBuilder(); + note.append("\nNote:\n"); int numReplicas = tableConfig.getValidationConfig().getReplicasPerPartitionNumber(); - if (_retentionHours == 0) { - if (tableConfig.getValidationConfig().getSegmentPushFrequency().equalsIgnoreCase("hourly")) { - _retentionHours = DEFAULT_RETENTION_FOR_HOURLY_PUSH; + int tableRetentionHours = (int) TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit()) + .toHours(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue())); + if (_retentionHours > 0) { + note.append("\n* Table retention and push frequency ignored for determining retentionHours since it is specified in command"); + } else { + if (_pushFrequency == null) { + // This is a realtime-only table. 
Pick up the retention time + _retentionHours = tableRetentionHours; + note.append("\n* Retention hours taken from tableConfig"); } else { - _retentionHours = DEFAULT_RETENTION_FOR_DAILY_PUSH; + if ("hourly".equalsIgnoreCase(_pushFrequency)) { + _retentionHours = DEFAULT_RETENTION_FOR_HOURLY_PUSH; + } else if ("daily".equalsIgnoreCase(_pushFrequency)) { + _retentionHours = DEFAULT_RETENTION_FOR_DAILY_PUSH; + } else if ("weekly".equalsIgnoreCase(_pushFrequency)) { + _retentionHours = DEFAULT_RETENTION_FOR_WEEKLY_PUSH; + } else if ("monthly".equalsIgnoreCase(_pushFrequency)) { + _retentionHours = DEFAULT_RETENTION_FOR_MONTHLY_PUSH; + } else { + throw new IllegalArgumentException("Illegal value for pushFrequency: '" + _pushFrequency + "'"); + } + note.append("\n* Retention hours taken from pushFrequency"); } } - int[] numHosts = Arrays.stream(_numHosts.split(COMMA_SEPARATOR)).mapToInt(Integer::parseInt).toArray(); - int[] numHours = Arrays.stream(_numHours.split(COMMA_SEPARATOR)).mapToInt(Integer::parseInt).toArray(); + int[] numHosts = Arrays.stream(_numHosts.split(COMMA_SEPARATOR)).mapToInt(Integer::parseInt).sorted().toArray(); + int[] numHours = Arrays.stream(_numHours.split(COMMA_SEPARATOR)).mapToInt(Integer::parseInt).sorted().toArray(); int totalConsumingPartitions = _numPartitions * numReplicas; @@ -177,27 +218,32 @@ public class RealtimeProvisioningHelperCommand extends AbstractBaseAdminCommand // Completed: Use multiple (completedSize,numHours) data points to calculate completed size for our numHours File sampleCompletedSegmentFile = new File(_sampleCompletedSegmentDir); - long sampleSegmentConsumedSeconds = - TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(_periodSampleSegmentConsumed), TimeUnit.MILLISECONDS); - long maxUsableHostMemBytes = DataSizeUtils.toBytes(_maxUsableHostMemory); MemoryEstimator memoryEstimator = - new MemoryEstimator(tableConfig, sampleCompletedSegmentFile, sampleSegmentConsumedSeconds, - maxUsableHostMemBytes); + new 
MemoryEstimator(tableConfig, sampleCompletedSegmentFile, _ingestionRate, maxUsableHostMemBytes, tableRetentionHours); File sampleStatsHistory = memoryEstimator.initializeStatsHistory(); memoryEstimator .estimateMemoryUsed(sampleStatsHistory, numHosts, numHours, totalConsumingPartitions, _retentionHours); + note.append("\n* See https://docs.pinot.apache.org/operators/operating-pinot/tuning/realtime"); // TODO: Make a recommendation of what config to choose by considering more inputs such as qps - LOGGER.info("\nMemory used per host"); - displayResults(memoryEstimator.getTotalMemoryPerHost(), numHosts, numHours); + displayOutputHeader(note); + LOGGER.info("\nMemory used per host (Active/Mapped)"); + displayResults(memoryEstimator.getActiveMemoryPerHost(), numHosts, numHours); LOGGER.info("\nOptimal segment size"); displayResults(memoryEstimator.getOptimalSegmentSize(), numHosts, numHours); LOGGER.info("\nConsuming memory"); displayResults(memoryEstimator.getConsumingMemoryPerHost(), numHosts, numHours); + LOGGER.info("\nNumber of segments queried per host"); + displayResults(memoryEstimator.getNumSegmentsQueriedPerHost(), numHosts, numHours); return true; } + + private void displayOutputHeader(StringBuilder note) { + System.out.println("\n============================================================\n" + toString()); + System.out.println(note.toString()); + } /** * Displays the output values as a grid of numHoursToConsume vs numHostsToProvision diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java index cbf6b61..1117b36 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java @@ -53,7 +53,9 @@ public class MemoryEstimator { private final String _tableNameWithType; private final File 
_sampleCompletedSegment; private final long _sampleSegmentConsumedSeconds; + private final int _totalDocsInSampleSegment; private final long _maxUsableHostMemory; + private final int _tableRetentionHours; private SegmentMetadataImpl _segmentMetadata; private long _sampleCompletedSegmentSizeBytes; @@ -63,17 +65,18 @@ public class MemoryEstimator { int _avgMultiValues; private File _tableDataDir; - private String[][] _totalMemoryPerHost; + private String[][] _activeMemoryPerHost; private String[][] _optimalSegmentSize; private String[][] _consumingMemoryPerHost; + private String[][] _numSegmentsQueriedPerHost; - public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, long sampleSegmentConsumedSeconds, - long maxUsableHostMemory) { + public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, int ingestionRate, + long maxUsableHostMemory, int tableRetentionHours) { _maxUsableHostMemory = maxUsableHostMemory; _tableConfig = tableConfig; _tableNameWithType = tableConfig.getTableName(); _sampleCompletedSegment = sampleCompletedSegment; - _sampleSegmentConsumedSeconds = sampleSegmentConsumedSeconds; + _tableRetentionHours = tableRetentionHours; _sampleCompletedSegmentSizeBytes = FileUtils.sizeOfDirectory(_sampleCompletedSegment); try { @@ -81,6 +84,8 @@ public class MemoryEstimator { } catch (Exception e) { throw new RuntimeException("Caught exception when reading segment index dir", e); } + _totalDocsInSampleSegment = _segmentMetadata.getTotalDocs(); + _sampleSegmentConsumedSeconds = (int)(_totalDocsInSampleSegment/ingestionRate); if (CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getNoDictionaryColumns())) { _noDictionaryColumns.addAll(_tableConfig.getIndexingConfig().getNoDictionaryColumns()); @@ -182,88 +187,103 @@ public class MemoryEstimator { * @param numHosts list of number of hosts that are to be provisioned * @param numHours list of number of hours to be consumed * @param totalConsumingPartitions total consuming 
partitions we are provisioning for - * @param retentionHours what is the amount of retention in memory expected for completed segments + * @param retentionHours number of most recent hours to be retained in memory for queries. * @throws IOException */ - public void estimateMemoryUsed(File statsFile, int[] numHosts, int[] numHours, int totalConsumingPartitions, - int retentionHours) + public void estimateMemoryUsed(File statsFile, int[] numHosts, int[] numHours, final int totalConsumingPartitions, + final int retentionHours) throws IOException { - _totalMemoryPerHost = new String[numHours.length][numHosts.length]; + _activeMemoryPerHost = new String[numHours.length][numHosts.length]; _optimalSegmentSize = new String[numHours.length][numHosts.length]; _consumingMemoryPerHost = new String[numHours.length][numHosts.length]; + _numSegmentsQueriedPerHost = new String[numHours.length][numHosts.length]; + for (int i = 0; i < numHours.length; i++) { + for (int j = 0; j < numHosts.length; j++) { + _activeMemoryPerHost[i][j] = NOT_APPLICABLE; + _consumingMemoryPerHost[i][j] = NOT_APPLICABLE; + _optimalSegmentSize[i][j] = NOT_APPLICABLE; + _numSegmentsQueriedPerHost[i][j] = NOT_APPLICABLE; + } + } for (int i = 0; i < numHours.length; i++) { int numHoursToConsume = numHours[i]; + if (numHoursToConsume > retentionHours) { + continue; + } long secondsToConsume = numHoursToConsume * 3600; // consuming for _numHoursSampleSegmentConsumed, gives size sampleCompletedSegmentSizeBytes // hence, consuming for numHoursToConsume would give: long completedSegmentSizeBytes = (long) (((double) secondsToConsume / _sampleSegmentConsumedSeconds) * _sampleCompletedSegmentSizeBytes); - long totalMemoryForCompletedSegmentsPerPartition = - calculateMemoryForCompletedSegmentsPerPartition(completedSegmentSizeBytes, numHoursToConsume, retentionHours); - - int totalDocsInSampleSegment = _segmentMetadata.getTotalDocs(); // numHoursSampleSegmentConsumed created totalDocsInSampleSegment num rows // 
numHoursToConsume will create ? rows - int totalDocs = (int) (((double) secondsToConsume / _sampleSegmentConsumedSeconds) * totalDocsInSampleSegment); - - // We don't want the stats history to get updated from all our dummy runs - // So we copy over the original stats history every time we start - File statsFileCopy = new File(_tableDataDir, STATS_FILE_COPY_NAME); - FileUtils.copyFile(statsFile, statsFileCopy); - RealtimeSegmentStatsHistory statsHistory; - try { - statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFileCopy); - } catch (IOException | ClassNotFoundException e) { - throw new RuntimeException( - "Exception when deserializing stats history from stats file " + statsFileCopy.getAbsolutePath(), e); - } - RealtimeIndexOffHeapMemoryManager memoryManager = new DirectMemoryManager(_segmentMetadata.getName()); - RealtimeSegmentZKMetadata segmentZKMetadata = getRealtimeSegmentZKMetadata(_segmentMetadata, totalDocs); - - RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = - new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName()) - .setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema()).setCapacity(totalDocs) - .setAvgNumMultiValues(_avgMultiValues).setNoDictionaryColumns(_noDictionaryColumns) - .setVarLengthDictionaryColumns(_varLengthDictionaryColumns).setInvertedIndexColumns(_invertedIndexColumns) - .setRealtimeSegmentZKMetadata(segmentZKMetadata).setOffHeap(true).setMemoryManager(memoryManager) - .setStatsHistory(statsHistory); - - // create mutable segment impl - MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build()); - long memoryForConsumingSegmentPerPartition = memoryManager.getTotalAllocatedBytes(); - mutableSegmentImpl.destroy(); - FileUtils.deleteQuietly(statsFileCopy); + int totalDocs = (int) (((double) secondsToConsume / _sampleSegmentConsumedSeconds) * _totalDocsInSampleSegment); + long memoryForConsumingSegmentPerPartition = 
getMemoryForConsumingSegmentPerPartition(statsFile, totalDocs); memoryForConsumingSegmentPerPartition += getMemoryForInvertedIndex(memoryForConsumingSegmentPerPartition); - for (int j = 0; j < numHosts.length; j++) { + int numActiveSegmentsPerPartition = (retentionHours + numHoursToConsume - 1)/numHoursToConsume; + long activeMemoryForCompletedSegmentsPerPartition = completedSegmentSizeBytes * (numActiveSegmentsPerPartition - 1); + int numCompletedSegmentsPerPartition = (_tableRetentionHours + numHoursToConsume - 1)/numHoursToConsume - 1; + for (int j = 0; j < numHosts.length; j++) { int numHostsToProvision = numHosts[j]; // adjustment because we want ceiling of division and not floor, as some hosts will have an extra partition due to the remainder of the division int totalConsumingPartitionsPerHost = (totalConsumingPartitions + numHostsToProvision - 1) / numHostsToProvision; - long totalMemoryForCompletedSegmentsPerHost = - totalMemoryForCompletedSegmentsPerPartition * totalConsumingPartitionsPerHost; + long activeMemoryForCompletedSegmentsPerHost = + activeMemoryForCompletedSegmentsPerPartition * totalConsumingPartitionsPerHost; long totalMemoryForConsumingSegmentsPerHost = memoryForConsumingSegmentPerPartition * totalConsumingPartitionsPerHost; - long totalMemoryPerHostBytes = totalMemoryForCompletedSegmentsPerHost + totalMemoryForConsumingSegmentsPerHost; - - if (totalMemoryPerHostBytes > _maxUsableHostMemory) { - _totalMemoryPerHost[i][j] = NOT_APPLICABLE; - _consumingMemoryPerHost[i][j] = NOT_APPLICABLE; - _optimalSegmentSize[i][j] = NOT_APPLICABLE; - } else { - _totalMemoryPerHost[i][j] = DataSizeUtils.fromBytes(totalMemoryPerHostBytes); + long activeMemoryPerHostBytes = activeMemoryForCompletedSegmentsPerHost + totalMemoryForConsumingSegmentsPerHost; + long mappedMemoryPerHost = totalMemoryForConsumingSegmentsPerHost + + (numCompletedSegmentsPerPartition * totalConsumingPartitionsPerHost * completedSegmentSizeBytes); + + if (activeMemoryPerHostBytes <= 
_maxUsableHostMemory) { + _activeMemoryPerHost[i][j] = DataSizeUtils.fromBytes(activeMemoryPerHostBytes) + + "/" + DataSizeUtils.fromBytes(mappedMemoryPerHost); _consumingMemoryPerHost[i][j] = DataSizeUtils.fromBytes(totalMemoryForConsumingSegmentsPerHost); _optimalSegmentSize[i][j] = DataSizeUtils.fromBytes(completedSegmentSizeBytes); + _numSegmentsQueriedPerHost[i][j] = String.valueOf(numActiveSegmentsPerPartition); } } } } + private long getMemoryForConsumingSegmentPerPartition(File statsFile, int totalDocs) throws IOException { + // We don't want the stats history to get updated from all our dummy runs + // So we copy over the original stats history every time we start + File statsFileCopy = new File(_tableDataDir, STATS_FILE_COPY_NAME); + FileUtils.copyFile(statsFile, statsFileCopy); + RealtimeSegmentStatsHistory statsHistory; + try { + statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFileCopy); + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException( + "Exception when deserializing stats history from stats file " + statsFileCopy.getAbsolutePath(), e); + } + RealtimeIndexOffHeapMemoryManager memoryManager = new DirectMemoryManager(_segmentMetadata.getName()); + RealtimeSegmentZKMetadata segmentZKMetadata = getRealtimeSegmentZKMetadata(_segmentMetadata, totalDocs); + + RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = + new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName()) + .setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema()).setCapacity(totalDocs) + .setAvgNumMultiValues(_avgMultiValues).setNoDictionaryColumns(_noDictionaryColumns) + .setVarLengthDictionaryColumns(_varLengthDictionaryColumns).setInvertedIndexColumns(_invertedIndexColumns) + .setRealtimeSegmentZKMetadata(segmentZKMetadata).setOffHeap(true).setMemoryManager(memoryManager) + .setStatsHistory(statsHistory); + + // create mutable segment impl + MutableSegmentImpl mutableSegmentImpl = new 
MutableSegmentImpl(realtimeSegmentConfigBuilder.build()); + long memoryForConsumingSegmentPerPartition = memoryManager.getTotalAllocatedBytes(); + mutableSegmentImpl.destroy(); + FileUtils.deleteQuietly(statsFileCopy); + return memoryForConsumingSegmentPerPartition; + } + /** * Gets the average num multivalues across all multi value columns in the data * @return @@ -342,20 +362,12 @@ public class MemoryEstimator { private long calculateMemoryForCompletedSegmentsPerPartition(long completedSegmentSizeBytes, int numHoursToConsume, int retentionHours) { - // if retention is set to x hours, of which we would be consuming for numHoursToConsume hours, - // the actual number of hours we need to keep completed segment in memory is (x-numHoursToConsume) - int numHoursToKeepSegmentInMemory = retentionHours - numHoursToConsume; - - // in numHoursToKeepSegmentInMemory hours, a new segment will be created every numHoursToConsume hours - // hence we need to account for multiple segments which are completed and in memory at the same time - int numCompletedSegmentsToKeepInMemory = - (numHoursToKeepSegmentInMemory + numHoursToConsume - 1) / numHoursToConsume; // adjustment for ceil - - return numCompletedSegmentsToKeepInMemory * completedSegmentSizeBytes; + int numSegmentsInMemory = (retentionHours + numHoursToConsume - 1)/numHoursToConsume; + return completedSegmentSizeBytes * (numSegmentsInMemory - 1); } - public String[][] getTotalMemoryPerHost() { - return _totalMemoryPerHost; + public String[][] getActiveMemoryPerHost() { + return _activeMemoryPerHost; } public String[][] getOptimalSegmentSize() { @@ -365,4 +377,8 @@ public class MemoryEstimator { public String[][] getConsumingMemoryPerHost() { return _consumingMemoryPerHost; } + + public String[][] getNumSegmentsQueriedPerHost() { + return _numSegmentsQueriedPerHost; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, 
e-mail: commits-h...@pinot.apache.org