siddharthteotia commented on a change in pull request #5774:
URL: https://github.com/apache/incubator-pinot/pull/5774#discussion_r465798271



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/NoDictionaryOnHeapDictionaryJointRule.java
##########
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.recommender.rules.impl;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
+import org.apache.pinot.core.requesthandler.BrokerRequestOptimizer;
+import org.apache.pinot.core.requesthandler.PinotQueryParserFactory;
+import org.apache.pinot.parsers.AbstractCompiler;
+import org.apache.pinot.sql.parsers.SqlCompilationException;
+import org.apache.pinot.controller.recommender.io.ConfigManager;
+import org.apache.pinot.controller.recommender.io.InputManager;
+import org.apache.pinot.controller.recommender.rules.AbstractRule;
+import org.apache.pinot.controller.recommender.rules.io.params.NoDictionaryOnHeapDictionaryJointRuleParams;
+import org.apache.pinot.controller.recommender.rules.utils.FixedLenBitset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.NoDictionaryOnHeapDictionaryJointRule.*;
+import static org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants.REALTIME;
+
+
+public class NoDictionaryOnHeapDictionaryJointRule extends AbstractRule {
+  private final Logger LOGGER = LoggerFactory.getLogger(NoDictionaryOnHeapDictionaryJointRule.class);
+  private final BrokerRequestOptimizer _brokerRequestOptimizer = new BrokerRequestOptimizer();
+  private final NoDictionaryOnHeapDictionaryJointRuleParams _params;
+
+  public NoDictionaryOnHeapDictionaryJointRule(InputManager inputManager, ConfigManager outputManager) {
+    super(inputManager, outputManager);
+    _params = inputManager.getNoDictionaryOnHeapDictionaryJointRuleParams();
+  }
+
+  @Override
+  public void run() {
+    LOGGER.info("Recommending no dictionary and on-heap dictionaries");
+
+    int numCols = _inputManager.getNumCols();
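+    // Per-column accumulators: total weight of queries referencing the column
+    // in a filter/group-by clause vs. in the selection list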
+    double[] filterGroupByWeights = new double[numCols];
+    double[] selectionWeights = new double[numCols];
+    AtomicDouble totalWeight = new AtomicDouble(0);
+
+    //**********No dictionary recommendation*******/
+    Set<String> noDictCols = new HashSet<>(_inputManager.getColNameToIntMap().keySet());
+
+    //Exclude cols with index
+    noDictCols.removeAll(_outputManager.getIndexConfig().getInvertedIndexColumns());
+    noDictCols.removeAll(_outputManager.getIndexConfig().getSortedColumn());
+    // TODO: Remove this after range index is implemented for no-dictionary
+    noDictCols.removeAll(_outputManager.getIndexConfig().getRangeIndexColumns());
+    LOGGER.debug("noDictCols {}", noDictCols);
+    //Find out columns used in filter&groupby and selection, and the corresponding frequencies
+    _inputManager.getQueryWeightMap().forEach((query, weight) -> {
+      parseQuery(query, weight, filterGroupByWeights, selectionWeights);
+      totalWeight.addAndGet(weight);
+    });
+
+    //Add dictionary on columns used in filter&groupby, with frequency > threshold
+    for (int i = 0; i < numCols; i++) {
+      double filterGroupByFreq = filterGroupByWeights[i] / totalWeight.get();
+      if (filterGroupByFreq > _params.THRESHOLD_MIN_FILTER_FREQ_DICTIONARY) {
+        noDictCols.remove(_inputManager.intToColName(i));
+      }
+    }
+
+    LOGGER.debug("filterGroupByWeights {}, selectionWeights{}, totalWeight{} 
", filterGroupByWeights, selectionWeights,
+        totalWeight);
+    LOGGER.debug("noDictCols {}", noDictCols);
+
+    for (int i = 0; i < numCols; i++) {
+      // No dictionary on columns frequently used in selection
+      double selectionFreq = selectionWeights[i] / totalWeight.get();
+      if (selectionFreq > _params.THRESHOLD_MAX_SELECTION_FREQ_DICTIONARY) {
+        continue;
+      }
+
+      // Add dictionary on columns NOT frequently used in selection
+      // AND can save storage > threshold
+      String colName = _inputManager.intToColName(i);
+      double noDictSize;
+      double withDictSize;
+      long colDataSizeWithoutDictionary = _inputManager.getColDataSizeWithoutDictionary(colName);
+      double numValuesPerEntry = _inputManager.getNumValuesPerEntry(colName);
+      int bitCompressedDataSize = _inputManager.getBitCompressedDataSize(colName);
+      long dictionarySize = _inputManager.getDictionarySize(colName);
+      double cardinality = _inputManager.getCardinality(colName);
+      long numRecordsPerPush = _inputManager.getNumRecordsPerPush();
+      LOGGER.debug("colDataSizeWithoutDictionary {}", 
colDataSizeWithoutDictionary);
+      LOGGER.debug("bitCompressedDataSize {}", bitCompressedDataSize);
+      LOGGER.debug("dictionarySize {}", dictionarySize);
+      LOGGER.debug("numValuesPerEntry {}", numValuesPerEntry);
+
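+      // Estimate per-push storage with and without a dictionary; realtime tables
+      // are sized per flushed segment, offline/hybrid tables per push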
+      if (_inputManager.getTableType().equalsIgnoreCase(REALTIME)) {
+        //TODO: improve this estimation
+        noDictSize = // size of one segment flushed with no dictionary
+            colDataSizeWithoutDictionary * numValuesPerEntry * _params.SEGMENT_FLUSH_TIME;
+        withDictSize = // size of one flushed segment with dictionary
+            dictionarySize + bitCompressedDataSize * numValuesPerEntry * _params.SEGMENT_FLUSH_TIME;
+      } else { // For hybrid or offline tables, no-dictionary follows the offline side
+        noDictSize = // size of all segments in one push with no dictionary
+            colDataSizeWithoutDictionary * numValuesPerEntry * numRecordsPerPush;
+        withDictSize = // size of all segments in one push with dictionary
+            dictionarySize * dictionaryCoefficient(cardinality, numRecordsPerPush) * DEFAUlT_NUM_PARTITIONS
+                + bitCompressedDataSize * numValuesPerEntry * numRecordsPerPush;
+      }
+
+      double storageSaved = (double) (noDictSize - withDictSize) / noDictSize;
+      LOGGER.debug("colName {}, noDictSize {}, withDictSize{}, 
storageSaved{}", colName, noDictSize, withDictSize,
+          storageSaved);
+
+      if (storageSaved > _params.THRESHOLD_MIN_PERCENT_DICTIONARY_STORAGE_SVAE) {
+        noDictCols.remove(colName);
+      }
+    }
+
+    // Add the no dictionary cols to config
+    _outputManager.getIndexConfig().getNoDictionaryColumns().addAll(noDictCols);
+
+    //**********On heap dictionary recommendation*******/
+    if (_inputManager.getQps() > _params.THRESHOLD_MIN_QPS_ON_HEAP) { // QPS > THRESHOLD_MIN_QPS_ON_HEAP
+      for (String colName : _inputManager.getColNameToIntMap().keySet()) {
+        if (!_outputManager.getIndexConfig().getNoDictionaryColumns().contains(colName)) { //exclude no dictionary column
+          long dictionarySize = _inputManager.getDictionarySize(colName);
+          int colId = _inputManager.colNameToInt(colName);
+          double filterGroupByFreq = filterGroupByWeights[colId] / totalWeight.get();
+          if (filterGroupByFreq > _params.THRESHOLD_MIN_FILTER_FREQ_ON_HEAP //frequently used in filter/group by
+              && dictionarySize < _params.THRESHOLD_MAX_DICTIONARY_SIZE_ON_HEAP) { // memory footprint < threshold
+            _outputManager.getIndexConfig().getOnHeapDictionaryColumns().add(colName);
+          }
+        }
+      }
+    }
+  }
+
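+  // Scales the estimated dictionary size for offline pushes: the log term grows
+  // with the cardinality-to-records ratio and is clamped to
+  // [DEFAULT_DICT_LOWER, DEFAULT_DICT_UPPER] before being subtracted from 1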
+  private double dictionaryCoefficient(double cardinality, long numRecordsPerPush) {
+    return 1 - min(max(DEFAULT_DICT_COEFF_A * Math.log(DEFAULT_DICT_COEFF_B * cardinality / numRecordsPerPush),
+        DEFAULT_DICT_LOWER), DEFAULT_DICT_UPPER);
+  }
+
+  public void parseQuery(String queryString, double weight, double[] filterGroupByWeights, double[] selectionWeights) {

Review comment:
       General question - would it be possible to parse the query set exactly once, before the first rule begins executing? Right now it seems that, as the rules are fired in order, each rule re-parses the input query set. Even though each rule's algorithm is different, is it possible to parse once and extract all the common information needed by all the rules?
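
       For illustration, a minimal sketch of the parse-once idea: a single pre-pass compiles every query into a `QueryContext` that all rules then share. The class and method names below (`ParsedQueryCache`, `cacheQueryContexts`) are hypothetical, not part of this PR; only the compiler/optimizer/converter calls come from the imports already used above.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
import org.apache.pinot.core.requesthandler.BrokerRequestOptimizer;
import org.apache.pinot.core.requesthandler.PinotQueryParserFactory;
import org.apache.pinot.sql.parsers.SqlCompilationException;

// Hypothetical one-time pre-pass, run before the first rule fires
public class ParsedQueryCache {
  private final Map<String, QueryContext> _queryContextMap = new HashMap<>();
  private final BrokerRequestOptimizer _optimizer = new BrokerRequestOptimizer();

  /** Parse each distinct query once; every rule then reads the cached contexts. */
  public void cacheQueryContexts(Map<String, Double> queryWeightMap, String queryType) {
    for (String query : queryWeightMap.keySet()) {
      try {
        BrokerRequest brokerRequest =
            _optimizer.optimize(PinotQueryParserFactory.get(queryType).compileToBrokerRequest(query), null);
        _queryContextMap.put(query, BrokerRequestToQueryContextConverter.convert(brokerRequest));
      } catch (SqlCompilationException e) {
        // Skip invalid queries here, so individual rules never re-hit the error
      }
    }
  }

  public QueryContext getQueryContext(String query) {
    return _queryContextMap.get(query);
  }
}
```

       Each rule's parseQuery would then start from the cached `QueryContext` instead of compiling the raw query string again.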



