This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3892812 Add "num rows in segments" and "num segments queried per host" to the output of Realtime Provisioning Rule (#7282) 3892812 is described below commit 38928124496237402b2260c91fa170b2f3fa0779 Author: Sajjad Moradi <moradi.saj...@gmail.com> AuthorDate: Wed Aug 25 18:41:16 2021 -0700 Add "num rows in segments" and "num segments queried per host" to the output of Realtime Provisioning Rule (#7282) * Fix issue with output of Realtime Prov. Rule * Fix rebase conflict * Resolve conflict on git rebase * Add num rows for each segment size recommendation * Add num segments queried per host to the output * Fix "division by zero" issue with low ingestion rate use cases * Make long line shorter --- .../realtime/provisioning/MemoryEstimator.java | 14 +++- .../rules/impl/RealtimeProvisioningRule.java | 31 ++++++++- .../rules/impl/VariedLengthDictionaryRule.java | 2 +- .../recommender/rules/io/configs/IndexConfig.java | 8 +-- .../controller/recommender/TestConfigEngine.java | 15 +++-- ...ealtimeProvisioningInput_highIngestionRate.json | 75 ++++++++++++++++++++++ 6 files changed, 130 insertions(+), 15 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java index e0cc02d..fd915e1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java @@ -70,7 +70,7 @@ import org.slf4j.LoggerFactory; */ public class MemoryEstimator { - private static final String NOT_APPLICABLE = "NA"; + public static final String NOT_APPLICABLE = "NA"; private static final String STATS_FILE_NAME = "stats.ser"; private static final String STATS_FILE_COPY_NAME = "stats.copy.ser"; @@ -95,13 +95,14 @@ public class MemoryEstimator { private String[][] _activeMemoryPerHost; private String[][] _optimalSegmentSize; + private String[][] _numRowsInSegment; private String[][] _consumingMemoryPerHost; private String[][] _numSegmentsQueriedPerHost; /** * Constructor used for processing the given completed segment */ - public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, int ingestionRatePerPartition, + public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, double ingestionRatePerPartition, long maxUsableHostMemory, int tableRetentionHours, File workingDir) { _maxUsableHostMemory = maxUsableHostMemory; _tableConfig = tableConfig; @@ -135,7 +136,7 @@ public class MemoryEstimator { * Constructor used for processing the given data characteristics (instead of completed segment) */ public MemoryEstimator(TableConfig tableConfig, Schema schema, SchemaWithMetaData schemaWithMetadata, - int numberOfRows, int ingestionRatePerPartition, long maxUsableHostMemory, int tableRetentionHours, + int numberOfRows, double ingestionRatePerPartition, long maxUsableHostMemory, int tableRetentionHours, File workingDir) { this(tableConfig, generateCompletedSegment(schemaWithMetadata, schema, tableConfig, numberOfRows, workingDir), ingestionRatePerPartition, maxUsableHostMemory, tableRetentionHours, workingDir); @@ -240,12 +241,14 @@ public class MemoryEstimator { throws IOException { _activeMemoryPerHost = new String[numHours.length][numHosts.length]; _optimalSegmentSize = new String[numHours.length][numHosts.length]; + _numRowsInSegment = new String[numHours.length][numHosts.length]; _consumingMemoryPerHost = new String[numHours.length][numHosts.length]; _numSegmentsQueriedPerHost = new String[numHours.length][numHosts.length]; for (int i = 0; i < numHours.length; i++) { for (int j = 0; j < numHosts.length; j++) { _activeMemoryPerHost[i][j] = NOT_APPLICABLE; _consumingMemoryPerHost[i][j] = NOT_APPLICABLE; + _numRowsInSegment[i][j] = NOT_APPLICABLE; _optimalSegmentSize[i][j] = NOT_APPLICABLE; _numSegmentsQueriedPerHost[i][j] = NOT_APPLICABLE; } @@ -296,6 +299,7 @@ public class MemoryEstimator { DataSizeUtils.fromBytes(activeMemoryPerHostBytes) + "/" + DataSizeUtils.fromBytes(mappedMemoryPerHost); _consumingMemoryPerHost[i][j] = DataSizeUtils.fromBytes(totalMemoryForConsumingSegmentsPerHost); _optimalSegmentSize[i][j] = DataSizeUtils.fromBytes(completedSegmentSizeBytes); + _numRowsInSegment[i][j] = String.valueOf(totalDocs); _numSegmentsQueriedPerHost[i][j] = String.valueOf(numActiveSegmentsPerPartition * totalConsumingPartitionsPerHost); } @@ -431,6 +435,10 @@ public class MemoryEstimator { return _optimalSegmentSize; } + public String[][] getNumRowsInSegment() { + return _numRowsInSegment; + } + public String[][] getConsumingMemoryPerHost() { return _consumingMemoryPerHost; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java index 7096186..d505cfb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java @@ -43,6 +43,8 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.DataSizeUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import static org.apache.pinot.controller.recommender.realtime.provisioning.MemoryEstimator.NOT_APPLICABLE; + /** * This rule gives some recommendations useful for provisioning real time tables. Specifically it provides some @@ -51,6 +53,8 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder; */ public class RealtimeProvisioningRule extends AbstractRule { public static final String OPTIMAL_SEGMENT_SIZE = "Optimal Segment Size"; + public static final String NUM_ROWS_IN_SEGMENT = "Number of Rows in Segment"; + public static final String NUM_SEGMENTS_QUERIED_PER_HOST = "Number of Segments Queried per Host"; public static final String CONSUMING_MEMORY_PER_HOST = "Consuming Memory per Host"; public static final String TOTAL_MEMORY_USED_PER_HOST = "Total Memory Used per Host"; @@ -75,7 +79,7 @@ public class RealtimeProvisioningRule extends AbstractRule { createTableConfig(_output.getIndexConfig(), _input.getSchema(), _output.isAggregateMetrics()); long maxUsableHostMemoryByte = DataSizeUtils.toBytes(_params.getMaxUsableHostMemory()); int totalConsumingPartitions = _params.getNumPartitions() * _params.getNumReplicas(); - int ingestionRatePerPartition = (int) _input.getNumMessagesPerSecInKafkaTopic() / _params.getNumPartitions(); + double ingestionRatePerPartition = (double) _input.getNumMessagesPerSecInKafkaTopic() / _params.getNumPartitions(); int retentionHours = _params.getRealtimeTableRetentionHours(); int[] numHosts = _params.getNumHosts(); int[] numHours = _params.getNumHours(); @@ -108,7 +112,7 @@ public class RealtimeProvisioningRule extends AbstractRule { setIfNotEmpty(indexConfig.getNoDictionaryColumns(), tableConfigBuilder::setNoDictionaryColumns); setIfNotEmpty(indexConfig.getInvertedIndexColumns(), tableConfigBuilder::setInvertedIndexColumns); setIfNotEmpty(indexConfig.getOnHeapDictionaryColumns(), tableConfigBuilder::setOnHeapDictionaryColumns); - setIfNotEmpty(indexConfig.getVariedLengthDictionaryColumns(), tableConfigBuilder::setVarLengthDictionaryColumns); + setIfNotEmpty(indexConfig.getVarLengthDictionaryColumns(), tableConfigBuilder::setVarLengthDictionaryColumns); TableConfig tableConfig = tableConfigBuilder.build(); tableConfig.getIndexingConfig().setAggregateMetrics(aggregateMetrics); @@ -131,9 +135,16 @@ public class RealtimeProvisioningRule extends AbstractRule { Map<String, Map<String, String>> rtProvRecommendations) { Map<String, String> segmentSizes = makeMatrix(memoryEstimator.getOptimalSegmentSize(), numHosts, numHours); Map<String, String> consumingMemory = makeMatrix(memoryEstimator.getConsumingMemoryPerHost(), numHosts, numHours); + Map<String, String> numSegmentsQueried = + makeMatrix(memoryEstimator.getNumSegmentsQueriedPerHost(), numHosts, numHours); + Map<String, String> numRowsInSegment = makeMatrix(memoryEstimator.getNumRowsInSegment(), numHosts, numHours, + element -> element.equals(NOT_APPLICABLE) ? element : convertLargeNumberToHumanReadable(element)); Map<String, String> totalMemory = makeMatrix(memoryEstimator.getActiveMemoryPerHost(), numHosts, numHours, - element -> element.substring(0, element.indexOf('/'))); // take the first number (eg: 48G/48G) + element -> element.equals(NOT_APPLICABLE) ? element + : element.substring(0, element.indexOf('/'))); // take the first number (eg: 48G/48G) rtProvRecommendations.put(OPTIMAL_SEGMENT_SIZE, segmentSizes); + rtProvRecommendations.put(NUM_ROWS_IN_SEGMENT, numRowsInSegment); + rtProvRecommendations.put(NUM_SEGMENTS_QUERIED_PER_HOST, numSegmentsQueried); rtProvRecommendations.put(CONSUMING_MEMORY_PER_HOST, consumingMemory); rtProvRecommendations.put(TOTAL_MEMORY_USED_PER_HOST, totalMemory); } @@ -173,4 +184,18 @@ public class RealtimeProvisioningRule extends AbstractRule { return output; } + + private String convertLargeNumberToHumanReadable(String num) { + int val = Integer.parseInt(num); + if (val >= 10_000_000) { + return (val / 1_000_000) + "M"; + } + if (val >= 1_000_000) { + return (val / 100_000) / 10.0 + "M"; // eg: 5,432,000 -> 5.4M + } + if (val >= 10_000) { + return (val / 1000) + "K"; + } + return num; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/VariedLengthDictionaryRule.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/VariedLengthDictionaryRule.java index b530601..7573591 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/VariedLengthDictionaryRule.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/VariedLengthDictionaryRule.java @@ -44,7 +44,7 @@ public class VariedLengthDictionaryRule extends AbstractRule { LOGGER.debug("{} {}", _input.getFieldType(colName), colName); if (_input.getFieldType(colName) == FieldSpec.DataType.STRING || _input.getFieldType(colName) == FieldSpec.DataType.BYTES) { - _output.getIndexConfig().getVariedLengthDictionaryColumns().add(colName); + _output.getIndexConfig().getVarLengthDictionaryColumns().add(colName); } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java index 2f17615..160491a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java @@ -36,13 +36,13 @@ public class IndexConfig { Set<String> _noDictionaryColumns = new HashSet<>(); Set<String> _onHeapDictionaryColumns = new HashSet<>(); - Set<String> _variedLengthDictionaryColumns = new HashSet<>(); + Set<String> _varLengthDictionaryColumns = new HashSet<>(); boolean _isSortedColumnOverwritten = false; @JsonSetter(nulls = Nulls.SKIP) public void setVariedLengthDictionaryColumns(Set<String> variedLengthDictionaryColumns) { - _variedLengthDictionaryColumns = variedLengthDictionaryColumns; + _varLengthDictionaryColumns = variedLengthDictionaryColumns; } @JsonSetter(nulls = Nulls.SKIP) @@ -84,8 +84,8 @@ public class IndexConfig { _isSortedColumnOverwritten = sortedColumnOverwritten; } - public Set<String> getVariedLengthDictionaryColumns() { - return _variedLengthDictionaryColumns; + public Set<String> getVarLengthDictionaryColumns() { + return _varLengthDictionaryColumns; } public Set<String> getBloomFilterColumns() { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java index 0af4635..57ede19 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java @@ -45,9 +45,7 @@ import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.Test; -import static org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.CONSUMING_MEMORY_PER_HOST; -import static org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.OPTIMAL_SEGMENT_SIZE; -import static org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.TOTAL_MEMORY_USED_PER_HOST; +import static org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.*; import static org.testng.Assert.*; @@ -231,7 +229,7 @@ public class TestConfigEngine { AbstractRule abstractRule = RulesToExecute.RuleFactory.getRule(RulesToExecute.Rule.VariedLengthDictionaryRule, _input, output); abstractRule.run(); - assertEquals(output.getIndexConfig().getVariedLengthDictionaryColumns().toString(), "[a, d, m]"); + assertEquals(output.getIndexConfig().getVarLengthDictionaryColumns().toString(), "[a, d, m]"); } @Test @@ -436,6 +434,13 @@ public class TestConfigEngine { } @Test + void testRealtimeProvisioningRuleWithHighIngestionRate() throws Exception { + // Total memory for some of the options are greater than the provided max memory in a host. + // For those option, the returned values is "NA" + testRealtimeProvisioningRule("recommenderInput/RealtimeProvisioningInput_highIngestionRate.json"); + } + + @Test void testAggregateMetricsRule() throws Exception { ConfigManager output = runRecommenderDriver("recommenderInput/AggregateMetricsRuleInput.json"); @@ -487,6 +492,8 @@ public class TestConfigEngine { ConfigManager output = runRecommenderDriver(fileName); Map<String, Map<String, String>> recommendations = output.getRealtimeProvisioningRecommendations(); assertRealtimeProvisioningRecommendation(recommendations.get(OPTIMAL_SEGMENT_SIZE)); + assertRealtimeProvisioningRecommendation(recommendations.get(NUM_ROWS_IN_SEGMENT)); + assertRealtimeProvisioningRecommendation(recommendations.get(NUM_SEGMENTS_QUERIED_PER_HOST)); assertRealtimeProvisioningRecommendation(recommendations.get(CONSUMING_MEMORY_PER_HOST)); assertRealtimeProvisioningRecommendation(recommendations.get(TOTAL_MEMORY_USED_PER_HOST)); } diff --git a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_highIngestionRate.json b/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_highIngestionRate.json new file mode 100644 index 0000000..3f687be --- /dev/null +++ b/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_highIngestionRate.json @@ -0,0 +1,75 @@ +{ + "schema": { + "dateTimeFieldSpecs": [ + { + "dataType": "LONG", + "format": "1:MILLISECONDS:EPOCH", + "granularity": "1:MILLISECONDS", + "name": "timestampMillis", + "cardinality": 10000 + } + ], + "dimensionFieldSpecs": [ + { + "averageLength": 8, + "cardinality": 16, + "dataType": "STRING", + "name": "colA" + }, + { + "averageLength": 16, + "cardinality": 200, + "dataType": "STRING", + "name": "colB" + }, + { + "averageLength": 50, + "cardinality": 100000, + "dataType": "STRING", + "name": "colC" + }, + { + "averageLength": 25, + "cardinality": 5000, + "dataType": "INT", + "name": "partition" + } + ], + "metricFieldSpecs": [ + { + "cardinality": 5000, + "dataType": "LONG", + "name": "metricA" + }, + { + "cardinality": 5000, + "dataType": "LONG", + "name": "metricB" + } + ], + "schemaName": "myTable" + }, + "queriesWithWeights":{ + "select colC, \"partition\" as partitionNum, max(metricA) as maxMetricA,avg(metricA) as avgMetricA, avg(metricB) as avgMetricB from myTable where colA='valA' and timestampMillis > now() - 3600000 and colB='valB' and timestampMillis < now() group by colB,colC,\"partition\" order by max(metricA) desc limit 10000": 1, + "select colC, \"partition\" as partitionNum, max(metricA) as maxMetricA,avg(metricA) as avgMetricA from myTable where colA='val1' and timestampMillis > now() - 3600000 and colB='valB' and timestampMillis < now() and maxMetricA > 2000 group by colB,colC,\"partition\" order by max(metricA) desc limit 10000": 1, + "select timestampMillis, sum(metricB) from myTable where (colC='A' or colC='B') and (timestampMillis >= 123) group by timestampMillis order by timestampMillis asc": 1 + }, + "qps": 10, + "tableType": "REALTIME", + "latencySLA": 500, + "rulesToExecute": { + "recommendRealtimeProvisioning": true + }, + "numMessagesPerSecInKafkaTopic":50000, + "realtimeProvisioningRuleParams": { + "numPartitions": 32, + "numReplicas": 2, + "realtimeTableRetentionHours": 24, + "maxUsableHostMemory": "150G", + "numHours": [2, 4, 6, 8, 10, 12], + "numHosts": [3, 6, 9, 12, 15, 18, 21], + "numRowsInGeneratedSegment": 10000 + }, + "overWrittenConfigs": { + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org