This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e0ed179  Validate timeColumnName when adding/updating 
schema/tableConfig (#5966)
e0ed179 is described below

commit e0ed179c249a4affbbf0491e4140d1c9fd5e9b20
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Thu Sep 10 14:32:31 2020 -0700

    Validate timeColumnName when adding/updating schema/tableConfig (#5966)
---
 .../api/resources/PinotSchemaRestletResource.java  |  22 +-
 .../api/resources/PinotTableIndexingConfigs.java   |   4 +-
 .../api/resources/PinotTableMetadataConfigs.java   |   4 +-
 .../api/resources/PinotTableRestletResource.java   |  10 +-
 .../api/resources/PinotTableSegmentConfigs.java    |   4 +-
 .../helix/core/PinotHelixResourceManager.java      |  37 +++
 .../pinot/controller/helix/ControllerTest.java     |   4 +-
 .../org/apache/pinot/core/util/SchemaUtils.java    |  30 +++
 .../apache/pinot/core/util/TableConfigUtils.java   |  64 +++--
 .../apache/pinot/core/util/SchemaUtilsTest.java    | 158 +++++++++++--
 .../pinot/core/util/TableConfigUtilsTest.java      | 263 ++++++++++++++++-----
 11 files changed, 482 insertions(+), 118 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
index f2bf6c0..fd902ef 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
@@ -171,7 +171,8 @@ public class PinotSchemaRestletResource {
   public String validateSchema(FormDataMultiPart multiPart) {
     Schema schema = getSchemaFromMultiPart(multiPart);
     try {
-      SchemaUtils.validate(schema);
+      List<TableConfig> tableConfigs = 
_pinotHelixResourceManager.getTableConfigsForSchema(schema.getSchemaName());
+      SchemaUtils.validate(schema, tableConfigs);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER,
           "Invalid schema: " + schema.getSchemaName() + ". Reason: " + 
e.getMessage(), Response.Status.BAD_REQUEST, e);
@@ -188,7 +189,8 @@ public class PinotSchemaRestletResource {
   @ApiResponses(value = {@ApiResponse(code = 200, message = "Successfully 
validated schema"), @ApiResponse(code = 400, message = "Missing or invalid 
request body"), @ApiResponse(code = 500, message = "Internal error")})
   public String validateSchema(Schema schema) {
     try {
-      SchemaUtils.validate(schema);
+      List<TableConfig> tableConfigs = 
_pinotHelixResourceManager.getTableConfigsForSchema(schema.getSchemaName());
+      SchemaUtils.validate(schema, tableConfigs);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER,
           "Invalid schema: " + schema.getSchemaName() + ". Reason: " + 
e.getMessage(), Response.Status.BAD_REQUEST, e);
@@ -202,25 +204,26 @@ public class PinotSchemaRestletResource {
    * @param override  set to true to override the existing schema with the 
same name
    */
   private SuccessResponse addSchema(Schema schema, boolean override) {
+    String schemaName = schema.getSchemaName();
     try {
-      SchemaUtils.validate(schema);
+      List<TableConfig> tableConfigs = 
_pinotHelixResourceManager.getTableConfigsForSchema(schemaName);
+      SchemaUtils.validate(schema, tableConfigs);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER,
-          "Cannot add invalid schema: " + schema.getSchemaName() + ". Reason: 
" + e.getMessage(),
-          Response.Status.BAD_REQUEST, e);
+          "Cannot add invalid schema: " + schemaName + ". Reason: " + 
e.getMessage(), Response.Status.BAD_REQUEST, e);
     }
 
     try {
       _pinotHelixResourceManager.addSchema(schema, override);
       // Best effort notification. If controller fails at this point, no 
notification is given.
-      LOGGER.info("Notifying metadata event for adding new schema {}", 
schema.getSchemaName());
+      LOGGER.info("Notifying metadata event for adding new schema {}", 
schemaName);
       _metadataEventNotifierFactory.create().notifyOnSchemaEvents(schema, 
SchemaEventType.CREATE);
 
-      return new SuccessResponse(schema.getSchemaName() + " successfully 
added");
+      return new SuccessResponse(schemaName + " successfully added");
     } catch (Exception e) {
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SCHEMA_UPLOAD_ERROR,
 1L);
       throw new ControllerApplicationException(LOGGER,
-          String.format("Failed to add new schema %s.", 
schema.getSchemaName()), Response.Status.INTERNAL_SERVER_ERROR,
+          String.format("Failed to add new schema %s.", schemaName), 
Response.Status.INTERNAL_SERVER_ERROR,
           e);
     }
   }
@@ -234,7 +237,8 @@ public class PinotSchemaRestletResource {
    */
   private SuccessResponse updateSchema(String schemaName, Schema schema, 
boolean reload) {
     try {
-      SchemaUtils.validate(schema);
+      List<TableConfig> tableConfigs = 
_pinotHelixResourceManager.getTableConfigsForSchema(schemaName);
+      SchemaUtils.validate(schema, tableConfigs);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER,
           "Cannot add invalid schema: " + schemaName + ". Reason: " + 
e.getMessage(),
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableIndexingConfigs.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableIndexingConfigs.java
index 7311716..89ebab2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableIndexingConfigs.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableIndexingConfigs.java
@@ -33,6 +33,7 @@ import javax.ws.rs.core.Response;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.util.TableConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +59,8 @@ public class PinotTableIndexingConfigs {
     TableConfig tableConfig;
     try {
       tableConfig = JsonUtils.stringToObject(tableConfigString, 
TableConfig.class);
-      TableConfigUtils.validate(tableConfig);
+      Schema schema = 
pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, "Invalid table config", 
Response.Status.BAD_REQUEST, e);
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableMetadataConfigs.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableMetadataConfigs.java
index 7e03026..2099c5e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableMetadataConfigs.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableMetadataConfigs.java
@@ -32,6 +32,7 @@ import javax.ws.rs.core.Response;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.util.TableConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +56,8 @@ public class PinotTableMetadataConfigs {
     TableConfig tableConfig;
     try {
       tableConfig = JsonUtils.stringToObject(tableConfigString, 
TableConfig.class);
-      TableConfigUtils.validate(tableConfig);
+      Schema schema = 
pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, "Invalid table config", 
Response.Status.BAD_REQUEST, e);
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index a4581ed..7060d00 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -58,6 +58,7 @@ import org.apache.pinot.core.util.TableConfigUtils;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.LoggerFactory;
@@ -113,8 +114,9 @@ public class PinotTableRestletResource {
     TableConfig tableConfig;
     try {
       tableConfig = JsonUtils.stringToObject(tableConfigStr, 
TableConfig.class);
+      Schema schema = 
_pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
       // TableConfigUtils.validate(...) is used across table create/update.
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       // TableConfigUtils.validateTableName(...) checks table name rules.
      // So it won't affect already created tables.
       TableConfigUtils.validateTableName(tableConfig);
@@ -325,7 +327,8 @@ public class PinotTableRestletResource {
     TableConfig tableConfig;
     try {
       tableConfig = JsonUtils.stringToObject(tableConfigString, 
TableConfig.class);
-      TableConfigUtils.validate(tableConfig);
+      Schema schema = 
_pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, "Invalid table config", 
Response.Status.BAD_REQUEST, e);
     }
@@ -368,7 +371,8 @@ public class PinotTableRestletResource {
   public String checkTableConfig(String tableConfigStr) {
     try {
       TableConfig tableConfig = JsonUtils.stringToObject(tableConfigStr, 
TableConfig.class);
-      TableConfigUtils.validate(tableConfig);
+      Schema schema = 
_pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       ObjectNode tableConfigValidateStr = JsonUtils.newObjectNode();
       if (tableConfig.getTableType() == TableType.OFFLINE) {
         tableConfigValidateStr.set(TableType.OFFLINE.name(), 
tableConfig.toJsonNode());
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableSegmentConfigs.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableSegmentConfigs.java
index fe58213..0daaee8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableSegmentConfigs.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableSegmentConfigs.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.util.TableConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +61,8 @@ public class PinotTableSegmentConfigs {
     TableConfig tableConfig;
     try {
       tableConfig = JsonUtils.stringToObject(tableConfigString, 
TableConfig.class);
-      TableConfigUtils.validate(tableConfig);
+      Schema schema = 
pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, "Invalid table config", 
Response.Status.BAD_REQUEST, e);
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1536128..2bff629 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1067,6 +1067,22 @@ public class PinotHelixResourceManager {
     return ZKMetadataProvider.getTableSchema(_propertyStore, tableName);
   }
 
+  /**
+   * Find schema with same name as rawTableName. If not found, find schema 
using schemaName in validationConfig.
+   * For OFFLINE table, it is possible that schema was not uploaded before 
creating the table. Hence for OFFLINE, this method can return null.
+   */
+  @Nullable
+  public Schema getSchemaForTableConfig(TableConfig tableConfig) {
+    Schema schema = 
getSchema(TableNameBuilder.extractRawTableName(tableConfig.getTableName()));
+    if (schema == null) {
+      String schemaName = tableConfig.getValidationConfig().getSchemaName();
+      if (schemaName != null) {
+        schema = getSchema(schemaName);
+      }
+    }
+    return schema;
+  }
+
   public List<String> getSchemaNames() {
     return _propertyStore
         
.getChildNames(PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(),
@@ -1989,6 +2005,27 @@ public class PinotHelixResourceManager {
     }
   }
 
+  /**
+   * Get all tableConfigs (offline and realtime) using this schema.
+   * If tables have not been created, this will return empty list.
+   * If table config raw name doesn't match schema, they will not be fetched.
+   *
+   * @param schemaName Schema name
+   * @return list of table configs using this schema.
+   */
+  public List<TableConfig> getTableConfigsForSchema(String schemaName) {
+    List<TableConfig> tableConfigs = new ArrayList<>();
+    TableConfig offlineTableConfig = getOfflineTableConfig(schemaName);
+    if (offlineTableConfig != null) {
+      tableConfigs.add(offlineTableConfig);
+    }
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(schemaName);
+    if (realtimeTableConfig != null) {
+      tableConfigs.add(realtimeTableConfig);
+    }
+    return tableConfigs;
+  }
+
   public List<String> getServerInstancesForTable(String tableName, TableType 
tableType) {
     TableConfig tableConfig = getTableConfig(tableName, tableType);
     Preconditions.checkNotNull(tableConfig);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 2dd98d3..764cf53 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -79,6 +79,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.tenant.TenantRole;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
@@ -411,10 +412,9 @@ public abstract class ControllerTest {
     schema.setSchemaName(tableName);
     schema.addField(new DimensionFieldSpec("dimA", FieldSpec.DataType.STRING, 
true, ""));
     schema.addField(new DimensionFieldSpec("dimB", FieldSpec.DataType.STRING, 
true, 0));
-
     schema.addField(new MetricFieldSpec("metricA", FieldSpec.DataType.INT, 0));
     schema.addField(new MetricFieldSpec("metricB", FieldSpec.DataType.DOUBLE, 
-1));
-
+    schema.addField(new DateTimeFieldSpec("timeColumn", 
FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:DAYS"));
     return schema;
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/SchemaUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/SchemaUtils.java
index 6691119..20bb089 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/SchemaUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/SchemaUtils.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Set;
 import org.apache.pinot.core.data.function.FunctionEvaluator;
 import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.DateTimeGranularitySpec;
@@ -43,6 +44,22 @@ public class SchemaUtils {
   public static final String MAP_VALUE_COLUMN_SUFFIX = "__VALUES";
 
   /**
+   * Validates the schema.
+   * First checks that the schema is compatible with any provided table 
configs associated with it.
+   * This check is useful to ensure schema and table are compatible, in the 
event that schema is updated or added after the table config.
+   * Then validates the schema using {@link SchemaUtils#validate(Schema 
schema)}
+   *
+   * @param schema schema to validate
+   * @param tableConfigs table configs associated with this schema (table 
configs with raw name = schema name)
+   */
+  public static void validate(Schema schema, List<TableConfig> tableConfigs) {
+    for (TableConfig tableConfig : tableConfigs) {
+      validateCompatibilityWithTableConfig(schema, tableConfig);
+    }
+    validate(schema);
+  }
+
+  /**
    * Validates the following:
    * 1) Checks valid transform function -
    *   for a field spec with transform function, the source column name and 
destination column name are exclusive i.e. do not allow using source column 
name for destination column
@@ -90,6 +107,19 @@ public class SchemaUtils {
   }
 
   /**
+   * Validates that the schema is compatible with the given table config
+   */
+  private static void validateCompatibilityWithTableConfig(Schema schema, 
TableConfig tableConfig) {
+    try {
+      TableConfigUtils.validate(tableConfig, schema);
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          "Schema is incompatible with tableConfig with name: " + 
tableConfig.getTableName() + " and type: "
+              + tableConfig.getTableType(), e);
+    }
+  }
+
+  /**
    * Checks for valid incoming and outgoing granularity spec in the time field 
spec
    */
   private static void validateTimeFieldSpec(FieldSpec fieldSpec) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
index a0f4278..dfe93a3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
@@ -36,6 +36,7 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.TimeUtils;
 
 
@@ -54,10 +55,15 @@ public final class TableConfigUtils {
    * 1. Validation config
    * 2. IngestionConfig
    * 3. TierConfigs
+   *
+   * TODO: Add more validations for each section (e.g. verify column names 
used in the indexing, validate conditions are met for aggregateMetrics etc)
    */
-  public static void validate(TableConfig tableConfig) {
-    validateValidationConfig(tableConfig);
-    validateIngestionConfig(tableConfig.getIngestionConfig());
+  public static void validate(TableConfig tableConfig, @Nullable Schema 
schema) {
+    if (tableConfig.getTableType() == TableType.REALTIME) {
+      Preconditions.checkState(schema != null, "Schema should not be null for 
REALTIME table");
+    }
+    validateValidationConfig(tableConfig, schema);
+    validateIngestionConfig(tableConfig.getIngestionConfig(), schema);
     validateTierConfigList(tableConfig.getTierConfigsList());
   }
 
@@ -74,19 +80,37 @@ public final class TableConfigUtils {
     }
   }
 
-  private static void validateValidationConfig(TableConfig tableConfig) {
+  /**
+   * Validates the following in the validationConfig of the table
+   * 1. For REALTIME table
+   * - checks for non-null timeColumnName
+   * - checks for valid field spec for timeColumnName in schema
+   *
+   * 2. For OFFLINE table
+   * - checks for valid field spec for timeColumnName in schema, if 
timeColumnName and schema are non-null
+   *
+   * 3. Checks peerDownloadSchema
+   */
+  private static void validateValidationConfig(TableConfig tableConfig, 
@Nullable Schema schema) {
     SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
-    if (validationConfig != null) {
-      if (tableConfig.getTableType() == TableType.REALTIME && 
validationConfig.getTimeColumnName() == null) {
-        throw new IllegalStateException("Must provide time column in real-time 
table config");
-      }
-      String peerSegmentDownloadScheme = 
validationConfig.getPeerSegmentDownloadScheme();
-      if (peerSegmentDownloadScheme != null) {
-        if 
(!CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)
-            && 
!CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme)) {
-          throw new IllegalStateException("Invalid value '" + 
peerSegmentDownloadScheme
-              + "' for peerSegmentDownloadScheme. Must be one of http nor 
https");
-        }
+    String timeColumnName = validationConfig.getTimeColumnName();
+    if (tableConfig.getTableType() == TableType.REALTIME) {
+      // For REALTIME table, must have a non-null timeColumnName
+      Preconditions.checkState(timeColumnName != null, "'timeColumnName' 
cannot be null in REALTIME table config");
+    }
+    // timeColumnName can be null in OFFLINE table
+    if (timeColumnName != null && schema != null) {
+      Preconditions.checkState(schema.getSpecForTimeColumn(timeColumnName) != 
null,
+          "Cannot find valid fieldSpec for timeColumn: %s from the table 
config, in the schema: %s", timeColumnName,
+          schema.getSchemaName());
+    }
+
+    String peerSegmentDownloadScheme = 
validationConfig.getPeerSegmentDownloadScheme();
+    if (peerSegmentDownloadScheme != null) {
+      if 
(!CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(peerSegmentDownloadScheme) && 
!CommonConstants.HTTPS_PROTOCOL
+          .equalsIgnoreCase(peerSegmentDownloadScheme)) {
+        throw new IllegalStateException("Invalid value '" + 
peerSegmentDownloadScheme
+            + "' for peerSegmentDownloadScheme. Must be one of http or https");
       }
     }
   }
@@ -99,8 +123,10 @@ public final class TableConfigUtils {
    * 4. validity of transform function string
    * 5. checks for source fields used in destination columns
    */
-  private static void validateIngestionConfig(@Nullable IngestionConfig 
ingestionConfig) {
+  private static void validateIngestionConfig(@Nullable IngestionConfig 
ingestionConfig, @Nullable Schema schema) {
     if (ingestionConfig != null) {
+
+      // Filter config
       FilterConfig filterConfig = ingestionConfig.getFilterConfig();
       if (filterConfig != null) {
         String filterFunction = filterConfig.getFilterFunction();
@@ -112,12 +138,18 @@ public final class TableConfigUtils {
           }
         }
       }
+
+      // Transform configs
       List<TransformConfig> transformConfigs = 
ingestionConfig.getTransformConfigs();
       if (transformConfigs != null) {
         Set<String> transformColumns = new HashSet<>();
         Set<String> argumentColumns = new HashSet<>();
         for (TransformConfig transformConfig : transformConfigs) {
           String columnName = transformConfig.getColumnName();
+          if (schema != null) {
+            Preconditions.checkState(schema.getFieldSpecFor(columnName) != 
null,
+                "The destination column of the transform function must be 
present in the schema");
+          }
           String transformFunction = transformConfig.getTransformFunction();
           if (columnName == null || transformFunction == null) {
             throw new IllegalStateException(
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
index 395cce9..4767e87 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
@@ -18,14 +18,22 @@
  */
 package org.apache.pinot.core.util;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.config.table.IngestionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -35,6 +43,115 @@ import org.testng.annotations.Test;
  */
 public class SchemaUtilsTest {
 
+  private static final String TABLE_NAME = "testTable";
+  private static final String TIME_COLUMN = "timeColumn";
+
+  @Test
+  public void testCompatibilityWithTableConfig() {
+    // empty list
+    List<TableConfig> tableConfigs = new ArrayList<>();
+    Schema schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
+    SchemaUtils.validate(schema, tableConfigs);
+
+    // offline table
+    // null timeColumnName
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
+
+    // schema doesn't have timeColumnName
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    try {
+      SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
+      Assert.fail("Should fail schema validation, as timeColumn is absent");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // schema doesn't have timeColumnName as time spec
+    schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(TIME_COLUMN,
 DataType.STRING)
+        .build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    try {
+      SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
+      Assert.fail("Should fail schema validation, as timeColumn is not present 
as time spec");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // schema has timeColumnName
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:HOURS").build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
+
+    // schema doesn't have destination columns from transformConfigs
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
+        new IngestionConfig(null, Lists.newArrayList(new 
TransformConfig("colA", "round(colB, 1000)")))).build();
+    try {
+      SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
+      Assert.fail("Should fail schema validation, as colA is not present in 
schema");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    schema =
+        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("colA",
 DataType.STRING).build();
+    SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
+
+    // realtime table
+    // schema doesn't have timeColumnName
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    try {
+      SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
+      Assert.fail("Should fail schema validation, as timeColumn is absent");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // schema doesn't have timeColumnName as time spec
+    schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(TIME_COLUMN,
 DataType.STRING)
+        .build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    try {
+      SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
+      Assert.fail("Should fail schema validation, as timeColumn is not present 
as time spec");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // schema has timeColumnName
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:HOURS").build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
+
+    // schema doesn't have destination columns from transformConfigs
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:HOURS").build();
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+        .setIngestionConfig(
+            new IngestionConfig(null, Lists.newArrayList(new 
TransformConfig("colA", "round(colB, 1000)")))).build();
+    try {
+      SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
+      Assert.fail("Should fail schema validation, as colA is not present in 
schema");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:HOURS")
+        .addSingleValueDimension("colA", DataType.STRING).build();
+    SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
+  }
+
   /**
    * TODO: transform functions have moved to tableConfig#ingestionConfig. 
However, these tests remain to test backward compatibility/
    *  Remove these when we totally stop honoring transform functions in schema
@@ -44,7 +161,7 @@ public class SchemaUtilsTest {
     Schema pinotSchema;
     // source name used as destination name
     pinotSchema = new Schema();
-    DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("dim1", 
FieldSpec.DataType.STRING, true);
+    DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("dim1", 
DataType.STRING, true);
     dimensionFieldSpec.setTransformFunction("Groovy({function}, argument1, 
dim1, argument3)");
     pinotSchema.addField(dimensionFieldSpec);
     try {
@@ -55,26 +172,27 @@ public class SchemaUtilsTest {
     }
 
     pinotSchema = new Schema();
-    MetricFieldSpec metricFieldSpec = new MetricFieldSpec("m1", 
FieldSpec.DataType.LONG);
+    MetricFieldSpec metricFieldSpec = new MetricFieldSpec("m1", DataType.LONG);
     metricFieldSpec.setTransformFunction("Groovy({function}, m1, m1)");
     pinotSchema.addField(metricFieldSpec);
     checkValidationFails(pinotSchema);
 
     pinotSchema = new Schema();
-    DateTimeFieldSpec dateTimeFieldSpec =
-        new DateTimeFieldSpec("dt1", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", 
"1:HOURS");
+    DateTimeFieldSpec dateTimeFieldSpec = new DateTimeFieldSpec("dt1", 
DataType.LONG, "1:HOURS:EPOCH", "1:HOURS");
     dateTimeFieldSpec.setTransformFunction("Groovy({function}, m1, dt1)");
     pinotSchema.addField(dateTimeFieldSpec);
     checkValidationFails(pinotSchema);
 
-    pinotSchema = new Schema.SchemaBuilder()
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "time"), null).build();
+    pinotSchema =
+        new Schema.SchemaBuilder().addTime(new 
TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, "time"), null)
+            .build();
     
pinotSchema.getFieldSpecFor("time").setTransformFunction("Groovy({function}, 
time)");
     checkValidationFails(pinotSchema);
 
     // derived transformations
-    pinotSchema = new Schema.SchemaBuilder().addSingleValueDimension("x", 
FieldSpec.DataType.INT)
-        .addSingleValueDimension("z", FieldSpec.DataType.INT).build();
+    pinotSchema =
+        new Schema.SchemaBuilder().addSingleValueDimension("x", 
DataType.INT).addSingleValueDimension("z", DataType.INT)
+            .build();
     pinotSchema.getFieldSpecFor("x").setTransformFunction("Groovy({y + 10}, 
y)");
     pinotSchema.getFieldSpecFor("z").setTransformFunction("Groovy({x*w*20}, x, 
w)");
     checkValidationFails(pinotSchema);
@@ -85,21 +203,21 @@ public class SchemaUtilsTest {
     Schema pinotSchema;
     // time field spec using same name for incoming and outgoing
     pinotSchema = new Schema.SchemaBuilder()
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "time"),
-            new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, 
"time")).build();
+        .addTime(new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, 
"time"),
+            new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS, 
"time")).build();
     checkValidationFails(pinotSchema);
 
     // time field spec using SIMPLE_DATE_FORMAT, not allowed when conversion 
is needed
     pinotSchema = new Schema.SchemaBuilder()
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "incoming"),
-            new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS,
+        .addTime(new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, 
"incoming"),
+            new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS,
                 TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT.toString(), 
"outgoing")).build();
     checkValidationFails(pinotSchema);
 
     // valid time field spec
     pinotSchema = new Schema.SchemaBuilder()
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "incoming"),
-            new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, 
"outgoing")).build();
+        .addTime(new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, 
"incoming"),
+            new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS, 
"outgoing")).build();
     SchemaUtils.validate(pinotSchema);
   }
 
@@ -109,7 +227,7 @@ public class SchemaUtilsTest {
     // incorrect groovy function syntax
     pinotSchema = new Schema();
 
-    DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("dim1", 
FieldSpec.DataType.STRING, true);
+    DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("dim1", 
DataType.STRING, true);
     dimensionFieldSpec.setTransformFunction("Groovy(function, argument3)");
     pinotSchema.addField(dimensionFieldSpec);
     checkValidationFails(pinotSchema);
@@ -117,15 +235,15 @@ public class SchemaUtilsTest {
     // valid schema, empty arguments
     pinotSchema = new Schema();
 
-    dimensionFieldSpec = new DimensionFieldSpec("dim1", 
FieldSpec.DataType.STRING, true);
+    dimensionFieldSpec = new DimensionFieldSpec("dim1", DataType.STRING, true);
     dimensionFieldSpec.setTransformFunction("Groovy({function})");
     pinotSchema.addField(dimensionFieldSpec);
     SchemaUtils.validate(pinotSchema);
 
     // valid schema
-    pinotSchema = new Schema.SchemaBuilder().addSingleValueDimension("dim1", 
FieldSpec.DataType.STRING)
-        .addMetric("m1", FieldSpec.DataType.LONG)
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "time"), null).build();
+    pinotSchema =
+        new Schema.SchemaBuilder().addSingleValueDimension("dim1", 
DataType.STRING).addMetric("m1", DataType.LONG)
+            .addTime(new TimeGranularitySpec(DataType.LONG, 
TimeUnit.MILLISECONDS, "time"), null).build();
     
pinotSchema.getFieldSpecFor("dim1").setTransformFunction("Groovy({function}, 
argument1, argument2, argument3)");
     pinotSchema.getFieldSpecFor("m1").setTransformFunction("Groovy({function}, 
m2, m3)");
     
pinotSchema.getFieldSpecFor("time").setTransformFunction("Groovy({function}, 
millis)");
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
index 0fc251e..0634589 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
@@ -27,6 +27,8 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -37,147 +39,276 @@ import org.testng.annotations.Test;
  */
 public class TableConfigUtilsTest {
 
+  private static final String TABLE_NAME = "testTable";
+  private static final String TIME_COLUMN = "timeColumn";
+
+  @Test
+  public void validateTimeColumnValidationConfig() {
+    // REALTIME table
+
+    // null timeColumnName and schema
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).build();
+    try {
+      TableConfigUtils.validate(tableConfig, null);
+      Assert.fail("Should fail for null timeColumnName and null schema in 
REALTIME table");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // null schema only
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    try {
+      TableConfigUtils.validate(tableConfig, null);
+      Assert.fail("Should fail for null schema in REALTIME table");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // null timeColumnName only
+    Schema schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).build();
+    try {
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail for null timeColumnName in REALTIME table");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // timeColumnName not present in schema
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    try {
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail for timeColumnName not present in schema for 
REALTIME table");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // timeColumnName not present as valid time spec schema
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(TIME_COLUMN, FieldSpec.DataType.LONG).build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    try {
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail for invalid fieldSpec for timeColumnName in 
schema for REALTIME table");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // valid
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    TableConfigUtils.validate(tableConfig, schema);
+
+    // OFFLINE table
+    // null timeColumnName and schema - allowed in OFFLINE
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    TableConfigUtils.validate(tableConfig, null);
+
+    // null schema only - allowed in OFFLINE
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    TableConfigUtils.validate(tableConfig, null);
+
+    // null timeColumnName only - allowed in OFFLINE
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    TableConfigUtils.validate(tableConfig, schema);
+
+    // non-null schema and timeColumnName, but timeColumnName not present in 
schema
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    try {
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail for timeColumnName not present in schema for 
OFFLINE table");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // non-null schema and timeColumnName, but timeColumnName not present as a 
time spec in schema
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(TIME_COLUMN, 
FieldSpec.DataType.STRING).build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    try {
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail for timeColumnName not present in schema for 
OFFLINE table");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // valid
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+    TableConfigUtils.validate(tableConfig, schema);
+  }
+
   @Test
   public void validateIngestionConfig() {
+    Schema schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
     // null ingestion config
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(null).build();
-    TableConfigUtils.validate(tableConfig);
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(null).build();
+    TableConfigUtils.validate(tableConfig, schema);
 
     // null filter config, transform config
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setIngestionConfig(new IngestionConfig(null, null)).build();
-    TableConfigUtils.validate(tableConfig);
+    TableConfigUtils.validate(tableConfig, schema);
 
     // null filter function
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setIngestionConfig(new IngestionConfig(new FilterConfig(null), 
null)).build();
-    TableConfigUtils.validate(tableConfig);
+    TableConfigUtils.validate(tableConfig, schema);
 
     // valid filterFunction
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setIngestionConfig(new IngestionConfig(new 
FilterConfig("startsWith(columnX, \"myPrefix\")"), null)).build();
-    TableConfigUtils.validate(tableConfig);
+    TableConfigUtils.validate(tableConfig, schema);
 
     // valid filterFunction
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setIngestionConfig(new IngestionConfig(new FilterConfig("Groovy({x == 
10}, x)"), null)).build();
-    TableConfigUtils.validate(tableConfig);
+    TableConfigUtils.validate(tableConfig, schema);
 
     // invalid filter function
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setIngestionConfig(new IngestionConfig(new 
FilterConfig("Groovy(badExpr)"), null)).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail on invalid filter function string");
     } catch (IllegalStateException e) {
       // expected
     }
 
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setIngestionConfig(new IngestionConfig(new 
FilterConfig("fakeFunction(xx)"), null)).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for invalid filter function");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // empty transform configs
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setIngestionConfig(new IngestionConfig(null, 
Collections.emptyList())).build();
-    TableConfigUtils.validate(tableConfig);
+    TableConfigUtils.validate(tableConfig, schema);
 
+    // transformed column not in schema
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
+        new IngestionConfig(null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)")))).build();
+    try {
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail for transformedColumn not present in schema");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    schema =
+        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
 FieldSpec.DataType.STRING)
+            .build();
     // valid transform configs
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)")))).build();
-    TableConfigUtils.validate(tableConfig);
+    TableConfigUtils.validate(tableConfig, schema);
 
+    schema =
+        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
 FieldSpec.DataType.STRING)
+            .addMetric("transformedCol", FieldSpec.DataType.LONG).build();
     // valid transform configs
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)"),
             new TransformConfig("transformedCol", "Groovy({x+y}, x, 
y)")))).build();
-    TableConfigUtils.validate(tableConfig);
+    TableConfigUtils.validate(tableConfig, schema);
 
     // null transform column name
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, Lists.newArrayList(new TransformConfig(null, 
"reverse(anotherCol)")))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for null column name in transform config");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // null transform function string
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setIngestionConfig(new IngestionConfig(null, Lists.newArrayList(new 
TransformConfig("myCol", null)))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for null transform function in transform 
config");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // invalid function
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, Lists.newArrayList(new 
TransformConfig("myCol", "fakeFunction(col)")))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for invalid transform function in transform 
config");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // invalid function
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, Lists.newArrayList(new 
TransformConfig("myCol", "Groovy(badExpr)")))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for invalid transform function in transform 
config");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // input field name used as destination field
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(myCol)")))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail due to use of myCol as arguments and 
columnName");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // input field name used as destination field
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null,
             Lists.newArrayList(new TransformConfig("myCol", "Groovy({x + y + 
myCol}, x, myCol, y)")))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail due to use of myCol as arguments and 
columnName");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // duplicate transform config
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null,
             Lists.newArrayList(new TransformConfig("myCol", "reverse(x)"), new 
TransformConfig("myCol", "lower(y)"))))
         .build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail due to duplicate transform config");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // chained transform functions
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
         new IngestionConfig(null,
             Lists.newArrayList(new TransformConfig("a", "reverse(x)"), new 
TransformConfig("b", "lower(a)")))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail due to using transformed column 'a' as argument 
for transform function of column 'b'");
     } catch (IllegalStateException e) {
       // expected
@@ -186,138 +317,140 @@ public class TableConfigUtilsTest {
 
   @Test
   public void validateTierConfigs() {
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
     // null tier configs
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTierConfigList(null).build();
-    TableConfigUtils.validate(tableConfig);
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(null).build();
+    TableConfigUtils.validate(tableConfig, schema);
 
     // empty tier configs
     tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTierConfigList(Collections.emptyList())
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Collections.emptyList())
             .build();
-    TableConfigUtils.validate(tableConfig);
+    TableConfigUtils.validate(tableConfig, schema);
 
     // 1 tier configs
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTierConfigList(Lists
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
         .newArrayList(new TierConfig("tier1", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
             TierFactory.PINOT_SERVER_STORAGE_TYPE, 
"tier1_tag_OFFLINE"))).build();
-    TableConfigUtils.validate(tableConfig);
+    TableConfigUtils.validate(tableConfig, schema);
 
     // 2 tier configs, case insensitive check
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTierConfigList(Lists
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
         .newArrayList(new TierConfig("tier1", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE.toLowerCase(), "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE"),
             new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, 
"40d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(), 
"tier2_tag_OFFLINE"))).build();
-    TableConfigUtils.validate(tableConfig);
+    TableConfigUtils.validate(tableConfig, schema);
 
     //realtime table
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setTimeColumnName("millis")
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
         .setTierConfigList(Lists.newArrayList(new TierConfig("tier1", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE.toLowerCase(), 
"tier1_tag_OFFLINE"),
             new TierConfig("tier2", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE.toLowerCase(), "40d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, 
"tier2_tag_OFFLINE"))).build();
-    TableConfigUtils.validate(tableConfig);
+    TableConfigUtils.validate(tableConfig, schema);
 
     // tier name empty
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTierConfigList(Lists
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
         .newArrayList(
             new TierConfig("", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", 
TierFactory.PINOT_SERVER_STORAGE_TYPE,
                 "tier1_tag_OFFLINE"))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should have failed due to empty tier name");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // tier name repeats
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTierConfigList(Lists
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
         .newArrayList(new TierConfig("sameTierName", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE"),
             new TierConfig("sameTierName", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "100d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, 
"tier2_tag_OFFLINE"))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should have failed due to duplicate tier name");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // segmentSelectorType invalid
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTierConfigList(Lists
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
         .newArrayList(new TierConfig("tier1", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE"),
             new TierConfig("tier2", "unsupportedSegmentSelector", "40d", 
TierFactory.PINOT_SERVER_STORAGE_TYPE,
                 "tier2_tag_OFFLINE"))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should have failed due to invalid segmentSelectorType");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // segmentAge not provided for TIME segmentSelectorType
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTierConfigList(Lists
-        .newArrayList(new TierConfig("tier1", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, null,
-                TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE"),
-            new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, 
"40d",
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
+        .newArrayList(
+            new TierConfig("tier1", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, 
null, TierFactory.PINOT_SERVER_STORAGE_TYPE,
+                "tier1_tag_OFFLINE"), new TierConfig("tier2", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, 
"tier2_tag_OFFLINE"))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should have failed due to missing segmentAge");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // segmentAge invalid
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTierConfigList(Lists
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
         .newArrayList(new TierConfig("tier1", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE"),
             new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, 
"3600",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, 
"tier2_tag_OFFLINE"))).build();
 
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should have failed due to invalid segment age");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // storageType invalid
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTierConfigList(Lists
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
         .newArrayList(new TierConfig("tier1", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", "unsupportedStorageType",
             "tier1_tag_OFFLINE"), new TierConfig("tier2", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "40d",
             TierFactory.PINOT_SERVER_STORAGE_TYPE, 
"tier2_tag_OFFLINE"))).build();
 
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should have failed due to invalid storage type");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // serverTag not provided for PINOT_SERVER storageType
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTierConfigList(Lists
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
         .newArrayList(new TierConfig("tier1", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE"),
             new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, 
"40d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, null))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should have failed due to ");
     } catch (IllegalStateException e) {
       // expected
     }
 
     // serverTag invalid
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTierConfigList(Lists
+    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTierConfigList(Lists
         .newArrayList(new TierConfig("tier1", 
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag"),
             new TierConfig("tier2", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, 
"40d",
                 TierFactory.PINOT_SERVER_STORAGE_TYPE, 
"tier2_tag_OFFLINE"))).build();
     try {
-      TableConfigUtils.validate(tableConfig);
+      TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should have failed due to invalid server tag");
     } catch (IllegalStateException e) {
       // expected


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to