This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3892812  Add "num rows in segments" and "num segments queried per 
host" to the output of Realtime Provisioning Rule (#7282)
3892812 is described below

commit 38928124496237402b2260c91fa170b2f3fa0779
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Wed Aug 25 18:41:16 2021 -0700

    Add "num rows in segments" and "num segments queried per host" to the 
output of Realtime Provisioning Rule (#7282)
    
    * Fix issue with output of Realtime Prov. Rule
    
    * Fix rebase conflict
    
    * Resolve conflict on git rebase
    
    * Add num rows for each segment size recommendation
    
    * Add num segments queried per host to the output
    
    * Fix "division by zero" issue with low ingestion rate use cases
    
    * Make long line shorter
---
 .../realtime/provisioning/MemoryEstimator.java     | 14 +++-
 .../rules/impl/RealtimeProvisioningRule.java       | 31 ++++++++-
 .../rules/impl/VariedLengthDictionaryRule.java     |  2 +-
 .../recommender/rules/io/configs/IndexConfig.java  |  8 +--
 .../controller/recommender/TestConfigEngine.java   | 15 +++--
 ...ealtimeProvisioningInput_highIngestionRate.json | 75 ++++++++++++++++++++++
 6 files changed, 130 insertions(+), 15 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
index e0cc02d..fd915e1 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java
@@ -70,7 +70,7 @@ import org.slf4j.LoggerFactory;
  */
 public class MemoryEstimator {
 
-  private static final String NOT_APPLICABLE = "NA";
+  public static final String NOT_APPLICABLE = "NA";
   private static final String STATS_FILE_NAME = "stats.ser";
   private static final String STATS_FILE_COPY_NAME = "stats.copy.ser";
 
@@ -95,13 +95,14 @@ public class MemoryEstimator {
 
   private String[][] _activeMemoryPerHost;
   private String[][] _optimalSegmentSize;
+  private String[][] _numRowsInSegment;
   private String[][] _consumingMemoryPerHost;
   private String[][] _numSegmentsQueriedPerHost;
 
   /**
    * Constructor used for processing the given completed segment
    */
-  public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, 
int ingestionRatePerPartition,
+  public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, 
double ingestionRatePerPartition,
       long maxUsableHostMemory, int tableRetentionHours, File workingDir) {
     _maxUsableHostMemory = maxUsableHostMemory;
     _tableConfig = tableConfig;
@@ -135,7 +136,7 @@ public class MemoryEstimator {
    * Constructor used for processing the given data characteristics (instead 
of completed segment)
    */
   public MemoryEstimator(TableConfig tableConfig, Schema schema, 
SchemaWithMetaData schemaWithMetadata,
-      int numberOfRows, int ingestionRatePerPartition, long 
maxUsableHostMemory, int tableRetentionHours,
+      int numberOfRows, double ingestionRatePerPartition, long 
maxUsableHostMemory, int tableRetentionHours,
       File workingDir) {
     this(tableConfig, generateCompletedSegment(schemaWithMetadata, schema, 
tableConfig, numberOfRows, workingDir),
         ingestionRatePerPartition, maxUsableHostMemory, tableRetentionHours, 
workingDir);
@@ -240,12 +241,14 @@ public class MemoryEstimator {
       throws IOException {
     _activeMemoryPerHost = new String[numHours.length][numHosts.length];
     _optimalSegmentSize = new String[numHours.length][numHosts.length];
+    _numRowsInSegment = new String[numHours.length][numHosts.length];
     _consumingMemoryPerHost = new String[numHours.length][numHosts.length];
     _numSegmentsQueriedPerHost = new String[numHours.length][numHosts.length];
     for (int i = 0; i < numHours.length; i++) {
       for (int j = 0; j < numHosts.length; j++) {
         _activeMemoryPerHost[i][j] = NOT_APPLICABLE;
         _consumingMemoryPerHost[i][j] = NOT_APPLICABLE;
+        _numRowsInSegment[i][j] = NOT_APPLICABLE;
         _optimalSegmentSize[i][j] = NOT_APPLICABLE;
         _numSegmentsQueriedPerHost[i][j] = NOT_APPLICABLE;
       }
@@ -296,6 +299,7 @@ public class MemoryEstimator {
                 DataSizeUtils.fromBytes(activeMemoryPerHostBytes) + "/" + 
DataSizeUtils.fromBytes(mappedMemoryPerHost);
             _consumingMemoryPerHost[i][j] = 
DataSizeUtils.fromBytes(totalMemoryForConsumingSegmentsPerHost);
             _optimalSegmentSize[i][j] = 
DataSizeUtils.fromBytes(completedSegmentSizeBytes);
+            _numRowsInSegment[i][j] = String.valueOf(totalDocs);
             _numSegmentsQueriedPerHost[i][j] =
                 String.valueOf(numActiveSegmentsPerPartition * 
totalConsumingPartitionsPerHost);
           }
@@ -431,6 +435,10 @@ public class MemoryEstimator {
     return _optimalSegmentSize;
   }
 
+  public String[][] getNumRowsInSegment() {
+    return _numRowsInSegment;
+  }
+
   public String[][] getConsumingMemoryPerHost() {
     return _consumingMemoryPerHost;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java
index 7096186..d505cfb 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/RealtimeProvisioningRule.java
@@ -43,6 +43,8 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 
+import static 
org.apache.pinot.controller.recommender.realtime.provisioning.MemoryEstimator.NOT_APPLICABLE;
+
 
 /**
  * This rule gives some recommendations useful for provisioning real time 
tables. Specifically it provides some
@@ -51,6 +53,8 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
  */
 public class RealtimeProvisioningRule extends AbstractRule {
   public static final String OPTIMAL_SEGMENT_SIZE = "Optimal Segment Size";
+  public static final String NUM_ROWS_IN_SEGMENT = "Number of Rows in Segment";
+  public static final String NUM_SEGMENTS_QUERIED_PER_HOST = "Number of 
Segments Queried per Host";
   public static final String CONSUMING_MEMORY_PER_HOST = "Consuming Memory per 
Host";
   public static final String TOTAL_MEMORY_USED_PER_HOST = "Total Memory Used 
per Host";
 
@@ -75,7 +79,7 @@ public class RealtimeProvisioningRule extends AbstractRule {
         createTableConfig(_output.getIndexConfig(), _input.getSchema(), 
_output.isAggregateMetrics());
     long maxUsableHostMemoryByte = 
DataSizeUtils.toBytes(_params.getMaxUsableHostMemory());
     int totalConsumingPartitions = _params.getNumPartitions() * 
_params.getNumReplicas();
-    int ingestionRatePerPartition = (int) 
_input.getNumMessagesPerSecInKafkaTopic() / _params.getNumPartitions();
+    double ingestionRatePerPartition = (double) 
_input.getNumMessagesPerSecInKafkaTopic() / _params.getNumPartitions();
     int retentionHours = _params.getRealtimeTableRetentionHours();
     int[] numHosts = _params.getNumHosts();
     int[] numHours = _params.getNumHours();
@@ -108,7 +112,7 @@ public class RealtimeProvisioningRule extends AbstractRule {
     setIfNotEmpty(indexConfig.getNoDictionaryColumns(), 
tableConfigBuilder::setNoDictionaryColumns);
     setIfNotEmpty(indexConfig.getInvertedIndexColumns(), 
tableConfigBuilder::setInvertedIndexColumns);
     setIfNotEmpty(indexConfig.getOnHeapDictionaryColumns(), 
tableConfigBuilder::setOnHeapDictionaryColumns);
-    setIfNotEmpty(indexConfig.getVariedLengthDictionaryColumns(), 
tableConfigBuilder::setVarLengthDictionaryColumns);
+    setIfNotEmpty(indexConfig.getVarLengthDictionaryColumns(), 
tableConfigBuilder::setVarLengthDictionaryColumns);
 
     TableConfig tableConfig = tableConfigBuilder.build();
     tableConfig.getIndexingConfig().setAggregateMetrics(aggregateMetrics);
@@ -131,9 +135,16 @@ public class RealtimeProvisioningRule extends AbstractRule 
{
       Map<String, Map<String, String>> rtProvRecommendations) {
     Map<String, String> segmentSizes = 
makeMatrix(memoryEstimator.getOptimalSegmentSize(), numHosts, numHours);
     Map<String, String> consumingMemory = 
makeMatrix(memoryEstimator.getConsumingMemoryPerHost(), numHosts, numHours);
+    Map<String, String> numSegmentsQueried =
+        makeMatrix(memoryEstimator.getNumSegmentsQueriedPerHost(), numHosts, 
numHours);
+    Map<String, String> numRowsInSegment = 
makeMatrix(memoryEstimator.getNumRowsInSegment(), numHosts, numHours,
+        element -> element.equals(NOT_APPLICABLE) ? element : 
convertLargeNumberToHumanReadable(element));
     Map<String, String> totalMemory = 
makeMatrix(memoryEstimator.getActiveMemoryPerHost(), numHosts, numHours,
-        element -> element.substring(0, element.indexOf('/'))); // take the 
first number (eg: 48G/48G)
+        element -> element.equals(NOT_APPLICABLE) ? element
+            : element.substring(0, element.indexOf('/'))); // take the first 
number (eg: 48G/48G)
     rtProvRecommendations.put(OPTIMAL_SEGMENT_SIZE, segmentSizes);
+    rtProvRecommendations.put(NUM_ROWS_IN_SEGMENT, numRowsInSegment);
+    rtProvRecommendations.put(NUM_SEGMENTS_QUERIED_PER_HOST, 
numSegmentsQueried);
     rtProvRecommendations.put(CONSUMING_MEMORY_PER_HOST, consumingMemory);
     rtProvRecommendations.put(TOTAL_MEMORY_USED_PER_HOST, totalMemory);
   }
@@ -173,4 +184,18 @@ public class RealtimeProvisioningRule extends AbstractRule 
{
 
     return output;
   }
+
+  private String convertLargeNumberToHumanReadable(String num) {
+    int val = Integer.parseInt(num);
+    if (val >= 10_000_000) {
+      return (val / 1_000_000) + "M";
+    }
+    if (val >= 1_000_000) {
+      return (val / 100_000) / 10.0 + "M"; // eg: 5,432,000 -> 5.4M
+    }
+    if (val >= 10_000) {
+      return (val / 1000) + "K";
+    }
+    return num;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/VariedLengthDictionaryRule.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/VariedLengthDictionaryRule.java
index b530601..7573591 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/VariedLengthDictionaryRule.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/VariedLengthDictionaryRule.java
@@ -44,7 +44,7 @@ public class VariedLengthDictionaryRule extends AbstractRule {
         LOGGER.debug("{} {}", _input.getFieldType(colName), colName);
         if (_input.getFieldType(colName) == FieldSpec.DataType.STRING
             || _input.getFieldType(colName) == FieldSpec.DataType.BYTES) {
-          
_output.getIndexConfig().getVariedLengthDictionaryColumns().add(colName);
+          
_output.getIndexConfig().getVarLengthDictionaryColumns().add(colName);
         }
       }
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java
index 2f17615..160491a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/io/configs/IndexConfig.java
@@ -36,13 +36,13 @@ public class IndexConfig {
 
   Set<String> _noDictionaryColumns = new HashSet<>();
   Set<String> _onHeapDictionaryColumns = new HashSet<>();
-  Set<String> _variedLengthDictionaryColumns = new HashSet<>();
+  Set<String> _varLengthDictionaryColumns = new HashSet<>();
 
   boolean _isSortedColumnOverwritten = false;
 
   @JsonSetter(nulls = Nulls.SKIP)
   public void setVariedLengthDictionaryColumns(Set<String> 
variedLengthDictionaryColumns) {
-    _variedLengthDictionaryColumns = variedLengthDictionaryColumns;
+    _varLengthDictionaryColumns = variedLengthDictionaryColumns;
   }
 
   @JsonSetter(nulls = Nulls.SKIP)
@@ -84,8 +84,8 @@ public class IndexConfig {
     _isSortedColumnOverwritten = sortedColumnOverwritten;
   }
 
-  public Set<String> getVariedLengthDictionaryColumns() {
-    return _variedLengthDictionaryColumns;
+  public Set<String> getVarLengthDictionaryColumns() {
+    return _varLengthDictionaryColumns;
   }
 
   public Set<String> getBloomFilterColumns() {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
index 0af4635..57ede19 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/recommender/TestConfigEngine.java
@@ -45,9 +45,7 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static 
org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.CONSUMING_MEMORY_PER_HOST;
-import static 
org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.OPTIMAL_SEGMENT_SIZE;
-import static 
org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.TOTAL_MEMORY_USED_PER_HOST;
+import static 
org.apache.pinot.controller.recommender.rules.impl.RealtimeProvisioningRule.*;
 import static org.testng.Assert.*;
 
 
@@ -231,7 +229,7 @@ public class TestConfigEngine {
     AbstractRule abstractRule =
         
RulesToExecute.RuleFactory.getRule(RulesToExecute.Rule.VariedLengthDictionaryRule,
 _input, output);
     abstractRule.run();
-    
assertEquals(output.getIndexConfig().getVariedLengthDictionaryColumns().toString(),
 "[a, d, m]");
+    
assertEquals(output.getIndexConfig().getVarLengthDictionaryColumns().toString(),
 "[a, d, m]");
   }
 
   @Test
@@ -436,6 +434,13 @@ public class TestConfigEngine {
   }
 
   @Test
+  void testRealtimeProvisioningRuleWithHighIngestionRate() throws Exception {
+    // Total memory for some of the options is greater than the provided max 
memory in a host.
+    // For those options, the returned value is "NA".
+    
testRealtimeProvisioningRule("recommenderInput/RealtimeProvisioningInput_highIngestionRate.json");
+  }
+
+  @Test
   void testAggregateMetricsRule()
       throws Exception {
     ConfigManager output = 
runRecommenderDriver("recommenderInput/AggregateMetricsRuleInput.json");
@@ -487,6 +492,8 @@ public class TestConfigEngine {
     ConfigManager output = runRecommenderDriver(fileName);
     Map<String, Map<String, String>> recommendations = 
output.getRealtimeProvisioningRecommendations();
     
assertRealtimeProvisioningRecommendation(recommendations.get(OPTIMAL_SEGMENT_SIZE));
+    
assertRealtimeProvisioningRecommendation(recommendations.get(NUM_ROWS_IN_SEGMENT));
+    
assertRealtimeProvisioningRecommendation(recommendations.get(NUM_SEGMENTS_QUERIED_PER_HOST));
     
assertRealtimeProvisioningRecommendation(recommendations.get(CONSUMING_MEMORY_PER_HOST));
     
assertRealtimeProvisioningRecommendation(recommendations.get(TOTAL_MEMORY_USED_PER_HOST));
   }
diff --git 
a/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_highIngestionRate.json
 
b/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_highIngestionRate.json
new file mode 100644
index 0000000..3f687be
--- /dev/null
+++ 
b/pinot-controller/src/test/resources/recommenderInput/RealtimeProvisioningInput_highIngestionRate.json
@@ -0,0 +1,75 @@
+{
+  "schema": {
+    "dateTimeFieldSpecs": [
+      {
+        "dataType": "LONG",
+        "format": "1:MILLISECONDS:EPOCH",
+        "granularity": "1:MILLISECONDS",
+        "name": "timestampMillis",
+        "cardinality": 10000
+      }
+    ],
+    "dimensionFieldSpecs": [
+      {
+        "averageLength": 8,
+        "cardinality": 16,
+        "dataType": "STRING",
+        "name": "colA"
+      },
+      {
+        "averageLength": 16,
+        "cardinality": 200,
+        "dataType": "STRING",
+        "name": "colB"
+      },
+      {
+        "averageLength": 50,
+        "cardinality": 100000,
+        "dataType": "STRING",
+        "name": "colC"
+      },
+      {
+        "averageLength": 25,
+        "cardinality": 5000,
+        "dataType": "INT",
+        "name": "partition"
+      }
+    ],
+    "metricFieldSpecs": [
+      {
+        "cardinality": 5000,
+        "dataType": "LONG",
+        "name": "metricA"
+      },
+      {
+        "cardinality": 5000,
+        "dataType": "LONG",
+        "name": "metricB"
+      }
+    ],
+    "schemaName": "myTable"
+  },
+  "queriesWithWeights":{
+    "select colC, \"partition\" as partitionNum, max(metricA) as 
maxMetricA,avg(metricA) as avgMetricA, avg(metricB) as avgMetricB from myTable 
where colA='valA' and timestampMillis > now() - 3600000 and colB='valB' and 
timestampMillis < now() group by colB,colC,\"partition\" order by max(metricA) 
desc limit 10000": 1,
+    "select colC, \"partition\" as partitionNum, max(metricA) as 
maxMetricA,avg(metricA) as avgMetricA from myTable where colA='val1' and 
timestampMillis > now() - 3600000 and colB='valB' and timestampMillis < now() 
and maxMetricA > 2000 group by colB,colC,\"partition\" order by max(metricA) 
desc limit 10000": 1,
+    "select timestampMillis, sum(metricB) from myTable where (colC='A' or 
colC='B') and (timestampMillis >= 123) group by timestampMillis order by 
timestampMillis asc": 1
+  },
+  "qps": 10,
+  "tableType": "REALTIME",
+  "latencySLA": 500,
+  "rulesToExecute": {
+    "recommendRealtimeProvisioning": true
+  },
+  "numMessagesPerSecInKafkaTopic":50000,
+  "realtimeProvisioningRuleParams": {
+    "numPartitions": 32,
+    "numReplicas": 2,
+    "realtimeTableRetentionHours": 24,
+    "maxUsableHostMemory": "150G",
+    "numHours": [2, 4, 6, 8, 10, 12],
+    "numHosts": [3, 6, 9, 12, 15, 18, 21],
+    "numRowsInGeneratedSegment": 10000
+  },
+  "overWrittenConfigs": {
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to