This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ab4f333de8 Enforce schema for all tables (#15333) ab4f333de8 is described below commit ab4f333de8ddce6bd1349570472943f96dac7c7e Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Tue Mar 25 16:15:36 2025 -0600 Enforce schema for all tables (#15333) --- .../segmentpruner/SegmentPrunerFactory.java | 2 +- .../pinot/common/config/provider/TableCache.java | 36 +-- .../pinot/common/metadata/ZKMetadataProvider.java | 51 +--- .../pinot/common/metrics/ControllerGauge.java | 15 +- .../org/apache/pinot/common/utils/SchemaUtils.java | 14 +- .../pinot/controller/BaseControllerStarter.java | 118 +++------ .../apache/pinot/controller/ControllerConf.java | 17 +- .../api/resources/PinotSchemaRestletResource.java | 43 ++- .../api/resources/PinotTableRestletResource.java | 48 +--- .../api/resources/TableAndSchemaConfig.java | 4 +- .../controller/helix/ControllerRequestClient.java | 11 + .../helix/core/PinotHelixResourceManager.java | 15 +- .../pinot/controller/helix/ControllerTest.java | 5 + .../cleanup/SchemaCleanupTaskStatelessTest.java | 288 --------------------- .../core/data/manager/BaseTableDataManager.java | 10 +- .../tests/OfflineClusterIntegrationTest.java | 5 +- ...RefreshSegmentMinionClusterIntegrationTest.java | 11 +- .../RefreshSegmentTaskGenerator.java | 4 +- ...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 3 - .../local/data/manager/TableDataManager.java | 2 +- .../segment/local/utils/TableConfigUtils.java | 105 +++----- .../segment/local/utils/TableConfigUtilsTest.java | 58 +++-- .../server/predownload/PredownloadZKClient.java | 16 +- .../SegmentsValidationAndRetentionConfig.java | 15 -- 24 files changed, 206 insertions(+), 690 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java 
index 9efd86764e..6fc8a150c1 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java @@ -133,7 +133,7 @@ public class SegmentPrunerFactory { LOGGER.warn("Cannot enable time range pruning without time column for table: {}", tableNameWithType); return null; } - Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableConfig); + Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableNameWithType); if (schema == null) { LOGGER.warn("Cannot enable time range pruning without schema for table: {}", tableNameWithType); return null; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java index a193fc19db..17d953abec 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java @@ -81,9 +81,6 @@ public class TableCache implements PinotConfigProvider { private final ZkTableConfigChangeListener _zkTableConfigChangeListener = new ZkTableConfigChangeListener(); // Key is table name with type suffix, value is table config info private final Map<String, TableConfigInfo> _tableConfigInfoMap = new ConcurrentHashMap<>(); - // Key is table name (with or without type suffix), value is schema name - // It only stores table with schema name not matching the raw table name - private final Map<String, String> _schemaNameMap = new ConcurrentHashMap<>(); // Key is lower case table name (with or without type suffix), value is actual table name // For case-insensitive mode only private final Map<String, String> _tableNameMap = new ConcurrentHashMap<>(); @@ -175,8 +172,7 @@ public class TableCache implements PinotConfigProvider { */ @Nullable public Map<String, 
String> getColumnNameMap(String rawTableName) { - String schemaName = _schemaNameMap.getOrDefault(rawTableName, rawTableName); - SchemaInfo schemaInfo = _schemaInfoMap.getOrDefault(schemaName, _schemaInfoMap.get(rawTableName)); + SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName); return schemaInfo != null ? schemaInfo._columnNameMap : null; } @@ -225,8 +221,7 @@ public class TableCache implements PinotConfigProvider { @Nullable @Override public Schema getSchema(String rawTableName) { - String schemaName = _schemaNameMap.getOrDefault(rawTableName, rawTableName); - SchemaInfo schemaInfo = _schemaInfoMap.get(schemaName); + SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName); return schemaInfo != null ? schemaInfo._schema : null; } @@ -262,17 +257,8 @@ public class TableCache implements PinotConfigProvider { throws IOException { TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord); String tableNameWithType = tableConfig.getTableName(); - _tableConfigInfoMap.put(tableNameWithType, new TableConfigInfo(tableConfig)); - - String schemaName = tableConfig.getValidationConfig().getSchemaName(); String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - if (schemaName != null && !schemaName.equals(rawTableName)) { - _schemaNameMap.put(tableNameWithType, schemaName); - _schemaNameMap.put(rawTableName, schemaName); - } else { - removeSchemaName(tableNameWithType); - } - + _tableConfigInfoMap.put(tableNameWithType, new TableConfigInfo(tableConfig)); if (_ignoreCase) { _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType); _tableNameMap.put(rawTableName.toLowerCase(), rawTableName); @@ -287,7 +273,6 @@ public class TableCache implements PinotConfigProvider { String tableNameWithType = path.substring(TABLE_CONFIG_PATH_PREFIX.length()); String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); _tableConfigInfoMap.remove(tableNameWithType); - removeSchemaName(tableNameWithType); if (_ignoreCase) { 
_tableNameMap.remove(tableNameWithType.toLowerCase()); String lowerCaseRawTableName = rawTableName.toLowerCase(); @@ -314,21 +299,6 @@ public class TableCache implements PinotConfigProvider { } } - private void removeSchemaName(String tableNameWithType) { - if (_schemaNameMap.remove(tableNameWithType) != null) { - String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) { - if (!_schemaNameMap.containsKey(TableNameBuilder.REALTIME.tableNameWithType(rawTableName))) { - _schemaNameMap.remove(rawTableName); - } - } else { - if (!_schemaNameMap.containsKey(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName))) { - _schemaNameMap.remove(rawTableName); - } - } - } - } - private void addSchemas(List<String> paths) { // Subscribe data changes before reading the data to avoid missing changes for (String path : paths) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 37ba1499e3..b95e76f19c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -44,7 +44,6 @@ import org.apache.pinot.spi.config.ConfigUtils; import org.apache.pinot.spi.config.DatabaseConfig; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.user.UserConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; @@ -630,60 +629,16 @@ public class ZKMetadataProvider { */ @Nullable public static Schema getTableSchema(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) { - String rawTableName = TableNameBuilder.extractRawTableName(tableName); - Schema schema = 
getSchema(propertyStore, rawTableName); - if (schema != null) { - return schema; - } - - // For backward compatible where schema name is not the same as raw table name - TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); - // Try to fetch realtime schema first - if (tableType == null || tableType == TableType.REALTIME) { - TableConfig realtimeTableConfig = getRealtimeTableConfig(propertyStore, tableName); - if (realtimeTableConfig != null) { - String realtimeSchemaNameFromValidationConfig = realtimeTableConfig.getValidationConfig().getSchemaName(); - if (realtimeSchemaNameFromValidationConfig != null) { - schema = getSchema(propertyStore, realtimeSchemaNameFromValidationConfig); - } - } - } - // Try to fetch offline schema if realtime schema does not exist - if (schema == null && (tableType == null || tableType == TableType.OFFLINE)) { - TableConfig offlineTableConfig = getOfflineTableConfig(propertyStore, tableName); - if (offlineTableConfig != null) { - String offlineSchemaNameFromValidationConfig = offlineTableConfig.getValidationConfig().getSchemaName(); - if (offlineSchemaNameFromValidationConfig != null) { - schema = getSchema(propertyStore, offlineSchemaNameFromValidationConfig); - } - } - } - if (schema != null && LOGGER.isDebugEnabled()) { - LOGGER.debug("Schema name does not match raw table name, schema name: {}, raw table name: {}", - schema.getSchemaName(), TableNameBuilder.extractRawTableName(tableName)); - } - return schema; + return getSchema(propertyStore, TableNameBuilder.extractRawTableName(tableName)); } /** * Get the schema associated with the given table. 
*/ + @Deprecated @Nullable public static Schema getTableSchema(ZkHelixPropertyStore<ZNRecord> propertyStore, TableConfig tableConfig) { - String rawTableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName()); - Schema schema = getSchema(propertyStore, rawTableName); - if (schema != null) { - return schema; - } - String schemaNameFromTableConfig = tableConfig.getValidationConfig().getSchemaName(); - if (schemaNameFromTableConfig != null) { - schema = getSchema(propertyStore, schemaNameFromTableConfig); - } - if (schema != null && LOGGER.isDebugEnabled()) { - LOGGER.debug("Schema name does not match raw table name, schema name: {}, raw table name: {}", - schemaNameFromTableConfig, rawTableName); - } - return schema; + return getTableSchema(propertyStore, tableConfig.getTableName()); } /** diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index fa27e60f4b..9ea6008b3f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -151,21 +151,12 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { // Consumption availability lag in ms at a partition level MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false), - // Number of table schema got misconfigured - MISCONFIGURED_SCHEMA_TABLE_COUNT("misconfiguredSchemaTableCount", true), + // Number of table without table config + TABLE_WITHOUT_TABLE_CONFIG_COUNT("tableWithoutTableConfigCount", true), - // Number of table without schema + // Number of table with table config but without schema TABLE_WITHOUT_SCHEMA_COUNT("tableWithoutSchemaCount", true), - // Number of table schema got fixed - FIXED_SCHEMA_TABLE_COUNT("fixedSchemaTableCount", true), - - // Number of tables that we want to fix but failed to copy schema from old schema name to new 
schema name - FAILED_TO_COPY_SCHEMA_COUNT("failedToCopySchemaCount", true), - - // Number of tables that we want to fix but failed to update table config - FAILED_TO_UPDATE_TABLE_CONFIG_COUNT("failedToUpdateTableConfigCount", true), - LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE("LLCSegmentDeepStoreUploadRetryQueueSize", false), TABLE_CONSUMPTION_PAUSED("tableConsumptionPaused", false), diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SchemaUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SchemaUtils.java index 4cadf09bea..2800441fa6 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SchemaUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SchemaUtils.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.net.URL; import java.nio.charset.Charset; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.commons.io.IOUtils; import org.apache.hc.client5.http.classic.methods.HttpDelete; @@ -58,7 +57,7 @@ public class SchemaUtils { /** * Fetch {@link Schema} from a {@link ZNRecord}. */ - public static Schema fromZNRecord(@Nonnull ZNRecord record) + public static Schema fromZNRecord(ZNRecord record) throws IOException { String schemaJSON = record.getSimpleField("schemaJSON"); return Schema.fromString(schemaJSON); @@ -67,7 +66,7 @@ public class SchemaUtils { /** * Wrap {@link Schema} into a {@link ZNRecord}. */ - public static ZNRecord toZNRecord(@Nonnull Schema schema) { + public static ZNRecord toZNRecord(Schema schema) { ZNRecord record = new ZNRecord(schema.getSchemaName()); record.setSimpleField("schemaJSON", schema.toSingleLineJsonString()); return record; @@ -79,7 +78,8 @@ public class SchemaUtils { * @return schema on success. * <P><code>null</code> on failure. 
*/ - public static @Nullable Schema getSchema(@Nonnull String host, int port, @Nonnull String schemaName) { + @Nullable + public static Schema getSchema(String host, int port, String schemaName) { Preconditions.checkNotNull(host); Preconditions.checkNotNull(schemaName); @@ -112,7 +112,7 @@ public class SchemaUtils { * @return <code>true</code> on success. * <P><code>false</code> on failure. */ - public static boolean postSchema(@Nonnull String host, int port, @Nonnull Schema schema) { + public static boolean postSchema(String host, int port, Schema schema) { Preconditions.checkNotNull(host); Preconditions.checkNotNull(schema); @@ -144,7 +144,7 @@ public class SchemaUtils { * @return <code>true</code> on success. * <P><code>false</code> on failure. */ - public static boolean deleteSchema(@Nonnull String host, int port, @Nonnull String schemaName) { + public static boolean deleteSchema(String host, int port, String schemaName) { Preconditions.checkNotNull(host); Preconditions.checkNotNull(schemaName); @@ -172,7 +172,7 @@ public class SchemaUtils { * @return <code>true</code> if two schemas equal to each other. * <p><code>false</code>if two schemas do not equal to each other. 
*/ - public static boolean equalsIgnoreVersion(@Nonnull Schema schema1, @Nonnull Schema schema2) { + public static boolean equalsIgnoreVersion(Schema schema1, Schema schema2) { Preconditions.checkNotNull(schema1); Preconditions.checkNotNull(schema2); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 94c380cbfe..2e4954076c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -37,15 +37,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.hc.core5.http.io.SocketConfig; import org.apache.hc.core5.util.Timeout; -import org.apache.helix.AccessOption; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; @@ -73,7 +70,6 @@ import org.apache.pinot.common.minion.InMemoryTaskManagerStatusCache; import org.apache.pinot.common.minion.TaskGeneratorMostRecentRunInfo; import org.apache.pinot.common.minion.TaskManagerStatusCache; import org.apache.pinot.common.utils.PinotAppConfigs; -import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.ServiceStartableUtils; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; @@ -141,7 +137,6 @@ import org.apache.pinot.spi.services.ServiceStartable; 
import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.InstanceTypeUtils; import org.apache.pinot.spi.utils.NetUtils; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory; import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.slf4j.Logger; @@ -593,9 +588,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs)); _adminApp.start(_listenerConfigs); - // One time job to fix schema name in all tables - // This method can be removed after the next major release. - fixSchemaNameInTableConfig(); + enforceTableConfigAndSchema(); _controllerMetrics.addCallbackGauge("dataDir.exists", () -> new File(_config.getDataDir()).exists() ? 1L : 0L); _controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> { @@ -626,91 +619,46 @@ public abstract class BaseControllerStarter implements ServiceStartable { } /** - * This method is used to fix table/schema names. - * TODO: in the next release, maybe 2.0.0, we can remove this method. Meanwhile we can delete the orphan schemas - * that has been existed longer than a certain time period. - * + * Scan all table resources in the cluster and ensure table config and schema exist for each table. 
+ * TODO: Cleanup orphan table config and schema */ - @VisibleForTesting - public void fixSchemaNameInTableConfig() { - AtomicInteger misconfiguredTableCount = new AtomicInteger(); - AtomicInteger tableWithoutSchemaCount = new AtomicInteger(); - AtomicInteger fixedSchemaTableCount = new AtomicInteger(); - AtomicInteger failedToCopySchemaCount = new AtomicInteger(); - AtomicInteger failedToUpdateTableConfigCount = new AtomicInteger(); + private void enforceTableConfigAndSchema() { ZkHelixPropertyStore<ZNRecord> propertyStore = _helixResourceManager.getPropertyStore(); - - _helixResourceManager.getAllTables().forEach(tableNameWithType -> { - Pair<TableConfig, Integer> tableConfigWithVersion = - ZKMetadataProvider.getTableConfigWithVersion(propertyStore, tableNameWithType); - if (tableConfigWithVersion == null) { - // This might due to table deletion, just log it here. - LOGGER.warn("Failed to find table config for table: {}, the table likely already got deleted", - tableNameWithType); - return; + List<String> tablesWithoutTableConfig = new ArrayList<>(); + List<String> tablesWithoutSchema = new ArrayList<>(); + for (String tableNameWithType : _helixResourceManager.getAllTables()) { + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(propertyStore, tableNameWithType); + if (tableConfig == null) { + tablesWithoutTableConfig.add(tableNameWithType); + continue; } - TableConfig tableConfig = tableConfigWithVersion.getLeft(); - String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - String schemaPath = ZKMetadataProvider.constructPropertyStorePathForSchema(rawTableName); - boolean schemaExists = propertyStore.exists(schemaPath, AccessOption.PERSISTENT); - String existSchemaName = tableConfig.getValidationConfig().getSchemaName(); - if (existSchemaName == null || existSchemaName.equals(rawTableName)) { - // Although the table config is valid, we still need to ensure the schema exists - if (!schemaExists) { - LOGGER.warn("Failed to find 
schema for table: {}", tableNameWithType); - tableWithoutSchemaCount.getAndIncrement(); - return; - } - // Table config is already in good status - return; + Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableNameWithType); + if (schema == null) { + tablesWithoutSchema.add(tableNameWithType); } - misconfiguredTableCount.getAndIncrement(); - if (schemaExists) { - // If a schema named `rawTableName` already exists, then likely this is a misconfiguration. - // Reset schema name in table config to null to let the table point to the existing schema. - LOGGER.warn("Schema: {} already exists, fix the schema name in table config from {} to null", rawTableName, - existSchemaName); + } + if (!tablesWithoutTableConfig.isEmpty()) { + LOGGER.error("[CRITICAL!!!] Failed to find table config for tables: {}", tablesWithoutTableConfig); + if (_config.isExitOnTableConfigCheckFailure()) { + throw new IllegalStateException("Failed to find table config for tables: " + tablesWithoutTableConfig + + ", exiting! 
Please set controller.startup.exitOnTableConfigCheckFailure=false to not exit and fix these " + + "tables."); } else { - // Copy the schema current table referring to to `rawTableName` if it does not exist - Schema schema = _helixResourceManager.getSchema(existSchemaName); - if (schema == null) { - LOGGER.warn("Failed to find schema: {} for table: {}", existSchemaName, tableNameWithType); - tableWithoutSchemaCount.getAndIncrement(); - return; - } - schema.setSchemaName(rawTableName); - if (propertyStore.create(schemaPath, SchemaUtils.toZNRecord(schema), AccessOption.PERSISTENT)) { - LOGGER.info("Copied schema: {} to {}", existSchemaName, rawTableName); - } else { - LOGGER.warn("Failed to copy schema: {} to {}", existSchemaName, rawTableName); - failedToCopySchemaCount.getAndIncrement(); - return; - } + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_TABLE_CONFIG_COUNT, + tablesWithoutTableConfig.size()); } - // Update table config to remove schema name - tableConfig.getValidationConfig().setSchemaName(null); - if (ZKMetadataProvider.setTableConfig(propertyStore, tableConfig, tableConfigWithVersion.getRight())) { - LOGGER.info("Removed schema name from table config for table: {}", tableNameWithType); - fixedSchemaTableCount.getAndIncrement(); + } + if (!tablesWithoutSchema.isEmpty()) { + LOGGER.error("[CRITICAL!!!] Failed to find schema for tables: {}", tablesWithoutSchema); + if (_config.isExitOnSchemaCheckFailure()) { + throw new IllegalStateException("Failed to find schema for tables: " + tablesWithoutSchema + + ", exiting! 
Please set controller.startup.exitOnSchemaCheckFailure=false to not exit and fix these " + + "tables."); } else { - LOGGER.warn("Failed to update table config for table: {}", tableNameWithType); - failedToUpdateTableConfigCount.getAndIncrement(); + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT, + tablesWithoutSchema.size()); } - }); - LOGGER.info( - "Found {} tables misconfigured, {} tables without schema. Successfully fixed schema for {} tables, failed to " - + "fix {} tables due to copy schema failure, failed to fix {} tables due to update table config failure.", - misconfiguredTableCount.get(), tableWithoutSchemaCount.get(), fixedSchemaTableCount.get(), - failedToCopySchemaCount.get(), failedToUpdateTableConfigCount.get()); - - _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT, - misconfiguredTableCount.get()); - _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT, tableWithoutSchemaCount.get()); - _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FIXED_SCHEMA_TABLE_COUNT, fixedSchemaTableCount.get()); - _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT, - failedToCopySchemaCount.get()); - _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT, - failedToUpdateTableConfigCount.get()); + } } private ServiceStatus.ServiceStatusCallback generateServiceStatusCallback(HelixManager helixManager) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index c76943773b..29ebb084f0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -362,6 +362,11 @@ public class ControllerConf extends PinotConfiguration { public static final String 
ENFORCE_POOL_BASED_ASSIGNMENT_KEY = "enforce.pool.based.assignment"; public static final boolean DEFAULT_ENFORCE_POOL_BASED_ASSIGNMENT = false; + public static final String EXIT_ON_TABLE_CONFIG_CHECK_FAILURE = "controller.startup.exitOnTableConfigCheckFailure"; + public static final boolean DEFAULT_EXIT_ON_TABLE_CONFIG_CHECK_FAILURE = true; + public static final String EXIT_ON_SCHEMA_CHECK_FAILURE = "controller.startup.exitOnSchemaCheckFailure"; + public static final boolean DEFAULT_EXIT_ON_SCHEMA_CHECK_FAILURE = true; + public ControllerConf() { super(new HashMap<>()); } @@ -1205,12 +1210,20 @@ public class ControllerConf extends PinotConfiguration { return getProperty(ENFORCE_POOL_BASED_ASSIGNMENT_KEY, DEFAULT_ENFORCE_POOL_BASED_ASSIGNMENT); } + public boolean isExitOnTableConfigCheckFailure() { + return getProperty(EXIT_ON_TABLE_CONFIG_CHECK_FAILURE, DEFAULT_EXIT_ON_TABLE_CONFIG_CHECK_FAILURE); + } + + public boolean isExitOnSchemaCheckFailure() { + return getProperty(EXIT_ON_SCHEMA_CHECK_FAILURE, DEFAULT_EXIT_ON_SCHEMA_CHECK_FAILURE); + } + public void setEnableSwagger(boolean value) { - setProperty(ControllerConf.CONSOLE_SWAGGER_ENABLE, value); + setProperty(CONSOLE_SWAGGER_ENABLE, value); } public boolean isEnableSwagger() { - String enableSwagger = getProperty(ControllerConf.CONSOLE_SWAGGER_ENABLE); + String enableSwagger = getProperty(CONSOLE_SWAGGER_ENABLE); return enableSwagger == null || Boolean.parseBoolean(enableSwagger); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java index d0a087c1d8..2e1d0d2c88 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java @@ -206,15 +206,18 @@ public class 
PinotSchemaRestletResource { }) public ConfigSuccessResponse updateSchema( @ApiParam(value = "Name of the schema", required = true) @PathParam("schemaName") String schemaName, - @ApiParam(value = "Whether to reload the table if the new schema is backward compatible") @DefaultValue("false") - @QueryParam("reload") boolean reload, @Context HttpHeaders headers, FormDataMultiPart multiPart) { + @ApiParam(value = "Whether to reload the table after updating the schema") @DefaultValue("false") + @QueryParam("reload") boolean reload, + @ApiParam(value = "Whether to force update the schema even if the new schema is backward incompatible") + @DefaultValue("false") @QueryParam("force") boolean force, @Context HttpHeaders headers, + FormDataMultiPart multiPart) { schemaName = DatabaseUtils.translateTableName(schemaName, headers); Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps = getSchemaAndUnrecognizedPropertiesFromMultiPart(multiPart); Schema schema = schemaAndUnrecognizedProps.getLeft(); validateSchemaName(schema); schema.setSchemaName(DatabaseUtils.translateTableName(schema.getSchemaName(), headers)); - SuccessResponse successResponse = updateSchema(schemaName, schema, reload); + SuccessResponse successResponse = updateSchema(schemaName, schema, reload, force); return new ConfigSuccessResponse(successResponse.getStatus(), schemaAndUnrecognizedProps.getRight()); } @@ -233,15 +236,18 @@ public class PinotSchemaRestletResource { }) public ConfigSuccessResponse updateSchema( @ApiParam(value = "Name of the schema", required = true) @PathParam("schemaName") String schemaName, - @ApiParam(value = "Whether to reload the table if the new schema is backward compatible") @DefaultValue("false") - @QueryParam("reload") boolean reload, @Context HttpHeaders headers, String schemaJsonString) { + @ApiParam(value = "Whether to reload the table after updating the schema") @DefaultValue("false") + @QueryParam("reload") boolean reload, + @ApiParam(value = "Whether to force update 
the schema even if the new schema is backward incompatible") + @DefaultValue("false") @QueryParam("force") boolean force, @Context HttpHeaders headers, + String schemaJsonString) { schemaName = DatabaseUtils.translateTableName(schemaName, headers); Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps = getSchemaAndUnrecognizedPropertiesFromJson(schemaJsonString); Schema schema = schemaAndUnrecognizedProps.getLeft(); validateSchemaName(schema); schema.setSchemaName(DatabaseUtils.translateTableName(schema.getSchemaName(), headers)); - SuccessResponse successResponse = updateSchema(schemaName, schema, reload); + SuccessResponse successResponse = updateSchema(schemaName, schema, reload, force); return new ConfigSuccessResponse(successResponse.getStatus(), schemaAndUnrecognizedProps.getRight()); } @@ -446,7 +452,7 @@ public class PinotSchemaRestletResource { * @param reload set to true to reload the tables using the schema, so committed segments can pick up the new schema * @return SuccessResponse */ - private SuccessResponse updateSchema(String schemaName, Schema schema, boolean reload) { + private SuccessResponse updateSchema(String schemaName, Schema schema, boolean reload, boolean force) { validateSchemaInternal(schema); if (!schemaName.equals(schema.getSchemaName())) { @@ -457,7 +463,7 @@ public class PinotSchemaRestletResource { } try { - _pinotHelixResourceManager.updateSchema(schema, reload, false); + _pinotHelixResourceManager.updateSchema(schema, reload, force); // Best effort notification. If controller fails at this point, no notification is given. 
LOGGER.info("Notifying metadata event for updating schema: {}", schemaName); _metadataEventNotifierFactory.create().notifyOnSchemaEvents(schema, SchemaEventType.UPDATE); @@ -520,7 +526,6 @@ public class PinotSchemaRestletResource { */ private Pair<Schema, Map<String, Object>> getSchemaAndUnrecognizedPropertiesFromJson(String schemaJsonString) throws ControllerApplicationException { - Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps; try { return JsonUtils.stringToObjectAndUnrecognizedProperties(schemaJsonString, Schema.class); } catch (Exception e) { @@ -537,24 +542,14 @@ public class PinotSchemaRestletResource { } // If the schema is associated with a table, we should not delete it. - // TODO: Check OFFLINE tables as well. There are 2 side effects: - // - Increases ZK read when there are lots of OFFLINE tables - // - Behavior change since we don't allow deleting schema for OFFLINE tables - List<String> realtimeTables = _pinotHelixResourceManager.getAllRealtimeTables(); - for (String realtimeTableName : realtimeTables) { - if (schemaName.equals(TableNameBuilder.extractRawTableName(realtimeTableName))) { + String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(schemaName); + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(schemaName); + for (String tableNameWithType : new String[]{offlineTableName, realtimeTableName}) { + if (_pinotHelixResourceManager.hasTable(tableNameWithType)) { throw new ControllerApplicationException(LOGGER, - String.format("Cannot delete schema %s, as it is associated with table %s", schemaName, realtimeTableName), + String.format("Cannot delete schema %s, as it is associated with table %s", schemaName, tableNameWithType), Response.Status.CONFLICT); } - TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(realtimeTableName); - if (tableConfig != null) { - if (schemaName.equals(tableConfig.getValidationConfig().getSchemaName())) { - throw new ControllerApplicationException(LOGGER, 
- String.format("Cannot delete schema %s, as it is associated with table %s", schemaName, - realtimeTableName), Response.Status.CONFLICT); - } - } } LOGGER.info("Trying to delete schema {}", schemaName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index fed660e988..eb604b55b5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -112,7 +112,6 @@ import org.apache.pinot.core.auth.Authorize; import org.apache.pinot.core.auth.ManualAuthorization; import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.segment.local.utils.TableConfigUtils; -import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableStatsHumanReadable; import org.apache.pinot.spi.config.table.TableStatus; @@ -194,9 +193,8 @@ public class PinotTableRestletResource { HttpClientConnectionManager _connectionManager; /** - * API to create a table. Before adding, validations will be done (min number of replicas, - * checking offline and realtime table configs match, checking for tenants existing) - * @param tableConfigStr + * API to create a table. Before adding, validations will be done (min number of replicas, checking offline and + * realtime table configs match, checking for tenants existing). 
*/ @POST @Produces(MediaType.APPLICATION_JSON) @@ -218,14 +216,13 @@ public class PinotTableRestletResource { tableConfig = tableConfigAndUnrecognizedProperties.getLeft(); tableNameWithType = DatabaseUtils.translateTableName(tableConfig.getTableName(), httpHeaders); tableConfig.setTableName(tableNameWithType); - // Handle legacy config - handleLegacySchemaConfig(tableConfig, httpHeaders); // validate permission ResourceUtils.checkPermissionAndAccess(tableNameWithType, request, httpHeaders, AccessType.CREATE, Actions.Table.CREATE_TABLE, _accessControlFactory, LOGGER); - schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig); + schema = _pinotHelixResourceManager.getTableSchema(tableNameWithType); + Preconditions.checkState(schema != null, "Failed to find schema for table: %s", tableNameWithType); TableConfigTunerUtils.applyTunerConfigs(_pinotHelixResourceManager, tableConfig, schema, Collections.emptyMap()); @@ -490,8 +487,6 @@ public class PinotTableRestletResource { tableConfig = tableConfigAndUnrecognizedProperties.getLeft(); tableNameWithType = DatabaseUtils.translateTableName(tableConfig.getTableName(), headers); tableConfig.setTableName(tableNameWithType); - // Handle legacy config - handleLegacySchemaConfig(tableConfig, headers); String tableNameFromPath = DatabaseUtils.translateTableName( TableNameBuilder.forType(tableConfig.getTableType()).tableNameWithType(tableName), headers); if (!tableNameFromPath.equals(tableNameWithType)) { @@ -500,7 +495,8 @@ public class PinotTableRestletResource { Response.Status.BAD_REQUEST); } - schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig); + schema = _pinotHelixResourceManager.getTableSchema(tableNameWithType); + Preconditions.checkState(schema != null, "Failed to find schema for table: %s", tableNameWithType); TableConfigUtils.validate(tableConfig, schema, typesToSkip); } catch (Exception e) { String msg = String.format("Invalid table config: %s with error: %s", tableName, 
e.getMessage()); @@ -559,24 +555,22 @@ public class PinotTableRestletResource { String tableNameWithType = DatabaseUtils.translateTableName(tableConfig.getTableName(), httpHeaders); tableConfig.setTableName(tableNameWithType); - // Handle legacy config - handleLegacySchemaConfig(tableConfig, httpHeaders); - // validate permission ResourceUtils.checkPermissionAndAccess(tableNameWithType, request, httpHeaders, AccessType.READ, Actions.Table.VALIDATE_TABLE_CONFIGS, _accessControlFactory, LOGGER); - ObjectNode validationResponse = - validateConfig(tableConfig, _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig), typesToSkip); + ObjectNode validationResponse = validateConfig(tableConfig, typesToSkip); validationResponse.set("unrecognizedProperties", JsonUtils.objectToJsonNode(tableConfigAndUnrecognizedProperties.getRight())); return validationResponse; } - private ObjectNode validateConfig(TableConfig tableConfig, Schema schema, @Nullable String typesToSkip) { + private ObjectNode validateConfig(TableConfig tableConfig, @Nullable String typesToSkip) { + String tableNameWithType = tableConfig.getTableName(); try { + Schema schema = _pinotHelixResourceManager.getTableSchema(tableNameWithType); if (schema == null) { - throw new SchemaNotFoundException("Got empty schema"); + throw new SchemaNotFoundException("Failed to find schema for table: " + tableNameWithType); } TableConfigUtils.validate(tableConfig, schema, typesToSkip); TaskConfigUtils.validateTaskConfigs(tableConfig, schema, _pinotTaskManager, typesToSkip); @@ -588,7 +582,7 @@ public class PinotTableRestletResource { } return tableConfigValidateStr; } catch (Exception e) { - String msg = String.format("Invalid table config: %s. %s", tableConfig.getTableName(), e.getMessage()); + String msg = String.format("Invalid table config: %s. 
%s", tableNameWithType, e.getMessage()); throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST, e); } } @@ -1279,24 +1273,6 @@ public class PinotTableRestletResource { return timeBoundaryMs; } - /** - * Handles the legacy schema configuration for a given table configuration. - * This method updates the schema name in the validation configuration of the table config - * to ensure it is correctly translated based on the provided HTTP headers. - * This is necessary to maintain compatibility with older configurations that may not - * have the schema name properly set or formatted. - * - * @param tableConfig The {@link TableConfig} object containing the table configuration. - * @param httpHeaders The {@link HttpHeaders} object containing the HTTP headers, used to - * translate the schema name if necessary. - */ - private void handleLegacySchemaConfig(TableConfig tableConfig, HttpHeaders httpHeaders) { - SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); - if (validationConfig.getSchemaName() != null) { - validationConfig.setSchemaName(DatabaseUtils.translateTableName(validationConfig.getSchemaName(), httpHeaders)); - } - } - /** * Try to calculate the instance partitions for the given table config. Throws exception if it fails. 
*/ diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableAndSchemaConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableAndSchemaConfig.java index a4a76d1527..a2f262b6fb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableAndSchemaConfig.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableAndSchemaConfig.java @@ -21,7 +21,6 @@ package org.apache.pinot.controller.api.resources; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; -import javax.annotation.Nullable; import org.apache.pinot.spi.config.TableConfigs; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; @@ -39,7 +38,7 @@ public class TableAndSchemaConfig { @JsonCreator public TableAndSchemaConfig(@JsonProperty(value = "tableConfig", required = true) TableConfig tableConfig, - @JsonProperty("schema") @Nullable Schema schema) { + @JsonProperty(value = "schema", required = true) Schema schema) { _tableConfig = tableConfig; _schema = schema; } @@ -48,7 +47,6 @@ public class TableAndSchemaConfig { return _tableConfig; } - @Nullable public Schema getSchema() { return _schema; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java index 754367fef5..4a8c54993e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java @@ -105,6 +105,17 @@ public class ControllerRequestClient { } } + public void forceUpdateSchema(Schema schema) + throws IOException { + String url = 
_controllerRequestURLBuilder.forSchemaUpdate(schema.getSchemaName()) + "?force=true"; + try { + HttpClient.wrapAndThrowHttpException( + _httpClient.sendMultipartPutRequest(url, schema.toSingleLineJsonString(), _headers)); + } catch (HttpErrorStatusException e) { + throw new IOException(e); + } + } + public void deleteSchema(String schemaName) throws IOException { String url = _controllerRequestURLBuilder.forSchemaDelete(schemaName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index f125854976..c8478c25c5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1630,21 +1630,10 @@ public class PinotHelixResourceManager { return ZKMetadataProvider.getTableSchema(_propertyStore, tableName); } - /** - * Find schema with same name as rawTableName. If not found, find schema using schemaName in validationConfig. - * For OFFLINE table, it is possible that schema was not uploaded before creating the table. Hence for OFFLINE, - * this method can return null. 
- */ + @Deprecated @Nullable public Schema getSchemaForTableConfig(TableConfig tableConfig) { - Schema schema = getSchema(TableNameBuilder.extractRawTableName(tableConfig.getTableName())); - if (schema == null) { - String schemaName = tableConfig.getValidationConfig().getSchemaName(); - if (schemaName != null) { - schema = getSchema(schemaName); - } - } - return schema; + return ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig); } public List<String> getSchemaNames() { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 7a00a4d8e4..293f649fa9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -664,6 +664,11 @@ public class ControllerTest { getControllerRequestClient().updateSchema(schema); } + public void forceUpdateSchema(Schema schema) + throws IOException { + getControllerRequestClient().forceUpdateSchema(schema); + } + public Schema getSchema(String schemaName) { Schema schema = _helixResourceManager.getSchema(schemaName); assertNotNull(schema); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java deleted file mode 100644 index 19c4756634..0000000000 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java +++ /dev/null @@ -1,288 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.controller.helix.core.cleanup; - -import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; -import org.apache.pinot.common.metrics.ControllerGauge; -import org.apache.pinot.common.metrics.MetricValueUtils; -import org.apache.pinot.common.utils.config.TagNameUtils; -import org.apache.pinot.controller.BaseControllerStarter; -import org.apache.pinot.controller.helix.ControllerTest; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.metrics.PinotMetricUtils; -import org.apache.pinot.spi.utils.CommonConstants; -import org.apache.pinot.spi.utils.NetUtils; -import org.apache.pinot.spi.utils.builder.TableConfigBuilder; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; - - -/** - * This test can be deleted once {@link BaseControllerStarter#fixSchemaNameInTableConfig()} is deleted. Likely in 2.0.0. 
- */ -@Test(groups = "stateless") -public class SchemaCleanupTaskStatelessTest extends ControllerTest { - @BeforeClass - public void setup() - throws Exception { - startZk(); - startController(); - startFakeBroker(); - startFakeServer(); - } - - private void startFakeBroker() - throws Exception { - String brokerInstance = CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + NetUtils.getHostAddress() + "_" - + CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT; - - // Create server instance with the fake server state model - HelixManager brokerHelixManager = - HelixManagerFactory.getZKHelixManager(getHelixClusterName(), brokerInstance, InstanceType.PARTICIPANT, - getZkUrl()); - brokerHelixManager.connect(); - - // Add Helix tag to the server - brokerHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(), brokerInstance, - TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME)); - } - - private void startFakeServer() - throws Exception { - String serverInstance = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE + NetUtils.getHostAddress() + "_" - + CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT; - - // Create server instance with the fake server state model - HelixManager serverHelixManager = HelixManagerFactory - .getZKHelixManager(getHelixClusterName(), serverInstance, InstanceType.PARTICIPANT, getZkUrl()); - serverHelixManager.connect(); - - // Add Helix tag to the server - serverHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(), serverInstance, - TableNameBuilder.OFFLINE.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME)); - } - - @AfterClass - public void teardown() { - stopController(); - stopZk(); - } - - @Test - public void testSchemaCleanupTask() - throws Exception { - PinotMetricUtils.cleanUp(); - PinotMetricUtils.getPinotMetricsRegistry(); - // 1. Add a schema - addSchema(createDummySchema("t1")); - addSchema(createDummySchema("t2")); - addSchema(createDummySchema("t3")); - - // 2. 
Add a table with the schema name reference - addTableConfig(createDummyTableConfig("t1", "t1")); - addTableConfig(createDummyTableConfig("t2", "t2")); - addTableConfig(createDummyTableConfig("t3", "t3")); - - _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t1", "t2")); - _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t2", "t3")); - _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t3", "t1")); - - // 3. Fix table schema - _controllerStarter.fixSchemaNameInTableConfig(); - - // 4. validate - assertEquals(getHelixResourceManager().getAllTables().size(), 3); - assertEquals(getHelixResourceManager().getSchemaNames().size(), 3); - - assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName()); - assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName()); - assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName()); - - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0); - - // 5. 
Clean up - for (String table : getHelixResourceManager().getAllOfflineTables()) { - getHelixResourceManager().deleteOfflineTable(table); - } - for (String schema : getHelixResourceManager().getSchemaNames()) { - getHelixResourceManager().deleteSchema(schema); - } - } - - @Test - public void testSchemaCleanupTaskNormalCase() - throws Exception { - PinotMetricUtils.cleanUp(); - PinotMetricUtils.getPinotMetricsRegistry(); - // 1. Add a schema - addSchema(createDummySchema("t1")); - addSchema(createDummySchema("t2")); - addSchema(createDummySchema("t3")); - - assertEquals(getHelixResourceManager().getSchemaNames().size(), 3); - - // 2. Add a table with the schema name reference - addTableConfig(createDummyTableConfig("t1", "t1")); - addTableConfig(createDummyTableConfig("t2", "t2")); - addTableConfig(createDummyTableConfig("t3", "t3")); - - assertEquals(getHelixResourceManager().getAllTables().size(), 3); - - // 3. Create new schemas and update table to new schema - addSchema(createDummySchema("t11")); - addSchema(createDummySchema("t21")); - addSchema(createDummySchema("t31")); - _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t1", "t11")); - _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t2", "t21")); - _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t3", "t31")); - - assertEquals(getHelixResourceManager().getAllTables().size(), 3); - assertEquals(getHelixResourceManager().getSchemaNames().size(), 6); - assertEquals(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName(), "t11"); - assertEquals(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName(), "t21"); - assertEquals(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName(), "t31"); - - // 4. Delete schema t1, t2, t3, so we can check if those schemas are fixed later. 
- deleteSchema("t1"); - deleteSchema("t2"); - deleteSchema("t3"); - - assertEquals(getHelixResourceManager().getSchemaNames().size(), 3); - - // 5. Fix table schema - _controllerStarter.fixSchemaNameInTableConfig(); - - // 6. All tables will directly set schema. - assertEquals(getHelixResourceManager().getAllTables().size(), 3); - assertEquals(getHelixResourceManager().getSchemaNames().size(), 6); - assertTrue(getHelixResourceManager().getSchemaNames().contains("t1")); - assertTrue(getHelixResourceManager().getSchemaNames().contains("t2")); - assertTrue(getHelixResourceManager().getSchemaNames().contains("t3")); - - assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName()); - assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName()); - assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName()); - - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0); - - // 7. 
Clean up - for (String table : getHelixResourceManager().getAllOfflineTables()) { - getHelixResourceManager().deleteOfflineTable(table); - } - for (String schema : getHelixResourceManager().getSchemaNames()) { - getHelixResourceManager().deleteSchema(schema); - } - } - - @Test - public void testMissingSchema() - throws Exception { - PinotMetricUtils.cleanUp(); - PinotMetricUtils.getPinotMetricsRegistry(); - // 1. Add a schema - addSchema(createDummySchema("t1")); - addSchema(createDummySchema("t2")); - addSchema(createDummySchema("t3")); - - assertEquals(getHelixResourceManager().getSchemaNames().size(), 3); - - // 2. Add a table with the schema name reference - addTableConfig(createDummyTableConfig("t1")); - addTableConfig(createDummyTableConfig("t2")); - addTableConfig(createDummyTableConfig("t3")); - - assertEquals(getHelixResourceManager().getAllTables().size(), 3); - - // 4. Delete schema t1, t2, t3, so we can check if those schemas are fixed later. - deleteSchema("t1"); - deleteSchema("t2"); - deleteSchema("t3"); - - assertEquals(getHelixResourceManager().getSchemaNames().size(), 0); - - // 5. Fix table schema - _controllerStarter.fixSchemaNameInTableConfig(); - - // 6. 
We cannot fix schema - assertEquals(getHelixResourceManager().getAllTables().size(), 3); - assertEquals(getHelixResourceManager().getSchemaNames().size(), 0); - - assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName()); - assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName()); - assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName()); - - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 0); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 3); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 0); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), - ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0); - - // 7. 
Clean up - for (String table : getHelixResourceManager().getAllOfflineTables()) { - getHelixResourceManager().deleteOfflineTable(table); - } - for (String schema : getHelixResourceManager().getSchemaNames()) { - getHelixResourceManager().deleteSchema(schema); - } - } - - private TableConfig createDummyTableConfig(String table) { - return new TableConfigBuilder(TableType.OFFLINE) - .setTableName(table) - .build(); - } - - private TableConfig createDummyTableConfig(String table, String schema) { - TableConfig tableConfig = createDummyTableConfig(table); - tableConfig.getValidationConfig().setSchemaName(schema); - return tableConfig; - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index fa985f2b06..13edaf56cf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -96,7 +96,6 @@ import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; @@ -358,16 +357,13 @@ public abstract class BaseTableDataManager implements TableDataManager { public Pair<TableConfig, Schema> fetchTableConfigAndSchema() { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType); - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig); - // NOTE: Schema is mandatory for REALTIME 
table. - if (tableConfig.getTableType() == TableType.REALTIME) { - Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType); - } + Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); + Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType); return Pair.of(tableConfig, schema); } @Override - public IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, @Nullable Schema schema) { + public IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, Schema schema) { IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema); indexLoadingConfig.setTableDataDir(_tableDataDir); return indexLoadingConfig; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 70ecd19d5c..32de491a30 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -2078,14 +2078,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet tableConfig.getIndexingConfig().getNoDictionaryColumns().remove("NewAddedRawDerivedMVIntDimension"); updateTableConfig(tableConfig); - // Need to first delete then add the schema because removing columns is backward-incompatible change - deleteSchema(getTableName()); + // Need to force update the schema because removing columns is backward-incompatible change Schema schema = createSchema(); schema.removeField("AirlineID"); schema.removeField("ArrTime"); schema.removeField("AirTime"); schema.removeField("ArrDel15"); - addSchema(schema); + forceUpdateSchema(schema); // Trigger reload 
reloadAllSegments(SELECT_STAR_QUERY, true, getCountStarResult()); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java index 9a0b23bae4..8f76ce7f6e 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java @@ -55,7 +55,10 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterIntegrationTest { @@ -151,13 +154,12 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); // Change datatype from INT -> LONG for airlineId - deleteSchema(getTableName()); Schema schema = createSchema(); schema.getFieldSpecFor("ArrTime").setDataType(FieldSpec.DataType.LONG); schema.getFieldSpecFor("AirlineID").setDataType(FieldSpec.DataType.STRING); schema.getFieldSpecFor("ActualElapsedTime").setDataType(FieldSpec.DataType.FLOAT); schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.STRING); - addSchema(schema); + forceUpdateSchema(schema); assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext() .setTablesToSchedule(Collections.singleton(offlineTableName))) @@ -209,13 +211,12 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg }); // Reset the schema back to 
its original state. schema = createSchema(); schema.getFieldSpecFor("ArrTime").setDataType(FieldSpec.DataType.INT); schema.getFieldSpecFor("AirlineID").setDataType(FieldSpec.DataType.LONG); schema.getFieldSpecFor("ActualElapsedTime").setDataType(FieldSpec.DataType.INT); schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.INT); - addSchema(schema); + forceUpdateSchema(schema); } @Test(priority = 3) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java index 2bc366a371..bcc9a0883d 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java @@ -81,7 +81,6 @@ public class RefreshSegmentTaskGenerator extends BaseTaskGenerator { String tableNameWithType = tableConfig.getTableName(); Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", tableNameWithType); - String taskType = RefreshSegmentTask.TASK_TYPE; List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager(); @@ -102,7 +101,8 @@ public class RefreshSegmentTaskGenerator { // Get info about table and schema. 
Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType); - Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig); + Schema schema = pinotHelixResourceManager.getTableSchema(tableNameWithType); + Preconditions.checkState(schema != null, "Failed to find schema for table: %s", tableNameWithType); Stat schemaStat = pinotHelixResourceManager.getSchemaStat(schema.getSchemaName()); // Get the running segments for a table. diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java index 754f7224a2..a4874a1cc9 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java @@ -43,7 +43,6 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; -import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -525,8 +524,6 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest { .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build(); - when(mockPinotHelixResourceManager.getSchemaForTableConfig(Mockito.any())).thenReturn(schema); - 
RealtimeToOfflineSegmentsTaskGenerator taskGenerator = new RealtimeToOfflineSegmentsTaskGenerator(); taskGenerator.init(mockClusterInfoAccessor); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index bef2d7fa84..7f60004fdb 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -308,7 +308,7 @@ public interface TableDataManager { /** * Constructs the index loading config for the table with the given table config and schema. */ - IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, @Nullable Schema schema); + IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, Schema schema); /** * Interface to handle segment state transitions from CONSUMING to DROPPED diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 5bf6ef8bc8..d28225c923 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; @@ -90,7 +89,6 @@ import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.DataSizeUtils; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.TimeUtils; -import 
org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,7 +135,7 @@ public final class TableConfigUtils { /** * @see TableConfigUtils#validate(TableConfig, Schema, String) */ - public static void validate(TableConfig tableConfig, @Nullable Schema schema) { + public static void validate(TableConfig tableConfig, Schema schema) { validate(tableConfig, schema, null); } @@ -152,17 +150,14 @@ public final class TableConfigUtils { * * TODO: Add more validations for each section (e.g. validate conditions are met for aggregateMetrics) */ - public static void validate(TableConfig tableConfig, @Nullable Schema schema, @Nullable String typesToSkip) { + public static void validate(TableConfig tableConfig, Schema schema, @Nullable String typesToSkip) { + Preconditions.checkArgument(schema != null, "Schema should not be null"); Set<ValidationType> skipTypes = parseTypesToSkipString(typesToSkip); - if (tableConfig.getTableType() == TableType.REALTIME) { - Preconditions.checkState(schema != null, "Schema should not be null for REALTIME table"); - } // Sanitize the table config before validation sanitize(tableConfig); // skip all validation if skip type ALL is selected. 
if (!skipTypes.contains(ValidationType.ALL)) { - validateTableSchemaConfig(tableConfig); validateValidationConfig(tableConfig, schema); validateIngestionConfig(tableConfig, schema); @@ -205,7 +200,7 @@ public final class TableConfigUtils { * @param tableConfig Table config to validate * @return true if the table config is using instance pool and replica group configuration, false otherwise */ - static boolean isTableUsingInstancePoolAndReplicaGroup(@Nonnull TableConfig tableConfig) { + static boolean isTableUsingInstancePoolAndReplicaGroup(TableConfig tableConfig) { boolean status = true; Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap(); if (instanceAssignmentConfigMap != null) { @@ -253,19 +248,6 @@ public final class TableConfigUtils { } } - /** - * Validates the table name with the following rule: - * - Schema name should either be null or match the raw table name - */ - private static void validateTableSchemaConfig(TableConfig tableConfig) { - // Ensure that table is not created if schema is not present - String rawTableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName()); - String schemaName = tableConfig.getValidationConfig().getSchemaName(); - if (schemaName != null && !schemaName.equals(rawTableName)) { - throw new IllegalStateException("Schema name: " + schemaName + " does not match table name: " + rawTableName); - } - } - /** * Validates retention config. Checks for following things: * - Valid segmentPushType @@ -314,7 +296,7 @@ public final class TableConfigUtils { * 3. Checks peerDownloadSchema * 4. 
Checks time column existence if null handling for time column is enabled */ - private static void validateValidationConfig(TableConfig tableConfig, @Nullable Schema schema) { + private static void validateValidationConfig(TableConfig tableConfig, Schema schema) { SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig(); String timeColumnName = validationConfig.getTimeColumnName(); if (tableConfig.getTableType() == TableType.REALTIME) { @@ -322,7 +304,7 @@ public final class TableConfigUtils { Preconditions.checkState(timeColumnName != null, "'timeColumnName' cannot be null in REALTIME table config"); } // timeColumnName can be null in OFFLINE table - if (timeColumnName != null && !timeColumnName.isEmpty() && schema != null) { + if (timeColumnName != null) { Preconditions.checkState(schema.getSpecForTimeColumn(timeColumnName) != null, "Cannot find valid fieldSpec for timeColumn: %s from the table config: %s, in the schema: %s", timeColumnName, tableConfig.getTableName(), schema.getSchemaName()); @@ -330,8 +312,8 @@ public final class TableConfigUtils { if (tableConfig.isDimTable()) { Preconditions.checkState(tableConfig.getTableType() == TableType.OFFLINE, "Dimension table must be of OFFLINE table type."); - Preconditions.checkState(schema != null, "Dimension table must have an associated schema"); - Preconditions.checkState(!schema.getPrimaryKeyColumns().isEmpty(), "Dimension table must have primary key[s]"); + Preconditions.checkState(CollectionUtils.isNotEmpty(schema.getPrimaryKeyColumns()), + "Dimension table must have primary key[s]"); } String peerSegmentDownloadScheme = validationConfig.getPeerSegmentDownloadScheme(); @@ -360,7 +342,7 @@ public final class TableConfigUtils { * 6. 
ingestion type for dimension tables */ @VisibleForTesting - public static void validateIngestionConfig(TableConfig tableConfig, @Nullable Schema schema) { + public static void validateIngestionConfig(TableConfig tableConfig, Schema schema) { IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); if (ingestionConfig != null) { @@ -395,7 +377,7 @@ public final class TableConfigUtils { Preconditions.checkState(indexingConfig == null || MapUtils.isEmpty(indexingConfig.getStreamConfigs()), "Should not use indexingConfig#getStreamConfigs if ingestionConfig#StreamIngestionConfig is provided"); List<Map<String, String>> streamConfigMaps = ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps(); - Preconditions.checkState(streamConfigMaps.size() > 0, "Must have at least 1 stream in REALTIME table"); + Preconditions.checkState(!streamConfigMaps.isEmpty(), "Must have at least 1 stream in REALTIME table"); // TODO: for multiple stream configs, validate them } @@ -431,14 +413,11 @@ public final class TableConfigUtils { "columnName/aggregationFunction cannot be null in AggregationConfig " + aggregationConfig); } - FieldSpec fieldSpec = null; - if (schema != null) { - fieldSpec = schema.getFieldSpecFor(columnName); - Preconditions.checkState(fieldSpec != null, "The destination column '" + columnName - + "' of the aggregation function must be present in the schema"); - Preconditions.checkState(fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC, - "The destination column '" + columnName + "' of the aggregation function must be a metric column"); - } + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + Preconditions.checkState(fieldSpec != null, + "The destination column '" + columnName + "' of the aggregation function must be present in the schema"); + Preconditions.checkState(fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC, + "The destination column '" + columnName + "' of the aggregation function must be a metric column"); if 
(!aggregationColumns.add(columnName)) { throw new IllegalStateException("Duplicate aggregation config found for column '" + columnName + "'"); @@ -471,11 +450,9 @@ public final class TableConfigUtils { Preconditions.checkState(StringUtils.isNumeric(literal), "Second argument of DISTINCT_COUNT_HLL must be a number: %s", aggregationConfig); } - if (fieldSpec != null) { - DataType dataType = fieldSpec.getDataType(); - Preconditions.checkState(dataType == DataType.BYTES, - "Result type for DISTINCT_COUNT_HLL must be BYTES: %s", aggregationConfig); - } + DataType dataType = fieldSpec.getDataType(); + Preconditions.checkState(dataType == DataType.BYTES, "Result type for DISTINCT_COUNT_HLL must be BYTES: %s", + aggregationConfig); } else if (functionType == DISTINCTCOUNTHLLPLUS) { Preconditions.checkState(numArguments >= 1 && numArguments <= 3, "DISTINCT_COUNT_HLL_PLUS can have at most three arguments: %s", aggregationConfig); @@ -525,10 +502,8 @@ public final class TableConfigUtils { aggregationSourceColumns.add(firstArgument.getIdentifier()); } - if (schema != null) { - Preconditions.checkState(new HashSet<>(schema.getMetricNames()).equals(aggregationColumns), - "all metric columns must be aggregated"); - } + Preconditions.checkState(new HashSet<>(schema.getMetricNames()).equals(aggregationColumns), + "all metric columns must be aggregated"); // This is required by MutableSegmentImpl.enableMetricsAggregationIfPossible(). // That code will disable ingestion aggregation if all metrics aren't noDictionaryColumns. 
@@ -565,13 +540,10 @@ public final class TableConfigUtils { if (!transformColumns.add(columnName)) { throw new IllegalStateException("Duplicate transform config found for column '" + columnName + "'"); } - if (schema != null) { - Preconditions.checkState( - schema.getFieldSpecFor(columnName) != null || aggregationSourceColumns.contains(columnName), - "The destination column '" + columnName - + "' of the transform function must be present in the schema or as a source column for " - + "aggregations"); - } + Preconditions.checkState(schema.hasColumn(columnName) || aggregationSourceColumns.contains(columnName), + "The destination column '" + columnName + + "' of the transform function must be present in the schema or as a source column for " + + "aggregations"); FunctionEvaluator expressionEvaluator; if (_disableGroovy && FunctionEvaluatorFactory.isGroovyExpression(transformFunction)) { throw new IllegalStateException( @@ -595,7 +567,7 @@ public final class TableConfigUtils { // Complex configs ComplexTypeConfig complexTypeConfig = ingestionConfig.getComplexTypeConfig(); - if (complexTypeConfig != null && schema != null) { + if (complexTypeConfig != null) { Map<String, String> prefixesToRename = complexTypeConfig.getPrefixesToRename(); if (MapUtils.isNotEmpty(prefixesToRename)) { Set<String> fieldNames = schema.getColumnNames(); @@ -611,7 +583,7 @@ public final class TableConfigUtils { SchemaConformingTransformerConfig schemaConformingTransformerConfig = ingestionConfig.getSchemaConformingTransformerConfig(); - if (null != schemaConformingTransformerConfig && null != schema) { + if (schemaConformingTransformerConfig != null) { SchemaConformingTransformer.validateSchema(schema, schemaConformingTransformerConfig); } } @@ -983,10 +955,7 @@ public final class TableConfigUtils { * Also ensures proper dependency between index types (eg: Inverted Index columns * cannot be present in no-dictionary columns). 
*/ - private static void validateIndexingConfig(IndexingConfig indexingConfig, @Nullable Schema schema) { - if (schema == null) { - return; - } + private static void validateIndexingConfig(IndexingConfig indexingConfig, Schema schema) { ArrayListMultimap<String, String> columnNameToConfigMap = ArrayListMultimap.create(); Set<String> noDictionaryColumnsSet = new HashSet<>(); @@ -1189,7 +1158,7 @@ public final class TableConfigUtils { * Additional checks for TEXT and FST index types * Validates index compatibility for forward index disabled columns */ - private static void validateFieldConfigList(TableConfig tableConfig, @Nullable Schema schema) { + private static void validateFieldConfigList(TableConfig tableConfig, Schema schema) { List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); TableType tableType = tableConfig.getTableType(); @@ -1200,6 +1169,9 @@ public final class TableConfigUtils { for (FieldConfig fieldConfig : fieldConfigList) { String columnName = fieldConfig.getName(); + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + Preconditions.checkState(fieldSpec != null, + "Column: %s defined in field config list must be a valid column defined in the schema", columnName); EncodingType encodingType = fieldConfig.getEncodingType(); Preconditions.checkArgument(encodingType != null, "Encoding type must be specified for column: %s", columnName); CompressionCodec compressionCodec = fieldConfig.getCompressionCodec(); @@ -1211,10 +1183,8 @@ public final class TableConfigUtils { "Compression codec: %s is not applicable to raw index", compressionCodec); if ((compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2 - || compressionCodec == CompressionCodec.CLPV2_ZSTD || compressionCodec == CompressionCodec.CLPV2_LZ4) - && schema != null) { - Preconditions.checkArgument( - schema.getFieldSpecFor(columnName).getDataType().getStoredType() == 
DataType.STRING, + || compressionCodec == CompressionCodec.CLPV2_ZSTD || compressionCodec == CompressionCodec.CLPV2_LZ4)) { + Preconditions.checkArgument(fieldSpec.getDataType().getStoredType() == DataType.STRING, "CLP compression codec can only be applied to string columns"); } break; @@ -1232,13 +1202,6 @@ public final class TableConfigUtils { break; } - if (schema == null) { - return; - } - FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); - Preconditions.checkState(fieldSpec != null, - "Column: %s defined in field config list must be a valid column defined in the schema", columnName); - // Validate the forward index disabled compatibility with other indexes if enabled for this column validateForwardIndexDisabledIndexCompatibility(columnName, fieldConfig, indexingConfig, schema, tableType); @@ -1493,7 +1456,7 @@ public final class TableConfigUtils { return false; } - private static boolean isRoutingStrategyAllowedForUpsert(@Nonnull RoutingConfig routingConfig) { + private static boolean isRoutingStrategyAllowedForUpsert(RoutingConfig routingConfig) { String instanceSelectorType = routingConfig.getInstanceSelectorType(); return UPSERT_DEDUP_ALLOWED_ROUTING_STRATEGIES.stream().anyMatch(x -> x.equalsIgnoreCase(instanceSelectorType)); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index b5ca43e4ba..b3bf9d3e1a 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -91,7 +91,7 @@ public class TableConfigUtilsTest { try { TableConfigUtils.validate(tableConfig, null); Assert.fail("Should fail for null timeColumnName and null schema in REALTIME table"); - } catch (IllegalStateException e) { + } catch (IllegalArgumentException e) { // expected } 
@@ -101,7 +101,7 @@ public class TableConfigUtilsTest { try { TableConfigUtils.validate(tableConfig, null); Assert.fail("Should fail for null schema in REALTIME table"); - } catch (IllegalStateException e) { + } catch (IllegalArgumentException e) { // expected } @@ -149,12 +149,22 @@ public class TableConfigUtilsTest { // OFFLINE table // null timeColumnName and schema - allowed in OFFLINE tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); - TableConfigUtils.validate(tableConfig, null); + try { + TableConfigUtils.validate(tableConfig, null); + Assert.fail("Should fail for null timeColumnName and null schema in OFFLINE table"); + } catch (IllegalArgumentException e) { + // expected + } // null schema only - allowed in OFFLINE tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build(); - TableConfigUtils.validate(tableConfig, null); + try { + TableConfigUtils.validate(tableConfig, null); + Assert.fail("Should fail for null schema in OFFLINE table"); + } catch (IllegalArgumentException e) { + // expected + } // null timeColumnName only - allowed in OFFLINE schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build(); @@ -172,7 +182,7 @@ public class TableConfigUtilsTest { // expected } - // non-null schema nd timeColumnName, but timeColumnName not present as a time spec in schema + // non-null schema and timeColumnName, but timeColumnName not present as a time spec in schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) .addSingleValueDimension(TIME_COLUMN, FieldSpec.DataType.STRING).build(); tableConfig = @@ -184,11 +194,6 @@ public class TableConfigUtilsTest { // expected } - // empty timeColumnName - valid - schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build(); - tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName("").build(); - TableConfigUtils.validate(tableConfig, schema); - 
// valid schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build(); @@ -217,7 +222,7 @@ public class TableConfigUtilsTest { try { TableConfigUtils.validate(tableConfig, null); Assert.fail("Should fail with a Dimension table without a schema"); - } catch (IllegalStateException e) { + } catch (IllegalArgumentException e) { // expected } @@ -327,7 +332,7 @@ public class TableConfigUtilsTest { // invalid transform config since Groovy is disabled try { TableConfigUtils.setDisableGroovy(true); - TableConfigUtils.validate(tableConfig, schema, null); + TableConfigUtils.validate(tableConfig, schema); // Reset to false TableConfigUtils.setDisableGroovy(false); Assert.fail("Should fail when Groovy functions disabled but found in transform config"); @@ -363,7 +368,7 @@ public class TableConfigUtilsTest { ingestionConfig.setFilterConfig(new FilterConfig("Groovy({timestamp > 0}, timestamp)")); try { TableConfigUtils.setDisableGroovy(true); - TableConfigUtils.validate(tableConfig, schema, null); + TableConfigUtils.validate(tableConfig, schema); // Reset to false TableConfigUtils.setDisableGroovy(false); Assert.fail("Should fail when Groovy functions disabled but found in filter config"); @@ -677,23 +682,27 @@ public class TableConfigUtilsTest { @Test public void ingestionStreamConfigsTest() { + Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addDateTime("timeColumn", FieldSpec.DataType.TIMESTAMP, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") + .build(); Map<String, String> streamConfigs = getStreamConfigs(); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Arrays.asList(streamConfigs, streamConfigs))); - TableConfig tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn") - .setIngestionConfig(ingestionConfig).build(); + 
TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig) + .build(); // Multiple stream configs are allowed try { - TableConfigUtils.validateIngestionConfig(tableConfig, null); + TableConfigUtils.validateIngestionConfig(tableConfig, schema); } catch (IllegalStateException e) { Assert.fail("Multiple stream configs should be supported"); } // stream config should be valid ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Collections.singletonList(streamConfigs))); - TableConfigUtils.validateIngestionConfig(tableConfig, null); + TableConfigUtils.validateIngestionConfig(tableConfig, schema); // validate the proto decoder streamConfigs = getKafkaStreamConfigs(); @@ -759,6 +768,8 @@ public class TableConfigUtilsTest { @Test public void ingestionBatchConfigsTest() { + Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build(); + Map<String, String> batchConfigMap = new HashMap<>(); batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo"); batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "gs://bar"); @@ -773,11 +784,14 @@ public class TableConfigUtilsTest { new BatchIngestionConfig(Arrays.asList(batchConfigMap, batchConfigMap), null, null)); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(ingestionConfig).build(); - TableConfigUtils.validateIngestionConfig(tableConfig, null); + TableConfigUtils.validateIngestionConfig(tableConfig, schema); } @Test public void ingestionConfigForDimensionTableTest() { + Schema schema = + new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(List.of("pk")).build(); + Map<String, String> batchConfigMap = new HashMap<>(); batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo"); batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "gs://bar"); @@ -792,12 +806,12 @@ public class 
TableConfigUtilsTest { new BatchIngestionConfig(Collections.singletonList(batchConfigMap), "REFRESH", null)); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true) .setIngestionConfig(ingestionConfig).build(); - TableConfigUtils.validateIngestionConfig(tableConfig, null); + TableConfigUtils.validateIngestionConfig(tableConfig, schema); // dimension tables should have batch ingestion config ingestionConfig.setBatchIngestionConfig(null); try { - TableConfigUtils.validateIngestionConfig(tableConfig, null); + TableConfigUtils.validateIngestionConfig(tableConfig, schema); Assert.fail("Should fail for Dimension table without batch ingestion config"); } catch (IllegalStateException e) { // expected @@ -807,7 +821,7 @@ public class TableConfigUtilsTest { ingestionConfig.setBatchIngestionConfig( new BatchIngestionConfig(Collections.singletonList(batchConfigMap), "APPEND", null)); try { - TableConfigUtils.validateIngestionConfig(tableConfig, null); + TableConfigUtils.validateIngestionConfig(tableConfig, schema); Assert.fail("Should fail for Dimension table with ingestion type APPEND (should be REFRESH)"); } catch (IllegalStateException e) { // expected diff --git a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java index 6df20fe491..c504af4a65 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java @@ -154,26 +154,24 @@ public class PredownloadZKClient implements AutoCloseable { * @param tableInfoMap Map of table name to table info to be filled with table config. 
*/ public void updateSegmentMetadata(List<PredownloadSegmentInfo> predownloadSegmentInfoList, - Map<String, PredownloadTableInfo> tableInfoMap, - InstanceDataManagerConfig instanceDataManagerConfig) { + Map<String, PredownloadTableInfo> tableInfoMap, InstanceDataManagerConfig instanceDataManagerConfig) { // fallback path comes from ZKHelixManager.class getHelixPropertyStore method ZkHelixPropertyStore<ZNRecord> propertyStore = new AutoFallbackPropertyStore<>(new ZkBaseDataAccessor<>(_zkClient), PropertyPathBuilder.propertyStore(_clusterName), String.format("/%s/%s", _clusterName, "HELIX_PROPERTYSTORE")); for (PredownloadSegmentInfo predownloadSegmentInfo : predownloadSegmentInfoList) { - tableInfoMap.computeIfAbsent(predownloadSegmentInfo.getTableNameWithType(), name -> { - TableConfig tableConfig = - ZKMetadataProvider.getTableConfig(propertyStore, predownloadSegmentInfo.getTableNameWithType()); + String tableNameWithType = predownloadSegmentInfo.getTableNameWithType(); + tableInfoMap.computeIfAbsent(tableNameWithType, name -> { + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(propertyStore, tableNameWithType); if (tableConfig == null) { LOGGER.warn("Cannot predownload segment {} because not able to get its table config from ZK", predownloadSegmentInfo.getSegmentName()); return null; } - Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableConfig); + Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableNameWithType); return new PredownloadTableInfo(name, tableConfig, schema, instanceDataManagerConfig); }); - SegmentZKMetadata segmentZKMetadata = - ZKMetadataProvider.getSegmentZKMetadata(propertyStore, predownloadSegmentInfo.getTableNameWithType(), - predownloadSegmentInfo.getSegmentName()); + SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(propertyStore, tableNameWithType, + predownloadSegmentInfo.getSegmentName()); if (segmentZKMetadata == null) { LOGGER.warn("Cannot predownload segment 
{} because not able to get its metadata from ZK", predownloadSegmentInfo.getSegmentName()); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java index 2bfdc051a6..536128b7c4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java @@ -38,8 +38,6 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig { private String _replication; @Deprecated // Use _replication instead private String _replicasPerPartition; - @Deprecated // Schema name should be the same as raw table name - private String _schemaName; private String _timeColumnName; private TimeUnit _timeType; @Deprecated // Use SegmentAssignmentConfig instead @@ -170,19 +168,6 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig { _replicasPerPartition = replicasPerPartition; } - /** - * @deprecated Schema name should be the same as raw table name - */ - @Deprecated - public String getSchemaName() { - return _schemaName; - } - - @Deprecated - public void setSchemaName(String schemaName) { - _schemaName = schemaName; - } - /** * @deprecated Use {@link InstanceAssignmentConfig} instead. */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org