jackjlli commented on a change in pull request #7299: URL: https://github.com/apache/pinot/pull/7299#discussion_r694355060
########## File path: pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java ########## @@ -90,4 +119,131 @@ protected boolean isDataFile(String fileName) { // TODO: support orc format in the future. return fileName.endsWith(".avro"); } + + protected void setTableConfigAndSchema() + throws IOException { + _tableConfig = getTableConfig(); + _pinotTableSchema = getSchema(); + + Preconditions.checkState(_tableConfig != null, "Table config cannot be null."); + Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null"); + } + + protected void fetchPreProcessingOperations() { + _preprocessingOperations = new HashSet<>(); + TableCustomConfig customConfig = _tableConfig.getCustomConfig(); + if (customConfig != null) { + Map<String, String> customConfigMap = customConfig.getCustomConfigs(); + if (customConfigMap != null && !customConfigMap.isEmpty()) { + String preprocessingOperationsString = + customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, ""); + DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString); + } + } + } + + protected void fetchPartitioningConfig() { + // Fetch partition info from table config. 
+ if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) { + LOGGER.info("Partitioning is disabled."); + return; + } + SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig(); + if (segmentPartitionConfig != null) { + Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap(); + Preconditions + .checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table."); + if (columnPartitionMap.size() == 1) { + _partitionColumn = columnPartitionMap.keySet().iterator().next(); + _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn); + _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn); + } + } else { + LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName()); + } + } + + protected void fetchSortingConfig() { + if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) { + LOGGER.info("Sorting is disabled."); + return; + } + // Fetch sorting info from table config first. + List<String> sortingColumns = new ArrayList<>(); + List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList(); + if (fieldConfigs != null && !fieldConfigs.isEmpty()) { + for (FieldConfig fieldConfig : fieldConfigs) { + if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) { + sortingColumns.add(fieldConfig.getName()); + } + } + } + if (!sortingColumns.isEmpty()) { + Preconditions.checkArgument(sortingColumns.size() == 1, "There should be at most 1 sorted column in the table."); Review comment: Updated it to `size() <= 1`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org