sajjad-moradi commented on a change in pull request #6869:
URL: https://github.com/apache/incubator-pinot/pull/6869#discussion_r630548185
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java ##########
@@ -149,24 +146,33 @@ public void init()
     reorderDimsAndBuildMap();
     registerColNameFieldType();
     validateQueries();
-    if (_useCardinalityNormalization) {
-      regulateCardinalityForAll();
-    }
   }

-  private void regulateCardinalityForAll() {
-    double sampleSize;
-    if (getTableType().equalsIgnoreCase(REALTIME)) {
-      sampleSize = getSegmentFlushTime() * getNumMessagesPerSecInKafkaTopic();
-    } else {
-      sampleSize = getNumRecordsPerPush();
-    }
-
+  /**
+   * Cardinalities provided by users are relative to the number of records per push, but we might end up creating
+   * multiple segments for each push. Using this method, cardinalities will be capped at the provided number of rows
+   * per segment.
+   */
+  public void capCardinalities(int numRecordsInSegment) {
     _metaDataMap.keySet().forEach(colName -> {
-      int cardinality = _metaDataMap.get(colName).getCardinality();
-      double regulatedCardinality = regulateCardinalityInfinitePopulation(cardinality, sampleSize);
-      _metaDataMap.get(colName).setCardinality((int) Math.round(regulatedCardinality));
+      int cardinality = Math.min(numRecordsInSegment, _metaDataMap.get(colName).getCardinality());
+      _metaDataMap.get(colName).setCardinality(cardinality);
     });
+    if (_schemaWithMetaData.getDimensionFieldSpecs() != null) {
+      _schemaWithMetaData.getDimensionFieldSpecs()

Review comment:
   Good catch. Done.
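The capping idea in the hunk above is easy to demonstrate in isolation: a column's cardinality observed within one segment can never exceed that segment's row count, so user-supplied per-push cardinalities are clamped. The following is a minimal, self-contained sketch of that idea, not the Pinot class itself; the class name, column names, and counts are all invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the cardinality-capping logic from capCardinalities().
public class CardinalityCapSketch {

  public static void main(String[] args) {
    // Made-up per-push cardinalities, as a user might supply them.
    Map<String, Integer> cardinalityByColumn = new HashMap<>();
    cardinalityByColumn.put("memberId", 5_000_000);
    cardinalityByColumn.put("country", 200);

    int numRecordsInSegment = 1_000_000; // rows per generated segment

    // Clamp each cardinality, mirroring Math.min(numRecordsInSegment, cardinality).
    cardinalityByColumn.replaceAll(
        (column, cardinality) -> Math.min(numRecordsInSegment, cardinality));

    // memberId is capped to 1,000,000; country stays at 200.
    cardinalityByColumn.forEach((column, cardinality) ->
        System.out.println(column + " -> " + cardinality));
  }
}
```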
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/recommender/rules/impl/PinotTablePartitionRule.java ##########
@@ -105,26 +103,30 @@ public void run()
     LOGGER.info("*Recommending number of partitions ");

     int numKafkaPartitions = _output.getPartitionConfig().getNumKafkaPartitions();
-    long offLineDataSizePerPush = _input.getNumRecordsPerPush() * _input.getSizePerRecord();
-    int optimalOfflinePartitions = (int) Math.ceil((double) offLineDataSizePerPush / _params.OPTIMAL_SIZE_PER_SEGMENT);
-    if (_input.getTableType().equalsIgnoreCase(REALTIME) || _input.getTableType().equalsIgnoreCase(HYBRID)) {
+    boolean isRealtimeTable = _input.getTableType().equalsIgnoreCase(REALTIME);
+    boolean isHybridTable = _input.getTableType().equalsIgnoreCase(HYBRID);
+    boolean isOfflineTable = _input.getTableType().equalsIgnoreCase(OFFLINE);
+    if (isRealtimeTable || isHybridTable) {
       //realtime num of partitions should be the same value as the number of Kafka partitions
       if (!_input.getOverWrittenConfigs().getPartitionConfig().isNumPartitionsRealtimeOverwritten()) {
         _output.getPartitionConfig().setNumPartitionsRealtime(numKafkaPartitions);
       }
     }
-    if (_input.getTableType().equalsIgnoreCase(OFFLINE)) {
+    if (isOfflineTable) {
       //The offline partition num depends on the amount of data coming in on a given day.
       //Using a very high value of numPartitions for a small dataset results in too many small segments.
       //We define a desirable segment size, OPTIMAL_SIZE_PER_SEGMENT.
       //Dividing the size of data coming in on a given day by OPTIMAL_SIZE_PER_SEGMENT gives the number of partitions.
       if (!_input.getOverWrittenConfigs().getPartitionConfig().isNumPartitionsOfflineOverwritten()) {
-        _output.getPartitionConfig().setNumPartitionsOffline((int) (optimalOfflinePartitions));
+        int optimalOfflinePartitions = (int) _output.getSegmentSizeRecommendations().getNumSegments();
+        _output.getPartitionConfig().setNumPartitionsOffline(optimalOfflinePartitions);
       }
     }
-    if (_input.getTableType().equalsIgnoreCase(HYBRID)) {
+    if (isHybridTable) {
       if (!_input.getOverWrittenConfigs().getPartitionConfig().isNumPartitionsOfflineOverwritten()) {
-        _output.getPartitionConfig().setNumPartitionsOffline(Math.min(optimalOfflinePartitions, numKafkaPartitions));
+        int optimalOfflinePartitions =
+            Math.min((int) _output.getSegmentSizeRecommendations().getNumSegments(), numKafkaPartitions);

Review comment:
   Yes, I didn't want to change the existing behavior, but I agree with you. The logic for offline partitioning should be the same for offline and hybrid tables. It's refactored now.
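The partition-count arithmetic discussed in this hunk can be shown with a short standalone example. This is a sketch with invented numbers and stand-in variables (recommendedNumSegments substitutes for _output.getSegmentSizeRecommendations().getNumSegments(), and the sizes are hypothetical); it mirrors the diff as shown above, not necessarily Pinot's final refactored code.

```java
// Hypothetical walk-through of the old and new offline partition-count logic.
public class OfflinePartitionCountSketch {

  public static void main(String[] args) {
    long numRecordsPerPush = 40_000_000L;           // rows ingested per daily push (made up)
    long sizePerRecordBytes = 100L;                 // average serialized row size (made up)
    long optimalSizePerSegmentBytes = 500_000_000L; // desired segment size (made up)

    // Old behavior (the removed lines): derive partition count from the push size.
    long dataSizePerPush = numRecordsPerPush * sizePerRecordBytes; // 4 GB
    int partitionsFromPushSize =
        (int) Math.ceil((double) dataSizePerPush / optimalSizePerSegmentBytes); // = 8

    // New behavior (the added lines): reuse the segment-count recommendation,
    // and for hybrid tables cap it at the number of Kafka partitions.
    int recommendedNumSegments = 8; // stand-in for the recommender's output
    int numKafkaPartitions = 4;
    int offlinePartitions = recommendedNumSegments; // OFFLINE table
    int hybridOfflinePartitions =
        Math.min(recommendedNumSegments, numKafkaPartitions); // HYBRID table = 4

    System.out.println("old = " + partitionsFromPushSize
        + ", offline = " + offlinePartitions
        + ", hybrid = " + hybridOfflinePartitions);
  }
}
```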