Jackie-Jiang commented on a change in pull request #6840: URL: https://github.com/apache/incubator-pinot/pull/6840#discussion_r621710765
########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java ########## @@ -580,4 +615,120 @@ private static IndexingConfig sanitizeIndexingConfig(IndexingConfig indexingConf } return null; } + + /** + * Apply TunerConfig to the tableConfig + */ + public static void applyTunerConfig(TableConfig tableConfig, Schema schema) { + TunerConfig tunerConfig = tableConfig.getTunerConfig(); + if (tunerConfig != null && tunerConfig.getName() != null && !tunerConfig.getName().isEmpty()) { + TableConfigTuner tuner = TableConfigTunerRegistry.getTuner(tunerConfig.getName()); + tuner.init(tunerConfig, schema); + tuner.apply(tableConfig); + } + } + + /** + * Ensure that the table config has the minimum number of replicas set as per cluster configs. + * If is doesn't, set the required amount of replication in the table config + */ + public static void ensureMinReplicas(TableConfig tableConfig, int defaultTableMinReplicas) { + // For self-serviced cluster, ensure that the tables are created with at least min replication factor irrespective + // of table configuration value + SegmentsValidationAndRetentionConfig segmentsConfig = tableConfig.getValidationConfig(); + boolean verifyReplicasPerPartition; + boolean verifyReplication; + + try { + verifyReplicasPerPartition = ReplicationUtils.useReplicasPerPartition(tableConfig); + verifyReplication = ReplicationUtils.useReplication(tableConfig); + } catch (Exception e) { + throw new IllegalStateException(String.format("Invalid tableIndexConfig or streamConfig: %s", e.getMessage()), e); + } + + if (verifyReplication) { + int requestReplication; + try { + requestReplication = segmentsConfig.getReplicationNumber(); + if (requestReplication < defaultTableMinReplicas) { + LOGGER.info("Creating table with minimum replication factor of: {} instead of requested replication: {}", + defaultTableMinReplicas, requestReplication); + 
segmentsConfig.setReplication(String.valueOf(defaultTableMinReplicas)); + } + } catch (NumberFormatException e) { + throw new IllegalStateException("Invalid replication number", e); + } + } + + if (verifyReplicasPerPartition) { + String replicasPerPartitionStr = segmentsConfig.getReplicasPerPartition(); + if (replicasPerPartitionStr == null) { + throw new IllegalStateException("Field replicasPerPartition needs to be specified"); + } + try { + int replicasPerPartition = Integer.parseInt(replicasPerPartitionStr); + if (replicasPerPartition < defaultTableMinReplicas) { + LOGGER.info( + "Creating table with minimum replicasPerPartition of: {} instead of requested replicasPerPartition: {}", + defaultTableMinReplicas, replicasPerPartition); + segmentsConfig.setReplicasPerPartition(String.valueOf(defaultTableMinReplicas)); + } + } catch (NumberFormatException e) { + throw new IllegalStateException("Invalid value for replicasPerPartition: '" + replicasPerPartitionStr + "'", e); + } + } + } + + /** + * Ensure the table config has storage quota set as per cluster configs. 
+ * If it doesn't, set the quota config into the table config + */ + public static void ensureStorageQuotaConstraints(TableConfig tableConfig, String maxAllowedSize) { + // Dim tables must adhere to cluster level storage size limits + if (tableConfig.isDimTable()) { + QuotaConfig quotaConfig = tableConfig.getQuotaConfig(); + long maxAllowedSizeInBytes = DataSizeUtils.toBytes(maxAllowedSize); + + if (quotaConfig == null) { + // set a default storage quota + tableConfig.setQuotaConfig(new QuotaConfig(maxAllowedSize, null)); + LOGGER.info("Assigning default storage quota ({}) for dimension table: {}", maxAllowedSize, + tableConfig.getTableName()); + } else { + if (quotaConfig.getStorage() == null) { + // set a default storage quota and keep the RPS value + tableConfig.setQuotaConfig(new QuotaConfig(maxAllowedSize, quotaConfig.getMaxQueriesPerSecond())); + LOGGER.info("Assigning default storage quota ({}) for dimension table: {}", maxAllowedSize, + tableConfig.getTableName()); + } else { + if (quotaConfig.getStorageInBytes() > maxAllowedSizeInBytes) { + throw new IllegalStateException(String + .format("Invalid storage quota: %d, max allowed size: %d", quotaConfig.getStorageInBytes(), + maxAllowedSizeInBytes)); + } + } + } + } + } + + /** + * Consistency checks across the offline and realtime counterparts of a hybrid table + */ + public static void verifyHybridTableConfigs(String rawTableName, TableConfig offlineTableConfig, + TableConfig realtimeTableConfig) { + if (offlineTableConfig == null || realtimeTableConfig == null) { + return; + } + + LOGGER.info("Validating realtime and offline configs for the hybrid table: {}", rawTableName); + SegmentsValidationAndRetentionConfig offlineSegmentConfig = offlineTableConfig.getValidationConfig(); + SegmentsValidationAndRetentionConfig realtimeSegmentConfig = realtimeTableConfig.getValidationConfig(); + String offlineTimeColumnName = offlineSegmentConfig.getTimeColumnName(); + String realtimeTimeColumnName = 
realtimeSegmentConfig.getTimeColumnName(); + if (!Objects.equal(realtimeTimeColumnName, offlineTimeColumnName)) { Review comment: Neither of the time column names should be `null` for a hybrid table ########## File path: pinot-spi/src/main/java/org/apache/pinot/spi/config/TableConfigs.java ########## @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; + + +/** + * Wrapper for all configs of a table, which include the offline table config, realtime table config and schema. + * This helps look at and operate on the pinot table configs as a whole unit. 
+ */ +public class TableConfigs extends BaseJsonConfig { + private final String _tableName; + private final Schema _schema; + private final TableConfig _offline; + private final TableConfig _realtime; + + @JsonCreator + public TableConfigs(@JsonProperty(value = "tableName", required = true) String tableName, + @JsonProperty(value = "schema", required = true) Schema schema, + @JsonProperty(value = "offline") @Nullable TableConfig offline, + @JsonProperty(value = "realtime") @Nullable TableConfig realtime) { Review comment: Add some precondition check here to avoid setting `null` tableName or schema ########## File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java ########## @@ -580,4 +615,120 @@ private static IndexingConfig sanitizeIndexingConfig(IndexingConfig indexingConf } return null; } + + /** + * Apply TunerConfig to the tableConfig + */ + public static void applyTunerConfig(TableConfig tableConfig, Schema schema) { + TunerConfig tunerConfig = tableConfig.getTunerConfig(); + if (tunerConfig != null && tunerConfig.getName() != null && !tunerConfig.getName().isEmpty()) { + TableConfigTuner tuner = TableConfigTunerRegistry.getTuner(tunerConfig.getName()); + tuner.init(tunerConfig, schema); + tuner.apply(tableConfig); + } + } + + /** + * Ensure that the table config has the minimum number of replicas set as per cluster configs. 
+ * If is doesn't, set the required amount of replication in the table config + */ + public static void ensureMinReplicas(TableConfig tableConfig, int defaultTableMinReplicas) { + // For self-serviced cluster, ensure that the tables are created with at least min replication factor irrespective + // of table configuration value + SegmentsValidationAndRetentionConfig segmentsConfig = tableConfig.getValidationConfig(); + boolean verifyReplicasPerPartition; + boolean verifyReplication; + + try { + verifyReplicasPerPartition = ReplicationUtils.useReplicasPerPartition(tableConfig); + verifyReplication = ReplicationUtils.useReplication(tableConfig); + } catch (Exception e) { + throw new IllegalStateException(String.format("Invalid tableIndexConfig or streamConfig: %s", e.getMessage()), e); + } + + if (verifyReplication) { + int requestReplication; + try { + requestReplication = segmentsConfig.getReplicationNumber(); + if (requestReplication < defaultTableMinReplicas) { + LOGGER.info("Creating table with minimum replication factor of: {} instead of requested replication: {}", + defaultTableMinReplicas, requestReplication); + segmentsConfig.setReplication(String.valueOf(defaultTableMinReplicas)); + } + } catch (NumberFormatException e) { + throw new IllegalStateException("Invalid replication number", e); + } + } + + if (verifyReplicasPerPartition) { + String replicasPerPartitionStr = segmentsConfig.getReplicasPerPartition(); + if (replicasPerPartitionStr == null) { + throw new IllegalStateException("Field replicasPerPartition needs to be specified"); + } + try { + int replicasPerPartition = Integer.parseInt(replicasPerPartitionStr); + if (replicasPerPartition < defaultTableMinReplicas) { + LOGGER.info( + "Creating table with minimum replicasPerPartition of: {} instead of requested replicasPerPartition: {}", + defaultTableMinReplicas, replicasPerPartition); + segmentsConfig.setReplicasPerPartition(String.valueOf(defaultTableMinReplicas)); + } + } catch (NumberFormatException 
e) { + throw new IllegalStateException("Invalid value for replicasPerPartition: '" + replicasPerPartitionStr + "'", e); + } + } + } + + /** + * Ensure the table config has storage quota set as per cluster configs. + * If it doesn't, set the quota config into the table config + */ + public static void ensureStorageQuotaConstraints(TableConfig tableConfig, String maxAllowedSize) { + // Dim tables must adhere to cluster level storage size limits + if (tableConfig.isDimTable()) { + QuotaConfig quotaConfig = tableConfig.getQuotaConfig(); + long maxAllowedSizeInBytes = DataSizeUtils.toBytes(maxAllowedSize); + + if (quotaConfig == null) { + // set a default storage quota + tableConfig.setQuotaConfig(new QuotaConfig(maxAllowedSize, null)); + LOGGER.info("Assigning default storage quota ({}) for dimension table: {}", maxAllowedSize, + tableConfig.getTableName()); + } else { + if (quotaConfig.getStorage() == null) { + // set a default storage quota and keep the RPS value + tableConfig.setQuotaConfig(new QuotaConfig(maxAllowedSize, quotaConfig.getMaxQueriesPerSecond())); + LOGGER.info("Assigning default storage quota ({}) for dimension table: {}", maxAllowedSize, + tableConfig.getTableName()); + } else { + if (quotaConfig.getStorageInBytes() > maxAllowedSizeInBytes) { + throw new IllegalStateException(String + .format("Invalid storage quota: %d, max allowed size: %d", quotaConfig.getStorageInBytes(), + maxAllowedSizeInBytes)); + } + } + } + } + } + + /** + * Consistency checks across the offline and realtime counterparts of a hybrid table + */ + public static void verifyHybridTableConfigs(String rawTableName, TableConfig offlineTableConfig, + TableConfig realtimeTableConfig) { + if (offlineTableConfig == null || realtimeTableConfig == null) { + return; + } Review comment: The passed in config should not be null. 
You can actually add a precondition that they are not null, to guard against the case where the table config fetch failed in `PinotTableRestletResource.checkHybridTableConfig()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org