This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ab4f333de8 Enforce schema for all tables (#15333)
ab4f333de8 is described below

commit ab4f333de8ddce6bd1349570472943f96dac7c7e
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue Mar 25 16:15:36 2025 -0600

    Enforce schema for all tables (#15333)
---
 .../segmentpruner/SegmentPrunerFactory.java        |   2 +-
 .../pinot/common/config/provider/TableCache.java   |  36 +--
 .../pinot/common/metadata/ZKMetadataProvider.java  |  51 +---
 .../pinot/common/metrics/ControllerGauge.java      |  15 +-
 .../org/apache/pinot/common/utils/SchemaUtils.java |  14 +-
 .../pinot/controller/BaseControllerStarter.java    | 118 +++------
 .../apache/pinot/controller/ControllerConf.java    |  17 +-
 .../api/resources/PinotSchemaRestletResource.java  |  43 ++-
 .../api/resources/PinotTableRestletResource.java   |  48 +---
 .../api/resources/TableAndSchemaConfig.java        |   4 +-
 .../controller/helix/ControllerRequestClient.java  |  11 +
 .../helix/core/PinotHelixResourceManager.java      |  15 +-
 .../pinot/controller/helix/ControllerTest.java     |   5 +
 .../cleanup/SchemaCleanupTaskStatelessTest.java    | 288 ---------------------
 .../core/data/manager/BaseTableDataManager.java    |  10 +-
 .../tests/OfflineClusterIntegrationTest.java       |   5 +-
 ...RefreshSegmentMinionClusterIntegrationTest.java |  11 +-
 .../RefreshSegmentTaskGenerator.java               |   4 +-
 ...RealtimeToOfflineSegmentsTaskGeneratorTest.java |   3 -
 .../local/data/manager/TableDataManager.java       |   2 +-
 .../segment/local/utils/TableConfigUtils.java      | 105 +++-----
 .../segment/local/utils/TableConfigUtilsTest.java  |  58 +++--
 .../server/predownload/PredownloadZKClient.java    |  16 +-
 .../SegmentsValidationAndRetentionConfig.java      |  15 --
 24 files changed, 206 insertions(+), 690 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
index 9efd86764e..6fc8a150c1 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
@@ -133,7 +133,7 @@ public class SegmentPrunerFactory {
       LOGGER.warn("Cannot enable time range pruning without time column for 
table: {}", tableNameWithType);
       return null;
     }
-    Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, 
tableConfig);
+    Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, 
tableNameWithType);
     if (schema == null) {
       LOGGER.warn("Cannot enable time range pruning without schema for table: 
{}", tableNameWithType);
       return null;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index a193fc19db..17d953abec 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -81,9 +81,6 @@ public class TableCache implements PinotConfigProvider {
   private final ZkTableConfigChangeListener _zkTableConfigChangeListener = new 
ZkTableConfigChangeListener();
   // Key is table name with type suffix, value is table config info
   private final Map<String, TableConfigInfo> _tableConfigInfoMap = new 
ConcurrentHashMap<>();
-  // Key is table name (with or without type suffix), value is schema name
-  // It only stores table with schema name not matching the raw table name
-  private final Map<String, String> _schemaNameMap = new ConcurrentHashMap<>();
   // Key is lower case table name (with or without type suffix), value is 
actual table name
   // For case-insensitive mode only
   private final Map<String, String> _tableNameMap = new ConcurrentHashMap<>();
@@ -175,8 +172,7 @@ public class TableCache implements PinotConfigProvider {
    */
   @Nullable
   public Map<String, String> getColumnNameMap(String rawTableName) {
-    String schemaName = _schemaNameMap.getOrDefault(rawTableName, 
rawTableName);
-    SchemaInfo schemaInfo = _schemaInfoMap.getOrDefault(schemaName, 
_schemaInfoMap.get(rawTableName));
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
     return schemaInfo != null ? schemaInfo._columnNameMap : null;
   }
 
@@ -225,8 +221,7 @@ public class TableCache implements PinotConfigProvider {
   @Nullable
   @Override
   public Schema getSchema(String rawTableName) {
-    String schemaName = _schemaNameMap.getOrDefault(rawTableName, 
rawTableName);
-    SchemaInfo schemaInfo = _schemaInfoMap.get(schemaName);
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
     return schemaInfo != null ? schemaInfo._schema : null;
   }
 
@@ -262,17 +257,8 @@ public class TableCache implements PinotConfigProvider {
       throws IOException {
     TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
     String tableNameWithType = tableConfig.getTableName();
-    _tableConfigInfoMap.put(tableNameWithType, new 
TableConfigInfo(tableConfig));
-
-    String schemaName = tableConfig.getValidationConfig().getSchemaName();
     String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-    if (schemaName != null && !schemaName.equals(rawTableName)) {
-      _schemaNameMap.put(tableNameWithType, schemaName);
-      _schemaNameMap.put(rawTableName, schemaName);
-    } else {
-      removeSchemaName(tableNameWithType);
-    }
-
+    _tableConfigInfoMap.put(tableNameWithType, new 
TableConfigInfo(tableConfig));
     if (_ignoreCase) {
       _tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
       _tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
@@ -287,7 +273,6 @@ public class TableCache implements PinotConfigProvider {
     String tableNameWithType = 
path.substring(TABLE_CONFIG_PATH_PREFIX.length());
     String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
     _tableConfigInfoMap.remove(tableNameWithType);
-    removeSchemaName(tableNameWithType);
     if (_ignoreCase) {
       _tableNameMap.remove(tableNameWithType.toLowerCase());
       String lowerCaseRawTableName = rawTableName.toLowerCase();
@@ -314,21 +299,6 @@ public class TableCache implements PinotConfigProvider {
     }
   }
 
-  private void removeSchemaName(String tableNameWithType) {
-    if (_schemaNameMap.remove(tableNameWithType) != null) {
-      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-      if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
-        if (!_schemaNameMap.containsKey(TableNameBuilder.REALTIME.tableNameWithType(rawTableName))) {
-          _schemaNameMap.remove(rawTableName);
-        }
-      } else {
-        if (!_schemaNameMap.containsKey(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName))) {
-          _schemaNameMap.remove(rawTableName);
-        }
-      }
-    }
-  }
-
   private void addSchemas(List<String> paths) {
     // Subscribe data changes before reading the data to avoid missing changes
     for (String path : paths) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 37ba1499e3..b95e76f19c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -44,7 +44,6 @@ import org.apache.pinot.spi.config.ConfigUtils;
 import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.user.UserConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -630,60 +629,16 @@ public class ZKMetadataProvider {
    */
   @Nullable
   public static Schema getTableSchema(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String tableName) {
-    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
-    Schema schema = getSchema(propertyStore, rawTableName);
-    if (schema != null) {
-      return schema;
-    }
-
-    // For backward compatible where schema name is not the same as raw table 
name
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-    // Try to fetch realtime schema first
-    if (tableType == null || tableType == TableType.REALTIME) {
-      TableConfig realtimeTableConfig = getRealtimeTableConfig(propertyStore, 
tableName);
-      if (realtimeTableConfig != null) {
-        String realtimeSchemaNameFromValidationConfig = 
realtimeTableConfig.getValidationConfig().getSchemaName();
-        if (realtimeSchemaNameFromValidationConfig != null) {
-          schema = getSchema(propertyStore, 
realtimeSchemaNameFromValidationConfig);
-        }
-      }
-    }
-    // Try to fetch offline schema if realtime schema does not exist
-    if (schema == null && (tableType == null || tableType == 
TableType.OFFLINE)) {
-      TableConfig offlineTableConfig = getOfflineTableConfig(propertyStore, 
tableName);
-      if (offlineTableConfig != null) {
-        String offlineSchemaNameFromValidationConfig = 
offlineTableConfig.getValidationConfig().getSchemaName();
-        if (offlineSchemaNameFromValidationConfig != null) {
-          schema = getSchema(propertyStore, 
offlineSchemaNameFromValidationConfig);
-        }
-      }
-    }
-    if (schema != null && LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Schema name does not match raw table name, schema name: 
{}, raw table name: {}",
-          schema.getSchemaName(), 
TableNameBuilder.extractRawTableName(tableName));
-    }
-    return schema;
+    return getSchema(propertyStore, 
TableNameBuilder.extractRawTableName(tableName));
   }
 
   /**
    * Get the schema associated with the given table.
    */
+  @Deprecated
   @Nullable
   public static Schema getTableSchema(ZkHelixPropertyStore<ZNRecord> 
propertyStore, TableConfig tableConfig) {
-    String rawTableName = 
TableNameBuilder.extractRawTableName(tableConfig.getTableName());
-    Schema schema = getSchema(propertyStore, rawTableName);
-    if (schema != null) {
-      return schema;
-    }
-    String schemaNameFromTableConfig = 
tableConfig.getValidationConfig().getSchemaName();
-    if (schemaNameFromTableConfig != null) {
-      schema = getSchema(propertyStore, schemaNameFromTableConfig);
-    }
-    if (schema != null && LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Schema name does not match raw table name, schema name: 
{}, raw table name: {}",
-          schemaNameFromTableConfig, rawTableName);
-    }
-    return schema;
+    return getTableSchema(propertyStore, tableConfig.getTableName());
   }
 
   /**
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index fa27e60f4b..9ea6008b3f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -151,21 +151,12 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   // Consumption availability lag in ms at a partition level
   MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false),
 
-  // Number of table schema got misconfigured
-  MISCONFIGURED_SCHEMA_TABLE_COUNT("misconfiguredSchemaTableCount", true),
+  // Number of table without table config
+  TABLE_WITHOUT_TABLE_CONFIG_COUNT("tableWithoutTableConfigCount", true),
 
-  // Number of table without schema
+  // Number of table with table config but without schema
   TABLE_WITHOUT_SCHEMA_COUNT("tableWithoutSchemaCount", true),
 
-  // Number of table schema got fixed
-  FIXED_SCHEMA_TABLE_COUNT("fixedSchemaTableCount", true),
-
-  // Number of tables that we want to fix but failed to copy schema from old 
schema name to new schema name
-  FAILED_TO_COPY_SCHEMA_COUNT("failedToCopySchemaCount", true),
-
-  // Number of tables that we want to fix but failed to update table config
-  FAILED_TO_UPDATE_TABLE_CONFIG_COUNT("failedToUpdateTableConfigCount", true),
-
   
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE("LLCSegmentDeepStoreUploadRetryQueueSize",
 false),
 
   TABLE_CONSUMPTION_PAUSED("tableConsumptionPaused", false),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SchemaUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SchemaUtils.java
index 4cadf09bea..2800441fa6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SchemaUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SchemaUtils.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.net.URL;
 import java.nio.charset.Charset;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.commons.io.IOUtils;
 import org.apache.hc.client5.http.classic.methods.HttpDelete;
@@ -58,7 +57,7 @@ public class SchemaUtils {
   /**
    * Fetch {@link Schema} from a {@link ZNRecord}.
    */
-  public static Schema fromZNRecord(@Nonnull ZNRecord record)
+  public static Schema fromZNRecord(ZNRecord record)
       throws IOException {
     String schemaJSON = record.getSimpleField("schemaJSON");
     return Schema.fromString(schemaJSON);
@@ -67,7 +66,7 @@ public class SchemaUtils {
   /**
    * Wrap {@link Schema} into a {@link ZNRecord}.
    */
-  public static ZNRecord toZNRecord(@Nonnull Schema schema) {
+  public static ZNRecord toZNRecord(Schema schema) {
     ZNRecord record = new ZNRecord(schema.getSchemaName());
     record.setSimpleField("schemaJSON", schema.toSingleLineJsonString());
     return record;
@@ -79,7 +78,8 @@ public class SchemaUtils {
    * @return schema on success.
    * <P><code>null</code> on failure.
    */
-  public static @Nullable Schema getSchema(@Nonnull String host, int port, 
@Nonnull String schemaName) {
+  @Nullable
+  public static Schema getSchema(String host, int port, String schemaName) {
     Preconditions.checkNotNull(host);
     Preconditions.checkNotNull(schemaName);
 
@@ -112,7 +112,7 @@ public class SchemaUtils {
    * @return <code>true</code> on success.
    * <P><code>false</code> on failure.
    */
-  public static boolean postSchema(@Nonnull String host, int port, @Nonnull 
Schema schema) {
+  public static boolean postSchema(String host, int port, Schema schema) {
     Preconditions.checkNotNull(host);
     Preconditions.checkNotNull(schema);
 
@@ -144,7 +144,7 @@ public class SchemaUtils {
    * @return <code>true</code> on success.
    * <P><code>false</code> on failure.
    */
-  public static boolean deleteSchema(@Nonnull String host, int port, @Nonnull 
String schemaName) {
+  public static boolean deleteSchema(String host, int port, String schemaName) 
{
     Preconditions.checkNotNull(host);
     Preconditions.checkNotNull(schemaName);
 
@@ -172,7 +172,7 @@ public class SchemaUtils {
    * @return <code>true</code> if two schemas equal to each other.
    * <p><code>false</code>if two schemas do not equal to each other.
    */
-  public static boolean equalsIgnoreVersion(@Nonnull Schema schema1, @Nonnull 
Schema schema2) {
+  public static boolean equalsIgnoreVersion(Schema schema1, Schema schema2) {
     Preconditions.checkNotNull(schema1);
     Preconditions.checkNotNull(schema2);
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 94c380cbfe..2e4954076c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -37,15 +37,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.hc.core5.http.io.SocketConfig;
 import org.apache.hc.core5.util.Timeout;
-import org.apache.helix.AccessOption;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -73,7 +70,6 @@ import org.apache.pinot.common.minion.InMemoryTaskManagerStatusCache;
 import org.apache.pinot.common.minion.TaskGeneratorMostRecentRunInfo;
 import org.apache.pinot.common.minion.TaskManagerStatusCache;
 import org.apache.pinot.common.utils.PinotAppConfigs;
-import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.ServiceStartableUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
@@ -141,7 +137,6 @@ import org.apache.pinot.spi.services.ServiceStartable;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.InstanceTypeUtils;
 import org.apache.pinot.spi.utils.NetUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
 import org.glassfish.hk2.utilities.binding.AbstractBinder;
 import org.slf4j.Logger;
@@ -593,9 +588,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     LOGGER.info("Starting controller admin application on: {}", 
ListenerConfigUtil.toString(_listenerConfigs));
     _adminApp.start(_listenerConfigs);
 
-    // One time job to fix schema name in all tables
-    // This method can be removed after the next major release.
-    fixSchemaNameInTableConfig();
+    enforceTableConfigAndSchema();
 
     _controllerMetrics.addCallbackGauge("dataDir.exists", () -> new 
File(_config.getDataDir()).exists() ? 1L : 0L);
     _controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
@@ -626,91 +619,46 @@ public abstract class BaseControllerStarter implements ServiceStartable {
   }
 
   /**
-   * This method is used to fix table/schema names.
-   * TODO: in the next release, maybe 2.0.0, we can remove this method. 
Meanwhile we can delete the orphan schemas
-   * that has been existed longer than a certain time period.
-   *
+   * Scan all table resources in the cluster and ensure table config and 
schema exist for each table.
+   * TODO: Cleanup orphan table config and schema
    */
-  @VisibleForTesting
-  public void fixSchemaNameInTableConfig() {
-    AtomicInteger misconfiguredTableCount = new AtomicInteger();
-    AtomicInteger tableWithoutSchemaCount = new AtomicInteger();
-    AtomicInteger fixedSchemaTableCount = new AtomicInteger();
-    AtomicInteger failedToCopySchemaCount = new AtomicInteger();
-    AtomicInteger failedToUpdateTableConfigCount = new AtomicInteger();
+  private void enforceTableConfigAndSchema() {
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
_helixResourceManager.getPropertyStore();
-
-    _helixResourceManager.getAllTables().forEach(tableNameWithType -> {
-      Pair<TableConfig, Integer> tableConfigWithVersion =
-          ZKMetadataProvider.getTableConfigWithVersion(propertyStore, 
tableNameWithType);
-      if (tableConfigWithVersion == null) {
-        // This might due to table deletion, just log it here.
-        LOGGER.warn("Failed to find table config for table: {}, the table 
likely already got deleted",
-            tableNameWithType);
-        return;
+    List<String> tablesWithoutTableConfig = new ArrayList<>();
+    List<String> tablesWithoutSchema = new ArrayList<>();
+    for (String tableNameWithType : _helixResourceManager.getAllTables()) {
+      TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(propertyStore, tableNameWithType);
+      if (tableConfig == null) {
+        tablesWithoutTableConfig.add(tableNameWithType);
+        continue;
       }
-      TableConfig tableConfig = tableConfigWithVersion.getLeft();
-      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-      String schemaPath = 
ZKMetadataProvider.constructPropertyStorePathForSchema(rawTableName);
-      boolean schemaExists = propertyStore.exists(schemaPath, 
AccessOption.PERSISTENT);
-      String existSchemaName = 
tableConfig.getValidationConfig().getSchemaName();
-      if (existSchemaName == null || existSchemaName.equals(rawTableName)) {
-        // Although the table config is valid, we still need to ensure the 
schema exists
-        if (!schemaExists) {
-          LOGGER.warn("Failed to find schema for table: {}", 
tableNameWithType);
-          tableWithoutSchemaCount.getAndIncrement();
-          return;
-        }
-        // Table config is already in good status
-        return;
+      Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, 
tableNameWithType);
+      if (schema == null) {
+        tablesWithoutSchema.add(tableNameWithType);
       }
-      misconfiguredTableCount.getAndIncrement();
-      if (schemaExists) {
-        // If a schema named `rawTableName` already exists, then likely this 
is a misconfiguration.
-        // Reset schema name in table config to null to let the table point to 
the existing schema.
-        LOGGER.warn("Schema: {} already exists, fix the schema name in table 
config from {} to null", rawTableName,
-            existSchemaName);
+    }
+    if (!tablesWithoutTableConfig.isEmpty()) {
+      LOGGER.error("[CRITICAL!!!] Failed to find table config for tables: {}", 
tablesWithoutTableConfig);
+      if (_config.isExitOnTableConfigCheckFailure()) {
+        throw new IllegalStateException("Failed to find table config for tables: " + tablesWithoutTableConfig
+            + ", exiting! Please set controller.startup.exitOnTableConfigCheckFailure=false to not exit and fix these "
+            + "tables.");
       } else {
-        // Copy the schema current table referring to to `rawTableName` if it 
does not exist
-        Schema schema = _helixResourceManager.getSchema(existSchemaName);
-        if (schema == null) {
-          LOGGER.warn("Failed to find schema: {} for table: {}", 
existSchemaName, tableNameWithType);
-          tableWithoutSchemaCount.getAndIncrement();
-          return;
-        }
-        schema.setSchemaName(rawTableName);
-        if (propertyStore.create(schemaPath, SchemaUtils.toZNRecord(schema), 
AccessOption.PERSISTENT)) {
-          LOGGER.info("Copied schema: {} to {}", existSchemaName, 
rawTableName);
-        } else {
-          LOGGER.warn("Failed to copy schema: {} to {}", existSchemaName, 
rawTableName);
-          failedToCopySchemaCount.getAndIncrement();
-          return;
-        }
+        
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_TABLE_CONFIG_COUNT,
+            tablesWithoutTableConfig.size());
       }
-      // Update table config to remove schema name
-      tableConfig.getValidationConfig().setSchemaName(null);
-      if (ZKMetadataProvider.setTableConfig(propertyStore, tableConfig, 
tableConfigWithVersion.getRight())) {
-        LOGGER.info("Removed schema name from table config for table: {}", 
tableNameWithType);
-        fixedSchemaTableCount.getAndIncrement();
+    }
+    if (!tablesWithoutSchema.isEmpty()) {
+      LOGGER.error("[CRITICAL!!!] Failed to find schema for tables: {}", 
tablesWithoutSchema);
+      if (_config.isExitOnSchemaCheckFailure()) {
+        throw new IllegalStateException("Failed to find schema for tables: " + tablesWithoutSchema
+            + ", exiting! Please set controller.startup.exitOnSchemaCheckFailure=false to not exit and fix these "
+            + "tables.");
       } else {
-        LOGGER.warn("Failed to update table config for table: {}", 
tableNameWithType);
-        failedToUpdateTableConfigCount.getAndIncrement();
+        
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT,
+            tablesWithoutSchema.size());
       }
-    });
-    LOGGER.info(
-        "Found {} tables misconfigured, {} tables without schema. Successfully 
fixed schema for {} tables, failed to "
-            + "fix {} tables due to copy schema failure, failed to fix {} 
tables due to update table config failure.",
-        misconfiguredTableCount.get(), tableWithoutSchemaCount.get(), 
fixedSchemaTableCount.get(),
-        failedToCopySchemaCount.get(), failedToUpdateTableConfigCount.get());
-
-    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT,
-        misconfiguredTableCount.get());
-    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT,
 tableWithoutSchemaCount.get());
-    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FIXED_SCHEMA_TABLE_COUNT,
 fixedSchemaTableCount.get());
-    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT,
-        failedToCopySchemaCount.get());
-    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT,
-        failedToUpdateTableConfigCount.get());
+    }
   }
 
   private ServiceStatus.ServiceStatusCallback 
generateServiceStatusCallback(HelixManager helixManager) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index c76943773b..29ebb084f0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -362,6 +362,11 @@ public class ControllerConf extends PinotConfiguration {
   public static final String ENFORCE_POOL_BASED_ASSIGNMENT_KEY = 
"enforce.pool.based.assignment";
   public static final boolean DEFAULT_ENFORCE_POOL_BASED_ASSIGNMENT = false;
 
+  public static final String EXIT_ON_TABLE_CONFIG_CHECK_FAILURE = 
"controller.startup.exitOnTableConfigCheckFailure";
+  public static final boolean DEFAULT_EXIT_ON_TABLE_CONFIG_CHECK_FAILURE = 
true;
+  public static final String EXIT_ON_SCHEMA_CHECK_FAILURE = 
"controller.startup.exitOnSchemaCheckFailure";
+  public static final boolean DEFAULT_EXIT_ON_SCHEMA_CHECK_FAILURE = true;
+
   public ControllerConf() {
     super(new HashMap<>());
   }
@@ -1205,12 +1210,20 @@ public class ControllerConf extends PinotConfiguration {
     return getProperty(ENFORCE_POOL_BASED_ASSIGNMENT_KEY, 
DEFAULT_ENFORCE_POOL_BASED_ASSIGNMENT);
   }
 
+  public boolean isExitOnTableConfigCheckFailure() {
+    return getProperty(EXIT_ON_TABLE_CONFIG_CHECK_FAILURE, 
DEFAULT_EXIT_ON_TABLE_CONFIG_CHECK_FAILURE);
+  }
+
+  public boolean isExitOnSchemaCheckFailure() {
+    return getProperty(EXIT_ON_SCHEMA_CHECK_FAILURE, 
DEFAULT_EXIT_ON_SCHEMA_CHECK_FAILURE);
+  }
+
   public void setEnableSwagger(boolean value) {
-    setProperty(ControllerConf.CONSOLE_SWAGGER_ENABLE, value);
+    setProperty(CONSOLE_SWAGGER_ENABLE, value);
   }
 
   public boolean isEnableSwagger() {
-    String enableSwagger = getProperty(ControllerConf.CONSOLE_SWAGGER_ENABLE);
+    String enableSwagger = getProperty(CONSOLE_SWAGGER_ENABLE);
     return enableSwagger == null || Boolean.parseBoolean(enableSwagger);
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
index d0a087c1d8..2e1d0d2c88 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
@@ -206,15 +206,18 @@ public class PinotSchemaRestletResource {
   })
   public ConfigSuccessResponse updateSchema(
       @ApiParam(value = "Name of the schema", required = true) 
@PathParam("schemaName") String schemaName,
-      @ApiParam(value = "Whether to reload the table if the new schema is 
backward compatible") @DefaultValue("false")
-      @QueryParam("reload") boolean reload, @Context HttpHeaders headers, 
FormDataMultiPart multiPart) {
+      @ApiParam(value = "Whether to reload the table after updating the 
schema") @DefaultValue("false")
+      @QueryParam("reload") boolean reload,
+      @ApiParam(value = "Whether to force update the schema even if the new 
schema is backward incompatible")
+      @DefaultValue("false") @QueryParam("force") boolean force, @Context 
HttpHeaders headers,
+      FormDataMultiPart multiPart) {
     schemaName = DatabaseUtils.translateTableName(schemaName, headers);
     Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps =
         getSchemaAndUnrecognizedPropertiesFromMultiPart(multiPart);
     Schema schema = schemaAndUnrecognizedProps.getLeft();
     validateSchemaName(schema);
     
schema.setSchemaName(DatabaseUtils.translateTableName(schema.getSchemaName(), 
headers));
-    SuccessResponse successResponse = updateSchema(schemaName, schema, reload);
+    SuccessResponse successResponse = updateSchema(schemaName, schema, reload, 
force);
     return new ConfigSuccessResponse(successResponse.getStatus(), 
schemaAndUnrecognizedProps.getRight());
   }
 
@@ -233,15 +236,18 @@ public class PinotSchemaRestletResource {
   })
   public ConfigSuccessResponse updateSchema(
       @ApiParam(value = "Name of the schema", required = true) 
@PathParam("schemaName") String schemaName,
-      @ApiParam(value = "Whether to reload the table if the new schema is 
backward compatible") @DefaultValue("false")
-      @QueryParam("reload") boolean reload, @Context HttpHeaders headers, 
String schemaJsonString) {
+      @ApiParam(value = "Whether to reload the table after updating the 
schema") @DefaultValue("false")
+      @QueryParam("reload") boolean reload,
+      @ApiParam(value = "Whether to force update the schema even if the new 
schema is backward incompatible")
+      @DefaultValue("false") @QueryParam("force") boolean force, @Context 
HttpHeaders headers,
+      String schemaJsonString) {
     schemaName = DatabaseUtils.translateTableName(schemaName, headers);
     Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps =
         getSchemaAndUnrecognizedPropertiesFromJson(schemaJsonString);
     Schema schema = schemaAndUnrecognizedProps.getLeft();
     validateSchemaName(schema);
     
schema.setSchemaName(DatabaseUtils.translateTableName(schema.getSchemaName(), 
headers));
-    SuccessResponse successResponse = updateSchema(schemaName, schema, reload);
+    SuccessResponse successResponse = updateSchema(schemaName, schema, reload, 
force);
     return new ConfigSuccessResponse(successResponse.getStatus(), 
schemaAndUnrecognizedProps.getRight());
   }
 
@@ -446,7 +452,7 @@ public class PinotSchemaRestletResource {
    * @param reload  set to true to reload the tables using the schema, so 
committed segments can pick up the new schema
    * @return SuccessResponse
    */
-  private SuccessResponse updateSchema(String schemaName, Schema schema, 
boolean reload) {
+  private SuccessResponse updateSchema(String schemaName, Schema schema, 
boolean reload, boolean force) {
     validateSchemaInternal(schema);
 
     if (!schemaName.equals(schema.getSchemaName())) {
@@ -457,7 +463,7 @@ public class PinotSchemaRestletResource {
     }
 
     try {
-      _pinotHelixResourceManager.updateSchema(schema, reload, false);
+      _pinotHelixResourceManager.updateSchema(schema, reload, force);
       // Best effort notification. If controller fails at this point, no 
notification is given.
       LOGGER.info("Notifying metadata event for updating schema: {}", 
schemaName);
       _metadataEventNotifierFactory.create().notifyOnSchemaEvents(schema, 
SchemaEventType.UPDATE);
@@ -520,7 +526,6 @@ public class PinotSchemaRestletResource {
    */
   private Pair<Schema, Map<String, Object>> 
getSchemaAndUnrecognizedPropertiesFromJson(String schemaJsonString)
       throws ControllerApplicationException {
-    Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps;
     try {
       return 
JsonUtils.stringToObjectAndUnrecognizedProperties(schemaJsonString, 
Schema.class);
     } catch (Exception e) {
@@ -537,24 +542,14 @@ public class PinotSchemaRestletResource {
     }
 
     // If the schema is associated with a table, we should not delete it.
-    // TODO: Check OFFLINE tables as well. There are 2 side effects:
-    //       - Increases ZK read when there are lots of OFFLINE tables
-    //       - Behavior change since we don't allow deleting schema for 
OFFLINE tables
-    List<String> realtimeTables = 
_pinotHelixResourceManager.getAllRealtimeTables();
-    for (String realtimeTableName : realtimeTables) {
-      if 
(schemaName.equals(TableNameBuilder.extractRawTableName(realtimeTableName))) {
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(schemaName);
+    String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(schemaName);
+    for (String tableNameWithType : new String[]{offlineTableName, 
realtimeTableName}) {
+      if (_pinotHelixResourceManager.hasTable(tableNameWithType)) {
         throw new ControllerApplicationException(LOGGER,
-            String.format("Cannot delete schema %s, as it is associated with 
table %s", schemaName, realtimeTableName),
+            String.format("Cannot delete schema %s, as it is associated with 
table %s", schemaName, tableNameWithType),
             Response.Status.CONFLICT);
       }
-      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(realtimeTableName);
-      if (tableConfig != null) {
-        if 
(schemaName.equals(tableConfig.getValidationConfig().getSchemaName())) {
-          throw new ControllerApplicationException(LOGGER,
-              String.format("Cannot delete schema %s, as it is associated with 
table %s", schemaName,
-                  realtimeTableName), Response.Status.CONFLICT);
-        }
-      }
     }
 
     LOGGER.info("Trying to delete schema {}", schemaName);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index fed660e988..eb604b55b5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -112,7 +112,6 @@ import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.ManualAuthorization;
 import org.apache.pinot.core.auth.TargetType;
 import org.apache.pinot.segment.local.utils.TableConfigUtils;
-import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableStatsHumanReadable;
 import org.apache.pinot.spi.config.table.TableStatus;
@@ -194,9 +193,8 @@ public class PinotTableRestletResource {
   HttpClientConnectionManager _connectionManager;
 
   /**
-   * API to create a table. Before adding, validations will be done (min 
number of replicas,
-   * checking offline and realtime table configs match, checking for tenants 
existing)
-   * @param tableConfigStr
+   * API to create a table. Before adding, validations will be done (min 
number of replicas, checking offline and
+   * realtime table configs match, checking for tenants existing).
    */
   @POST
   @Produces(MediaType.APPLICATION_JSON)
@@ -218,14 +216,13 @@ public class PinotTableRestletResource {
       tableConfig = tableConfigAndUnrecognizedProperties.getLeft();
       tableNameWithType = 
DatabaseUtils.translateTableName(tableConfig.getTableName(), httpHeaders);
       tableConfig.setTableName(tableNameWithType);
-      // Handle legacy config
-      handleLegacySchemaConfig(tableConfig, httpHeaders);
 
       // validate permission
       ResourceUtils.checkPermissionAndAccess(tableNameWithType, request, 
httpHeaders,
           AccessType.CREATE, Actions.Table.CREATE_TABLE, 
_accessControlFactory, LOGGER);
 
-      schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+      schema = _pinotHelixResourceManager.getTableSchema(tableNameWithType);
+      Preconditions.checkState(schema != null, "Failed to find schema for 
table: %s", tableNameWithType);
 
       TableConfigTunerUtils.applyTunerConfigs(_pinotHelixResourceManager, 
tableConfig, schema, Collections.emptyMap());
 
@@ -490,8 +487,6 @@ public class PinotTableRestletResource {
       tableConfig = tableConfigAndUnrecognizedProperties.getLeft();
       tableNameWithType = 
DatabaseUtils.translateTableName(tableConfig.getTableName(), headers);
       tableConfig.setTableName(tableNameWithType);
-      // Handle legacy config
-      handleLegacySchemaConfig(tableConfig, headers);
       String tableNameFromPath = DatabaseUtils.translateTableName(
           
TableNameBuilder.forType(tableConfig.getTableType()).tableNameWithType(tableName),
 headers);
       if (!tableNameFromPath.equals(tableNameWithType)) {
@@ -500,7 +495,8 @@ public class PinotTableRestletResource {
             Response.Status.BAD_REQUEST);
       }
 
-      schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+      schema = _pinotHelixResourceManager.getTableSchema(tableNameWithType);
+      Preconditions.checkState(schema != null, "Failed to find schema for 
table: %s", tableNameWithType);
       TableConfigUtils.validate(tableConfig, schema, typesToSkip);
     } catch (Exception e) {
       String msg = String.format("Invalid table config: %s with error: %s", 
tableName, e.getMessage());
@@ -559,24 +555,22 @@ public class PinotTableRestletResource {
     String tableNameWithType = 
DatabaseUtils.translateTableName(tableConfig.getTableName(), httpHeaders);
     tableConfig.setTableName(tableNameWithType);
 
-    // Handle legacy config
-    handleLegacySchemaConfig(tableConfig, httpHeaders);
-
     // validate permission
     ResourceUtils.checkPermissionAndAccess(tableNameWithType, request, 
httpHeaders,
         AccessType.READ, Actions.Table.VALIDATE_TABLE_CONFIGS, 
_accessControlFactory, LOGGER);
 
-    ObjectNode validationResponse =
-        validateConfig(tableConfig, 
_pinotHelixResourceManager.getSchemaForTableConfig(tableConfig), typesToSkip);
+    ObjectNode validationResponse = validateConfig(tableConfig, typesToSkip);
     validationResponse.set("unrecognizedProperties",
         
JsonUtils.objectToJsonNode(tableConfigAndUnrecognizedProperties.getRight()));
     return validationResponse;
   }
 
-  private ObjectNode validateConfig(TableConfig tableConfig, Schema schema, 
@Nullable String typesToSkip) {
+  private ObjectNode validateConfig(TableConfig tableConfig, @Nullable String 
typesToSkip) {
+    String tableNameWithType = tableConfig.getTableName();
     try {
+      Schema schema = 
_pinotHelixResourceManager.getTableSchema(tableNameWithType);
       if (schema == null) {
-        throw new SchemaNotFoundException("Got empty schema");
+        throw new SchemaNotFoundException("Failed to find schema for table: " 
+ tableNameWithType);
       }
       TableConfigUtils.validate(tableConfig, schema, typesToSkip);
       TaskConfigUtils.validateTaskConfigs(tableConfig, schema, 
_pinotTaskManager, typesToSkip);
@@ -588,7 +582,7 @@ public class PinotTableRestletResource {
       }
       return tableConfigValidateStr;
     } catch (Exception e) {
-      String msg = String.format("Invalid table config: %s. %s", 
tableConfig.getTableName(), e.getMessage());
+      String msg = String.format("Invalid table config: %s. %s", 
tableNameWithType, e.getMessage());
       throw new ControllerApplicationException(LOGGER, msg, 
Response.Status.BAD_REQUEST, e);
     }
   }
@@ -1279,24 +1273,6 @@ public class PinotTableRestletResource {
     return timeBoundaryMs;
   }
 
-  /**
-   * Handles the legacy schema configuration for a given table configuration.
-   * This method updates the schema name in the validation configuration of 
the table config
-   * to ensure it is correctly translated based on the provided HTTP headers.
-   * This is necessary to maintain compatibility with older configurations 
that may not
-   * have the schema name properly set or formatted.
-   *
-   * @param tableConfig The {@link TableConfig} object containing the table 
configuration.
-   * @param httpHeaders The {@link HttpHeaders} object containing the HTTP 
headers, used to
-   *                    translate the schema name if necessary.
-   */
-  private void handleLegacySchemaConfig(TableConfig tableConfig, HttpHeaders 
httpHeaders) {
-    SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
-    if (validationConfig.getSchemaName() != null) {
-      
validationConfig.setSchemaName(DatabaseUtils.translateTableName(validationConfig.getSchemaName(),
 httpHeaders));
-    }
-  }
-
   /**
    * Try to calculate the instance partitions for the given table config. 
Throws exception if it fails.
    */
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableAndSchemaConfig.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableAndSchemaConfig.java
index a4a76d1527..a2f262b6fb 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableAndSchemaConfig.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableAndSchemaConfig.java
@@ -21,7 +21,6 @@ package org.apache.pinot.controller.api.resources;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import javax.annotation.Nullable;
 import org.apache.pinot.spi.config.TableConfigs;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -39,7 +38,7 @@ public class TableAndSchemaConfig {
 
   @JsonCreator
   public TableAndSchemaConfig(@JsonProperty(value = "tableConfig", required = 
true) TableConfig tableConfig,
-      @JsonProperty("schema") @Nullable Schema schema) {
+      @JsonProperty(value = "schema", required = true) Schema schema) {
     _tableConfig = tableConfig;
     _schema = schema;
   }
@@ -48,7 +47,6 @@ public class TableAndSchemaConfig {
     return _tableConfig;
   }
 
-  @Nullable
   public Schema getSchema() {
     return _schema;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 754367fef5..4a8c54993e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -105,6 +105,17 @@ public class ControllerRequestClient {
     }
   }
 
+  public void forceUpdateSchema(Schema schema)
+      throws IOException {
+    String url = 
_controllerRequestURLBuilder.forSchemaUpdate(schema.getSchemaName()) + 
"?force=true";
+    try {
+      HttpClient.wrapAndThrowHttpException(
+          _httpClient.sendMultipartPutRequest(url, 
schema.toSingleLineJsonString(), _headers));
+    } catch (HttpErrorStatusException e) {
+      throw new IOException(e);
+    }
+  }
+
   public void deleteSchema(String schemaName)
       throws IOException {
     String url = _controllerRequestURLBuilder.forSchemaDelete(schemaName);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index f125854976..c8478c25c5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1630,21 +1630,10 @@ public class PinotHelixResourceManager {
     return ZKMetadataProvider.getTableSchema(_propertyStore, tableName);
   }
 
-  /**
-   * Find schema with same name as rawTableName. If not found, find schema 
using schemaName in validationConfig.
-   * For OFFLINE table, it is possible that schema was not uploaded before 
creating the table. Hence for OFFLINE,
-   * this method can return null.
-   */
+  @Deprecated
   @Nullable
   public Schema getSchemaForTableConfig(TableConfig tableConfig) {
-    Schema schema = 
getSchema(TableNameBuilder.extractRawTableName(tableConfig.getTableName()));
-    if (schema == null) {
-      String schemaName = tableConfig.getValidationConfig().getSchemaName();
-      if (schemaName != null) {
-        schema = getSchema(schemaName);
-      }
-    }
-    return schema;
+    return ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
   }
 
   public List<String> getSchemaNames() {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 7a00a4d8e4..293f649fa9 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -664,6 +664,11 @@ public class ControllerTest {
     getControllerRequestClient().updateSchema(schema);
   }
 
+  public void forceUpdateSchema(Schema schema)
+      throws IOException {
+    getControllerRequestClient().forceUpdateSchema(schema);
+  }
+
   public Schema getSchema(String schemaName) {
     Schema schema = _helixResourceManager.getSchema(schemaName);
     assertNotNull(schema);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java
deleted file mode 100644
index 19c4756634..0000000000
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.cleanup;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.pinot.common.metrics.ControllerGauge;
-import org.apache.pinot.common.metrics.MetricValueUtils;
-import org.apache.pinot.common.utils.config.TagNameUtils;
-import org.apache.pinot.controller.BaseControllerStarter;
-import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.NetUtils;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-
-
-/**
- * This test can be deleted once {@link 
BaseControllerStarter#fixSchemaNameInTableConfig()} is deleted. Likely in 2.0.0.
- */
-@Test(groups = "stateless")
-public class SchemaCleanupTaskStatelessTest extends ControllerTest {
-  @BeforeClass
-  public void setup()
-      throws Exception {
-    startZk();
-    startController();
-    startFakeBroker();
-    startFakeServer();
-  }
-
-  private void startFakeBroker()
-      throws Exception {
-    String brokerInstance = CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + 
NetUtils.getHostAddress() + "_"
-        + CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
-
-    // Create server instance with the fake server state model
-    HelixManager brokerHelixManager =
-        HelixManagerFactory.getZKHelixManager(getHelixClusterName(), 
brokerInstance, InstanceType.PARTICIPANT,
-            getZkUrl());
-    brokerHelixManager.connect();
-
-    // Add Helix tag to the server
-    
brokerHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(),
 brokerInstance,
-        TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
-  }
-
-  private void startFakeServer()
-      throws Exception {
-    String serverInstance = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE + 
NetUtils.getHostAddress() + "_"
-        + CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
-
-    // Create server instance with the fake server state model
-    HelixManager serverHelixManager = HelixManagerFactory
-        .getZKHelixManager(getHelixClusterName(), serverInstance, 
InstanceType.PARTICIPANT, getZkUrl());
-    serverHelixManager.connect();
-
-    // Add Helix tag to the server
-    
serverHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(),
 serverInstance,
-        
TableNameBuilder.OFFLINE.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
-  }
-
-  @AfterClass
-  public void teardown() {
-    stopController();
-    stopZk();
-  }
-
-  @Test
-  public void testSchemaCleanupTask()
-      throws Exception {
-    PinotMetricUtils.cleanUp();
-    PinotMetricUtils.getPinotMetricsRegistry();
-    // 1. Add a schema
-    addSchema(createDummySchema("t1"));
-    addSchema(createDummySchema("t2"));
-    addSchema(createDummySchema("t3"));
-
-    // 2. Add a table with the schema name reference
-    addTableConfig(createDummyTableConfig("t1", "t1"));
-    addTableConfig(createDummyTableConfig("t2", "t2"));
-    addTableConfig(createDummyTableConfig("t3", "t3"));
-
-    _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t1", 
"t2"));
-    _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t2", 
"t3"));
-    _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t3", 
"t1"));
-
-    // 3. Fix table schema
-    _controllerStarter.fixSchemaNameInTableConfig();
-
-    // 4. validate
-    assertEquals(getHelixResourceManager().getAllTables().size(), 3);
-    assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
-
-    
assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName());
-    
assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName());
-    
assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName());
-
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3);
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0);
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3);
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0);
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0);
-
-    // 5. Clean up
-    for (String table : getHelixResourceManager().getAllOfflineTables()) {
-      getHelixResourceManager().deleteOfflineTable(table);
-    }
-    for (String schema : getHelixResourceManager().getSchemaNames()) {
-      getHelixResourceManager().deleteSchema(schema);
-    }
-  }
-
-  @Test
-  public void testSchemaCleanupTaskNormalCase()
-      throws Exception {
-    PinotMetricUtils.cleanUp();
-    PinotMetricUtils.getPinotMetricsRegistry();
-    // 1. Add a schema
-    addSchema(createDummySchema("t1"));
-    addSchema(createDummySchema("t2"));
-    addSchema(createDummySchema("t3"));
-
-    assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
-
-    // 2. Add a table with the schema name reference
-    addTableConfig(createDummyTableConfig("t1", "t1"));
-    addTableConfig(createDummyTableConfig("t2", "t2"));
-    addTableConfig(createDummyTableConfig("t3", "t3"));
-
-    assertEquals(getHelixResourceManager().getAllTables().size(), 3);
-
-    // 3. Create new schemas and update table to new schema
-    addSchema(createDummySchema("t11"));
-    addSchema(createDummySchema("t21"));
-    addSchema(createDummySchema("t31"));
-    _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t1", 
"t11"));
-    _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t2", 
"t21"));
-    _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t3", 
"t31"));
-
-    assertEquals(getHelixResourceManager().getAllTables().size(), 3);
-    assertEquals(getHelixResourceManager().getSchemaNames().size(), 6);
-    
assertEquals(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName(),
 "t11");
-    
assertEquals(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName(),
 "t21");
-    
assertEquals(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName(),
 "t31");
-
-    // 4. Delete schema t1, t2, t3, so we can check if those schemas are fixed 
later.
-    deleteSchema("t1");
-    deleteSchema("t2");
-    deleteSchema("t3");
-
-    assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
-
-    // 5. Fix table schema
-    _controllerStarter.fixSchemaNameInTableConfig();
-
-    // 6. All tables will directly set schema.
-    assertEquals(getHelixResourceManager().getAllTables().size(), 3);
-    assertEquals(getHelixResourceManager().getSchemaNames().size(), 6);
-    assertTrue(getHelixResourceManager().getSchemaNames().contains("t1"));
-    assertTrue(getHelixResourceManager().getSchemaNames().contains("t2"));
-    assertTrue(getHelixResourceManager().getSchemaNames().contains("t3"));
-
-    
assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName());
-    
assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName());
-    
assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName());
-
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3);
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0);
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3);
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0);
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0);
-
-    // 7. Clean up
-    for (String table : getHelixResourceManager().getAllOfflineTables()) {
-      getHelixResourceManager().deleteOfflineTable(table);
-    }
-    for (String schema : getHelixResourceManager().getSchemaNames()) {
-      getHelixResourceManager().deleteSchema(schema);
-    }
-  }
-
-  @Test
-  public void testMissingSchema()
-      throws Exception {
-    PinotMetricUtils.cleanUp();
-    PinotMetricUtils.getPinotMetricsRegistry();
-    // 1. Add a schema
-    addSchema(createDummySchema("t1"));
-    addSchema(createDummySchema("t2"));
-    addSchema(createDummySchema("t3"));
-
-    assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
-
-    // 2. Add a table with the schema name reference
-    addTableConfig(createDummyTableConfig("t1"));
-    addTableConfig(createDummyTableConfig("t2"));
-    addTableConfig(createDummyTableConfig("t3"));
-
-    assertEquals(getHelixResourceManager().getAllTables().size(), 3);
-
-    // 4. Delete schema t1, t2, t3, so we can check if those schemas are fixed 
later.
-    deleteSchema("t1");
-    deleteSchema("t2");
-    deleteSchema("t3");
-
-    assertEquals(getHelixResourceManager().getSchemaNames().size(), 0);
-
-    // 5. Fix table schema
-    _controllerStarter.fixSchemaNameInTableConfig();
-
-    // 6. We cannot fix schema
-    assertEquals(getHelixResourceManager().getAllTables().size(), 3);
-    assertEquals(getHelixResourceManager().getSchemaNames().size(), 0);
-
-    
assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName());
-    
assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName());
-    
assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName());
-
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 0);
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 3);
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 0);
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0);
-    
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
-        ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0);
-
-    // 7. Clean up
-    for (String table : getHelixResourceManager().getAllOfflineTables()) {
-      getHelixResourceManager().deleteOfflineTable(table);
-    }
-    for (String schema : getHelixResourceManager().getSchemaNames()) {
-      getHelixResourceManager().deleteSchema(schema);
-    }
-  }
-
-  private TableConfig createDummyTableConfig(String table) {
-    return new TableConfigBuilder(TableType.OFFLINE)
-        .setTableName(table)
-        .build();
-  }
-
-  private TableConfig createDummyTableConfig(String table, String schema) {
-    TableConfig tableConfig = createDummyTableConfig(table);
-    tableConfig.getValidationConfig().setSchemaName(schema);
-    return tableConfig;
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index fa985f2b06..13edaf56cf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -96,7 +96,6 @@ import 
org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -358,16 +357,13 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   public Pair<TableConfig, Schema> fetchTableConfigAndSchema() {
     TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
     Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", _tableNameWithType);
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
tableConfig);
-    // NOTE: Schema is mandatory for REALTIME table.
-    if (tableConfig.getTableType() == TableType.REALTIME) {
-      Preconditions.checkState(schema != null, "Failed to find schema for 
table: %s", _tableNameWithType);
-    }
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
+    Preconditions.checkState(schema != null, "Failed to find schema for table: 
%s", _tableNameWithType);
     return Pair.of(tableConfig, schema);
   }
 
   @Override
-  public IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, 
@Nullable Schema schema) {
+  public IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, 
Schema schema) {
     IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
     indexLoadingConfig.setTableDataDir(_tableDataDir);
     return indexLoadingConfig;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 70ecd19d5c..32de491a30 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -2078,14 +2078,13 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     
tableConfig.getIndexingConfig().getNoDictionaryColumns().remove("NewAddedRawDerivedMVIntDimension");
     updateTableConfig(tableConfig);
 
-    // Need to first delete then add the schema because removing columns is 
backward-incompatible change
-    deleteSchema(getTableName());
+    // Need to force update the schema because removing columns is 
backward-incompatible change
     Schema schema = createSchema();
     schema.removeField("AirlineID");
     schema.removeField("ArrTime");
     schema.removeField("AirTime");
     schema.removeField("ArrDel15");
-    addSchema(schema);
+    forceUpdateSchema(schema);
 
     // Trigger reload
     reloadAllSegments(SELECT_STAR_QUERY, true, getCountStarResult());
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
index 9a0b23bae4..8f76ce7f6e 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
@@ -55,7 +55,10 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 
 public class RefreshSegmentMinionClusterIntegrationTest extends 
BaseClusterIntegrationTest {
@@ -151,13 +154,12 @@ public class RefreshSegmentMinionClusterIntegrationTest 
extends BaseClusterInteg
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
 
     // Change datatype from INT -> LONG for airlineId
-    deleteSchema(getTableName());
     Schema schema = createSchema();
     schema.getFieldSpecFor("ArrTime").setDataType(FieldSpec.DataType.LONG);
     schema.getFieldSpecFor("AirlineID").setDataType(FieldSpec.DataType.STRING);
     
schema.getFieldSpecFor("ActualElapsedTime").setDataType(FieldSpec.DataType.FLOAT);
     
schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.STRING);
-    addSchema(schema);
+    forceUpdateSchema(schema);
 
     assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
             .setTablesToSchedule(Collections.singleton(offlineTableName)))
@@ -209,13 +211,12 @@ public class RefreshSegmentMinionClusterIntegrationTest 
extends BaseClusterInteg
     });
 
     // Reset the schema back to it's original state.
-    deleteSchema(getTableName());
     schema = createSchema();
     schema.getFieldSpecFor("ArrTime").setDataType(FieldSpec.DataType.INT);
     schema.getFieldSpecFor("AirlineID").setDataType(FieldSpec.DataType.LONG);
     
schema.getFieldSpecFor("ActualElapsedTime").setDataType(FieldSpec.DataType.INT);
     
schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.INT);
-    addSchema(schema);
+    forceUpdateSchema(schema);
   }
 
   @Test(priority = 3)
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
index 2bc366a371..bcc9a0883d 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
@@ -81,7 +81,6 @@ public class RefreshSegmentTaskGenerator extends 
BaseTaskGenerator {
     String tableNameWithType = tableConfig.getTableName();
     Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for 
Table: %s", tableNameWithType);
 
-
     String taskType = RefreshSegmentTask.TASK_TYPE;
     List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
     PinotHelixResourceManager pinotHelixResourceManager = 
_clusterInfoAccessor.getPinotHelixResourceManager();
@@ -102,7 +101,8 @@ public class RefreshSegmentTaskGenerator extends 
BaseTaskGenerator {
 
     // Get info about table and schema.
     Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType);
-    Schema schema = 
pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+    Schema schema = 
pinotHelixResourceManager.getTableSchema(tableNameWithType);
+    Preconditions.checkState(schema != null, "Failed to find schema for table: 
%s", tableNameWithType);
     Stat schemaStat = 
pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());
 
     // Get the running segments for a table.
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index 754f7224a2..a4874a1cc9 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -43,7 +43,6 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -525,8 +524,6 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
         .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
         .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
 
-    
when(mockPinotHelixResourceManager.getSchemaForTableConfig(Mockito.any())).thenReturn(schema);
-
     RealtimeToOfflineSegmentsTaskGenerator taskGenerator = new 
RealtimeToOfflineSegmentsTaskGenerator();
     taskGenerator.init(mockClusterInfoAccessor);
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index bef2d7fa84..7f60004fdb 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -308,7 +308,7 @@ public interface TableDataManager {
   /**
    * Constructs the index loading config for the table with the given table 
config and schema.
    */
-  IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, @Nullable 
Schema schema);
+  IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, Schema 
schema);
 
   /**
    * Interface to handle segment state transitions from CONSUMING to DROPPED
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 5bf6ef8bc8..d28225c923 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
@@ -90,7 +89,6 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,7 +135,7 @@ public final class TableConfigUtils {
   /**
    * @see TableConfigUtils#validate(TableConfig, Schema, String)
    */
-  public static void validate(TableConfig tableConfig, @Nullable Schema 
schema) {
+  public static void validate(TableConfig tableConfig, Schema schema) {
     validate(tableConfig, schema, null);
   }
 
@@ -152,17 +150,14 @@ public final class TableConfigUtils {
    *
    * TODO: Add more validations for each section (e.g. validate conditions are 
met for aggregateMetrics)
    */
-  public static void validate(TableConfig tableConfig, @Nullable Schema 
schema, @Nullable String typesToSkip) {
+  public static void validate(TableConfig tableConfig, Schema schema, 
@Nullable String typesToSkip) {
+    Preconditions.checkArgument(schema != null, "Schema should not be null");
     Set<ValidationType> skipTypes = parseTypesToSkipString(typesToSkip);
-    if (tableConfig.getTableType() == TableType.REALTIME) {
-      Preconditions.checkState(schema != null, "Schema should not be null for 
REALTIME table");
-    }
     // Sanitize the table config before validation
     sanitize(tableConfig);
 
     // skip all validation if skip type ALL is selected.
     if (!skipTypes.contains(ValidationType.ALL)) {
-      validateTableSchemaConfig(tableConfig);
       validateValidationConfig(tableConfig, schema);
       validateIngestionConfig(tableConfig, schema);
 
@@ -205,7 +200,7 @@ public final class TableConfigUtils {
    * @param tableConfig Table config to validate
    * @return true if the table config is using instance pool and replica group 
configuration, false otherwise
    */
-  static boolean isTableUsingInstancePoolAndReplicaGroup(@Nonnull TableConfig 
tableConfig) {
+  static boolean isTableUsingInstancePoolAndReplicaGroup(TableConfig 
tableConfig) {
     boolean status = true;
     Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = 
tableConfig.getInstanceAssignmentConfigMap();
     if (instanceAssignmentConfigMap != null) {
@@ -253,19 +248,6 @@ public final class TableConfigUtils {
     }
   }
 
-  /**
-   * Validates the table name with the following rule:
-   * - Schema name should either be null or match the raw table name
-   */
-  private static void validateTableSchemaConfig(TableConfig tableConfig) {
-    // Ensure that table is not created if schema is not present
-    String rawTableName = 
TableNameBuilder.extractRawTableName(tableConfig.getTableName());
-    String schemaName = tableConfig.getValidationConfig().getSchemaName();
-    if (schemaName != null && !schemaName.equals(rawTableName)) {
-      throw new IllegalStateException("Schema name: " + schemaName + " does 
not match table name: " + rawTableName);
-    }
-  }
-
   /**
    * Validates retention config. Checks for following things:
    * - Valid segmentPushType
@@ -314,7 +296,7 @@ public final class TableConfigUtils {
    * 3. Checks peerDownloadSchema
    * 4. Checks time column existence if null handling for time column is 
enabled
    */
-  private static void validateValidationConfig(TableConfig tableConfig, 
@Nullable Schema schema) {
+  private static void validateValidationConfig(TableConfig tableConfig, Schema 
schema) {
     SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
     String timeColumnName = validationConfig.getTimeColumnName();
     if (tableConfig.getTableType() == TableType.REALTIME) {
@@ -322,7 +304,7 @@ public final class TableConfigUtils {
       Preconditions.checkState(timeColumnName != null, "'timeColumnName' 
cannot be null in REALTIME table config");
     }
     // timeColumnName can be null in OFFLINE table
-    if (timeColumnName != null && !timeColumnName.isEmpty() && schema != null) 
{
+    if (timeColumnName != null) {
       Preconditions.checkState(schema.getSpecForTimeColumn(timeColumnName) != 
null,
           "Cannot find valid fieldSpec for timeColumn: %s from the table 
config: %s, in the schema: %s", timeColumnName,
           tableConfig.getTableName(), schema.getSchemaName());
@@ -330,8 +312,8 @@ public final class TableConfigUtils {
     if (tableConfig.isDimTable()) {
       Preconditions.checkState(tableConfig.getTableType() == TableType.OFFLINE,
           "Dimension table must be of OFFLINE table type.");
-      Preconditions.checkState(schema != null, "Dimension table must have an 
associated schema");
-      Preconditions.checkState(!schema.getPrimaryKeyColumns().isEmpty(), 
"Dimension table must have primary key[s]");
+      
Preconditions.checkState(CollectionUtils.isNotEmpty(schema.getPrimaryKeyColumns()),
+          "Dimension table must have primary key[s]");
     }
 
     String peerSegmentDownloadScheme = 
validationConfig.getPeerSegmentDownloadScheme();
@@ -360,7 +342,7 @@ public final class TableConfigUtils {
    * 6. ingestion type for dimension tables
    */
   @VisibleForTesting
-  public static void validateIngestionConfig(TableConfig tableConfig, 
@Nullable Schema schema) {
+  public static void validateIngestionConfig(TableConfig tableConfig, Schema 
schema) {
     IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
 
     if (ingestionConfig != null) {
@@ -395,7 +377,7 @@ public final class TableConfigUtils {
         Preconditions.checkState(indexingConfig == null || 
MapUtils.isEmpty(indexingConfig.getStreamConfigs()),
             "Should not use indexingConfig#getStreamConfigs if 
ingestionConfig#StreamIngestionConfig is provided");
         List<Map<String, String>> streamConfigMaps = 
ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
-        Preconditions.checkState(streamConfigMaps.size() > 0, "Must have at 
least 1 stream in REALTIME table");
+        Preconditions.checkState(!streamConfigMaps.isEmpty(), "Must have at 
least 1 stream in REALTIME table");
         // TODO: for multiple stream configs, validate them
       }
 
@@ -431,14 +413,11 @@ public final class TableConfigUtils {
                 "columnName/aggregationFunction cannot be null in 
AggregationConfig " + aggregationConfig);
           }
 
-          FieldSpec fieldSpec = null;
-          if (schema != null) {
-            fieldSpec = schema.getFieldSpecFor(columnName);
-            Preconditions.checkState(fieldSpec != null, "The destination 
column '" + columnName
-                + "' of the aggregation function must be present in the 
schema");
-            Preconditions.checkState(fieldSpec.getFieldType() == 
FieldSpec.FieldType.METRIC,
-                "The destination column '" + columnName + "' of the 
aggregation function must be a metric column");
-          }
+          FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+          Preconditions.checkState(fieldSpec != null,
+              "The destination column '" + columnName + "' of the aggregation 
function must be present in the schema");
+          Preconditions.checkState(fieldSpec.getFieldType() == 
FieldSpec.FieldType.METRIC,
+              "The destination column '" + columnName + "' of the aggregation 
function must be a metric column");
 
           if (!aggregationColumns.add(columnName)) {
             throw new IllegalStateException("Duplicate aggregation config 
found for column '" + columnName + "'");
@@ -471,11 +450,9 @@ public final class TableConfigUtils {
               Preconditions.checkState(StringUtils.isNumeric(literal),
                   "Second argument of DISTINCT_COUNT_HLL must be a number: 
%s", aggregationConfig);
             }
-            if (fieldSpec != null) {
-              DataType dataType = fieldSpec.getDataType();
-              Preconditions.checkState(dataType == DataType.BYTES,
-                  "Result type for DISTINCT_COUNT_HLL must be BYTES: %s", 
aggregationConfig);
-            }
+            DataType dataType = fieldSpec.getDataType();
+            Preconditions.checkState(dataType == DataType.BYTES, "Result type 
for DISTINCT_COUNT_HLL must be BYTES: %s",
+                aggregationConfig);
           } else if (functionType == DISTINCTCOUNTHLLPLUS) {
             Preconditions.checkState(numArguments >= 1 && numArguments <= 3,
                 "DISTINCT_COUNT_HLL_PLUS can have at most three arguments: 
%s", aggregationConfig);
@@ -525,10 +502,8 @@ public final class TableConfigUtils {
 
           aggregationSourceColumns.add(firstArgument.getIdentifier());
         }
-        if (schema != null) {
-          Preconditions.checkState(new 
HashSet<>(schema.getMetricNames()).equals(aggregationColumns),
-              "all metric columns must be aggregated");
-        }
+        Preconditions.checkState(new 
HashSet<>(schema.getMetricNames()).equals(aggregationColumns),
+            "all metric columns must be aggregated");
 
         // This is required by 
MutableSegmentImpl.enableMetricsAggregationIfPossible().
         // That code will disable ingestion aggregation if all metrics aren't 
noDictionaryColumns.
@@ -565,13 +540,10 @@ public final class TableConfigUtils {
           if (!transformColumns.add(columnName)) {
             throw new IllegalStateException("Duplicate transform config found 
for column '" + columnName + "'");
           }
-          if (schema != null) {
-            Preconditions.checkState(
-                schema.getFieldSpecFor(columnName) != null || 
aggregationSourceColumns.contains(columnName),
-                "The destination column '" + columnName
-                    + "' of the transform function must be present in the 
schema or as a source column for "
-                    + "aggregations");
-          }
+          Preconditions.checkState(schema.hasColumn(columnName) || 
aggregationSourceColumns.contains(columnName),
+              "The destination column '" + columnName
+                  + "' of the transform function must be present in the schema 
or as a source column for "
+                  + "aggregations");
           FunctionEvaluator expressionEvaluator;
           if (_disableGroovy && 
FunctionEvaluatorFactory.isGroovyExpression(transformFunction)) {
             throw new IllegalStateException(
@@ -595,7 +567,7 @@ public final class TableConfigUtils {
 
       // Complex configs
       ComplexTypeConfig complexTypeConfig = 
ingestionConfig.getComplexTypeConfig();
-      if (complexTypeConfig != null && schema != null) {
+      if (complexTypeConfig != null) {
         Map<String, String> prefixesToRename = 
complexTypeConfig.getPrefixesToRename();
         if (MapUtils.isNotEmpty(prefixesToRename)) {
           Set<String> fieldNames = schema.getColumnNames();
@@ -611,7 +583,7 @@ public final class TableConfigUtils {
 
       SchemaConformingTransformerConfig schemaConformingTransformerConfig =
           ingestionConfig.getSchemaConformingTransformerConfig();
-      if (null != schemaConformingTransformerConfig && null != schema) {
+      if (schemaConformingTransformerConfig != null) {
         SchemaConformingTransformer.validateSchema(schema, 
schemaConformingTransformerConfig);
       }
     }
@@ -983,10 +955,7 @@ public final class TableConfigUtils {
    * Also ensures proper dependency between index types (eg: Inverted Index 
columns
    * cannot be present in no-dictionary columns).
    */
-  private static void validateIndexingConfig(IndexingConfig indexingConfig, 
@Nullable Schema schema) {
-    if (schema == null) {
-      return;
-    }
+  private static void validateIndexingConfig(IndexingConfig indexingConfig, 
Schema schema) {
     ArrayListMultimap<String, String> columnNameToConfigMap = 
ArrayListMultimap.create();
     Set<String> noDictionaryColumnsSet = new HashSet<>();
 
@@ -1189,7 +1158,7 @@ public final class TableConfigUtils {
    * Additional checks for TEXT and FST index types
    * Validates index compatibility for forward index disabled columns
    */
-  private static void validateFieldConfigList(TableConfig tableConfig, 
@Nullable Schema schema) {
+  private static void validateFieldConfigList(TableConfig tableConfig, Schema 
schema) {
     List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
     TableType tableType = tableConfig.getTableType();
@@ -1200,6 +1169,9 @@ public final class TableConfigUtils {
 
     for (FieldConfig fieldConfig : fieldConfigList) {
       String columnName = fieldConfig.getName();
+      FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+      Preconditions.checkState(fieldSpec != null,
+          "Column: %s defined in field config list must be a valid column 
defined in the schema", columnName);
       EncodingType encodingType = fieldConfig.getEncodingType();
       Preconditions.checkArgument(encodingType != null, "Encoding type must be 
specified for column: %s", columnName);
       CompressionCodec compressionCodec = fieldConfig.getCompressionCodec();
@@ -1211,10 +1183,8 @@ public final class TableConfigUtils {
               "Compression codec: %s is not applicable to raw index",
               compressionCodec);
           if ((compressionCodec == CompressionCodec.CLP || compressionCodec == 
CompressionCodec.CLPV2
-              || compressionCodec == CompressionCodec.CLPV2_ZSTD || 
compressionCodec == CompressionCodec.CLPV2_LZ4)
-              && schema != null) {
-            Preconditions.checkArgument(
-                
schema.getFieldSpecFor(columnName).getDataType().getStoredType() == 
DataType.STRING,
+              || compressionCodec == CompressionCodec.CLPV2_ZSTD || 
compressionCodec == CompressionCodec.CLPV2_LZ4)) {
+            
Preconditions.checkArgument(fieldSpec.getDataType().getStoredType() == 
DataType.STRING,
                 "CLP compression codec can only be applied to string columns");
           }
           break;
@@ -1232,13 +1202,6 @@ public final class TableConfigUtils {
           break;
       }
 
-      if (schema == null) {
-        return;
-      }
-      FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
-      Preconditions.checkState(fieldSpec != null,
-          "Column: %s defined in field config list must be a valid column 
defined in the schema", columnName);
-
       // Validate the forward index disabled compatibility with other indexes 
if enabled for this column
       validateForwardIndexDisabledIndexCompatibility(columnName, fieldConfig, 
indexingConfig, schema, tableType);
 
@@ -1493,7 +1456,7 @@ public final class TableConfigUtils {
     return false;
   }
 
-  private static boolean isRoutingStrategyAllowedForUpsert(@Nonnull 
RoutingConfig routingConfig) {
+  private static boolean isRoutingStrategyAllowedForUpsert(RoutingConfig 
routingConfig) {
     String instanceSelectorType = routingConfig.getInstanceSelectorType();
     return UPSERT_DEDUP_ALLOWED_ROUTING_STRATEGIES.stream().anyMatch(x -> 
x.equalsIgnoreCase(instanceSelectorType));
   }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index b5ca43e4ba..b3bf9d3e1a 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -91,7 +91,7 @@ public class TableConfigUtilsTest {
     try {
       TableConfigUtils.validate(tableConfig, null);
       Assert.fail("Should fail for null timeColumnName and null schema in 
REALTIME table");
-    } catch (IllegalStateException e) {
+    } catch (IllegalArgumentException e) {
       // expected
     }
 
@@ -101,7 +101,7 @@ public class TableConfigUtilsTest {
     try {
       TableConfigUtils.validate(tableConfig, null);
       Assert.fail("Should fail for null schema in REALTIME table");
-    } catch (IllegalStateException e) {
+    } catch (IllegalArgumentException e) {
       // expected
     }
 
@@ -149,12 +149,22 @@ public class TableConfigUtilsTest {
     // OFFLINE table
     // null timeColumnName and schema - allowed in OFFLINE
     tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
-    TableConfigUtils.validate(tableConfig, null);
+    try {
+      TableConfigUtils.validate(tableConfig, null);
+      Assert.fail("Should fail for null timeColumnName and null schema in 
OFFLINE table");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
 
     // null schema only - allowed in OFFLINE
     tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
-    TableConfigUtils.validate(tableConfig, null);
+    try {
+      TableConfigUtils.validate(tableConfig, null);
+      Assert.fail("Should fail for null schema in OFFLINE table");
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
 
     // null timeColumnName only - allowed in OFFLINE
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
@@ -172,7 +182,7 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    // non-null schema nd timeColumnName, but timeColumnName not present as a 
time spec in schema
+    // non-null schema and timeColumnName, but timeColumnName not present as a 
time spec in schema
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addSingleValueDimension(TIME_COLUMN, 
FieldSpec.DataType.STRING).build();
     tableConfig =
@@ -184,11 +194,6 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    // empty timeColumnName - valid
-    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName("").build();
-    TableConfigUtils.validate(tableConfig, schema);
-
     // valid
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
@@ -217,7 +222,7 @@ public class TableConfigUtilsTest {
     try {
       TableConfigUtils.validate(tableConfig, null);
       Assert.fail("Should fail with a Dimension table without a schema");
-    } catch (IllegalStateException e) {
+    } catch (IllegalArgumentException e) {
       // expected
     }
 
@@ -327,7 +332,7 @@ public class TableConfigUtilsTest {
     // invalid transform config since Groovy is disabled
     try {
       TableConfigUtils.setDisableGroovy(true);
-      TableConfigUtils.validate(tableConfig, schema, null);
+      TableConfigUtils.validate(tableConfig, schema);
       // Reset to false
       TableConfigUtils.setDisableGroovy(false);
       Assert.fail("Should fail when Groovy functions disabled but found in 
transform config");
@@ -363,7 +368,7 @@ public class TableConfigUtilsTest {
     ingestionConfig.setFilterConfig(new FilterConfig("Groovy({timestamp > 0}, 
timestamp)"));
     try {
       TableConfigUtils.setDisableGroovy(true);
-      TableConfigUtils.validate(tableConfig, schema, null);
+      TableConfigUtils.validate(tableConfig, schema);
       // Reset to false
       TableConfigUtils.setDisableGroovy(false);
       Assert.fail("Should fail when Groovy functions disabled but found in 
filter config");
@@ -677,23 +682,27 @@ public class TableConfigUtilsTest {
 
   @Test
   public void ingestionStreamConfigsTest() {
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addDateTime("timeColumn", FieldSpec.DataType.TIMESTAMP, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .build();
     Map<String, String> streamConfigs = getStreamConfigs();
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setStreamIngestionConfig(new 
StreamIngestionConfig(Arrays.asList(streamConfigs, streamConfigs)));
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setTimeColumnName("timeColumn")
+        .setIngestionConfig(ingestionConfig)
+        .build();
 
     // Multiple stream configs are allowed
     try {
-      TableConfigUtils.validateIngestionConfig(tableConfig, null);
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
     } catch (IllegalStateException e) {
       Assert.fail("Multiple stream configs should be supported");
     }
 
     // stream config should be valid
     ingestionConfig.setStreamIngestionConfig(new 
StreamIngestionConfig(Collections.singletonList(streamConfigs)));
-    TableConfigUtils.validateIngestionConfig(tableConfig, null);
+    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
 
     // validate the proto decoder
     streamConfigs = getKafkaStreamConfigs();
@@ -759,6 +768,8 @@ public class TableConfigUtilsTest {
 
   @Test
   public void ingestionBatchConfigsTest() {
+    Schema schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
+
     Map<String, String> batchConfigMap = new HashMap<>();
     batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo");
     batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "gs://bar");
@@ -773,11 +784,14 @@ public class TableConfigUtilsTest {
         new BatchIngestionConfig(Arrays.asList(batchConfigMap, 
batchConfigMap), null, null));
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(ingestionConfig).build();
-    TableConfigUtils.validateIngestionConfig(tableConfig, null);
+    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
   }
 
   @Test
   public void ingestionConfigForDimensionTableTest() {
+    Schema schema =
+        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(List.of("pk")).build();
+
     Map<String, String> batchConfigMap = new HashMap<>();
     batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo");
     batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "gs://bar");
@@ -792,12 +806,12 @@ public class TableConfigUtilsTest {
         new BatchIngestionConfig(Collections.singletonList(batchConfigMap), 
"REFRESH", null));
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true)
         .setIngestionConfig(ingestionConfig).build();
-    TableConfigUtils.validateIngestionConfig(tableConfig, null);
+    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
 
     // dimension tables should have batch ingestion config
     ingestionConfig.setBatchIngestionConfig(null);
     try {
-      TableConfigUtils.validateIngestionConfig(tableConfig, null);
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
       Assert.fail("Should fail for Dimension table without batch ingestion 
config");
     } catch (IllegalStateException e) {
       // expected
@@ -807,7 +821,7 @@ public class TableConfigUtilsTest {
     ingestionConfig.setBatchIngestionConfig(
         new BatchIngestionConfig(Collections.singletonList(batchConfigMap), "APPEND", null));
     try {
-      TableConfigUtils.validateIngestionConfig(tableConfig, null);
+      TableConfigUtils.validateIngestionConfig(tableConfig, schema);
       Assert.fail("Should fail for Dimension table with ingestion type APPEND (should be REFRESH)");
     } catch (IllegalStateException e) {
       // expected
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java
index 6df20fe491..c504af4a65 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java
@@ -154,26 +154,24 @@ public class PredownloadZKClient implements AutoCloseable {
    * @param tableInfoMap               Map of table name to table info to be filled with table config.
    */
   public void updateSegmentMetadata(List<PredownloadSegmentInfo> predownloadSegmentInfoList,
-      Map<String, PredownloadTableInfo> tableInfoMap,
-      InstanceDataManagerConfig instanceDataManagerConfig) {
+      Map<String, PredownloadTableInfo> tableInfoMap, InstanceDataManagerConfig instanceDataManagerConfig) {
     // fallback path comes from ZKHelixManager.class getHelixPropertyStore method
     ZkHelixPropertyStore<ZNRecord> propertyStore = new AutoFallbackPropertyStore<>(new ZkBaseDataAccessor<>(_zkClient),
         PropertyPathBuilder.propertyStore(_clusterName), String.format("/%s/%s", _clusterName, "HELIX_PROPERTYSTORE"));
     for (PredownloadSegmentInfo predownloadSegmentInfo : predownloadSegmentInfoList) {
-      tableInfoMap.computeIfAbsent(predownloadSegmentInfo.getTableNameWithType(), name -> {
-        TableConfig tableConfig =
-            ZKMetadataProvider.getTableConfig(propertyStore, predownloadSegmentInfo.getTableNameWithType());
+      String tableNameWithType = predownloadSegmentInfo.getTableNameWithType();
+      tableInfoMap.computeIfAbsent(tableNameWithType, name -> {
+        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(propertyStore, tableNameWithType);
         if (tableConfig == null) {
           LOGGER.warn("Cannot predownload segment {} because not able to get its table config from ZK",
               predownloadSegmentInfo.getSegmentName());
           return null;
         }
-        Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableConfig);
+        Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableNameWithType);
         return new PredownloadTableInfo(name, tableConfig, schema, instanceDataManagerConfig);
       });
-      SegmentZKMetadata segmentZKMetadata =
-          ZKMetadataProvider.getSegmentZKMetadata(propertyStore, predownloadSegmentInfo.getTableNameWithType(),
-              predownloadSegmentInfo.getSegmentName());
+      SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(propertyStore, tableNameWithType,
+          predownloadSegmentInfo.getSegmentName());
       if (segmentZKMetadata == null) {
         LOGGER.warn("Cannot predownload segment {} because not able to get its metadata from ZK",
             predownloadSegmentInfo.getSegmentName());
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index 2bfdc051a6..536128b7c4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -38,8 +38,6 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig {
   private String _replication;
   @Deprecated // Use _replication instead
   private String _replicasPerPartition;
-  @Deprecated // Schema name should be the same as raw table name
-  private String _schemaName;
   private String _timeColumnName;
   private TimeUnit _timeType;
   @Deprecated  // Use SegmentAssignmentConfig instead
@@ -170,19 +168,6 @@ public class SegmentsValidationAndRetentionConfig extends BaseJsonConfig {
     _replicasPerPartition = replicasPerPartition;
   }
 
-  /**
-   * @deprecated Schema name should be the same as raw table name
-   */
-  @Deprecated
-  public String getSchemaName() {
-    return _schemaName;
-  }
-
-  @Deprecated
-  public void setSchemaName(String schemaName) {
-    _schemaName = schemaName;
-  }
-
   /**
    * @deprecated Use {@link InstanceAssignmentConfig} instead.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to