This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b12239104 Add configs to logical tables (#15720)
7b12239104 is described below

commit 7b122391046f0a47c7cf69cd2595b9a9fc5cd186
Author: Abhishek Bafna <aba...@startree.ai>
AuthorDate: Fri May 9 19:12:17 2025 +0530

    Add configs to logical tables (#15720)
---
 .../pinot/broker/routing/BrokerRoutingManager.java |  10 +-
 .../pinot/common/config/provider/TableCache.java   |  78 ++---
 .../pinot/common/metadata/ZKMetadataProvider.java  |   8 +-
 ...ableUtils.java => LogicalTableConfigUtils.java} |  81 ++++-
 .../helix/core/PinotHelixResourceManager.java      |  46 ++-
 .../api/PinotTableRestletResourceTest.java         |  30 ++
 .../resources/PinotLogicalTableResourceTest.java   | 329 +++++++++++++++------
 ...inotUserWithAccessLogicalTableResourceTest.java |   1 +
 .../pinot/controller/helix/ControllerTest.java     |  10 +
 .../pinot/controller/helix/TableCacheTest.java     |  58 ++--
 .../apache/pinot/spi/data/LogicalTableConfig.java  |  70 +++--
 .../utils/builder/LogicalTableConfigBuilder.java   |  31 ++
 12 files changed, 545 insertions(+), 207 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 360d05e5e7..ca3d42d436 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -414,15 +414,17 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
   }
 
   /**
-   * Builds/rebuilds the routing for the given table.
+   * Builds/rebuilds the routing for the physical table, for logical tables it 
is skipped.
+   * @param physicalOrLogicalTable a physical table with type or logical table 
name
    */
-  public synchronized void buildRouting(String tableNameWithType) {
+  public synchronized void buildRouting(String physicalOrLogicalTable) {
     // skip route building for logical tables
-    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, 
tableNameWithType)) {
-      LOGGER.info("Skipping route building for logical table: {}", 
tableNameWithType);
+    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, 
physicalOrLogicalTable)) {
+      LOGGER.info("Skipping route building for logical table: {}", 
physicalOrLogicalTable);
       return;
     }
 
+    String tableNameWithType = physicalOrLogicalTable;
     LOGGER.info("Building routing for table: {}", tableNameWithType);
 
     TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 0b12a64282..e25948fd61 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -38,7 +38,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.pinot.common.request.Expression;
-import org.apache.pinot.common.utils.LogicalTableUtils;
+import org.apache.pinot.common.utils.LogicalTableConfigUtils;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
@@ -226,12 +226,17 @@ public class TableCache implements PinotConfigProvider {
   }
 
   /**
-   * Returns the expression override map for the given table, or {@code null} 
if no override is configured.
+   * Returns the expression override map for the given logical or physical 
table, or {@code null} if no override is
+   * configured.
    */
   @Nullable
-  public Map<Expression, Expression> getExpressionOverrideMap(String 
tableNameWithType) {
-    TableConfigInfo tableConfigInfo = 
_tableConfigInfoMap.get(tableNameWithType);
-    return tableConfigInfo != null ? tableConfigInfo._expressionOverrideMap : 
null;
+  public Map<Expression, Expression> getExpressionOverrideMap(String 
physicalOrLogicalTableName) {
+    TableConfigInfo tableConfigInfo = 
_tableConfigInfoMap.get(physicalOrLogicalTableName);
+    if (tableConfigInfo != null) {
+      return tableConfigInfo._expressionOverrideMap;
+    }
+    LogicalTableConfigInfo logicalTableConfigInfo = 
_logicalTableConfigInfoMap.get(physicalOrLogicalTableName);
+    return logicalTableConfigInfo != null ? 
logicalTableConfigInfo._expressionOverrideMap : null;
   }
 
   /**
@@ -256,7 +261,6 @@ public class TableCache implements PinotConfigProvider {
   @Nullable
   @Override
   public LogicalTableConfig getLogicalTableConfig(String logicalTableName) {
-    logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : 
logicalTableName;
     LogicalTableConfigInfo logicalTableConfigInfo = 
_logicalTableConfigInfoMap.get(logicalTableName);
     return logicalTableConfigInfo != null ? 
logicalTableConfigInfo._logicalTableConfig : null;
   }
@@ -344,14 +348,13 @@ public class TableCache implements PinotConfigProvider {
 
   private void putLogicalTableConfig(ZNRecord znRecord)
       throws IOException {
-    LogicalTableConfig logicalTableConfig = 
LogicalTableUtils.fromZNRecord(znRecord);
+    LogicalTableConfig logicalTableConfig = 
LogicalTableConfigUtils.fromZNRecord(znRecord);
     String logicalTableName = logicalTableConfig.getTableName();
+    _logicalTableConfigInfoMap.put(logicalTableName, new 
LogicalTableConfigInfo(logicalTableConfig));
     if (_ignoreCase) {
       _logicalTableNameMap.put(logicalTableName.toLowerCase(), 
logicalTableName);
-      _logicalTableConfigInfoMap.put(logicalTableName.toLowerCase(), new 
LogicalTableConfigInfo(logicalTableConfig));
     } else {
       _logicalTableNameMap.put(logicalTableName, logicalTableName);
-      _logicalTableConfigInfoMap.put(logicalTableName, new 
LogicalTableConfigInfo(logicalTableConfig));
     }
   }
 
@@ -389,8 +392,8 @@ public class TableCache implements PinotConfigProvider {
   private void removeLogicalTableConfig(String path) {
     _propertyStore.unsubscribeDataChanges(path, 
_zkLogicalTableConfigChangeListener);
     String logicalTableName = 
path.substring(LOGICAL_TABLE_PATH_PREFIX.length());
-    logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : 
logicalTableName;
     _logicalTableConfigInfoMap.remove(logicalTableName);
+    logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : 
logicalTableName;
     _logicalTableNameMap.remove(logicalTableName);
   }
 
@@ -644,6 +647,31 @@ public class TableCache implements PinotConfigProvider {
     }
   }
 
+  private static Map<Expression, Expression> 
createExpressionOverrideMap(String physicalOrLogicalTableName,
+      QueryConfig queryConfig) {
+    Map<Expression, Expression> expressionOverrideMap = new TreeMap<>();
+    if (queryConfig != null && 
MapUtils.isNotEmpty(queryConfig.getExpressionOverrideMap())) {
+      for (Map.Entry<String, String> entry : 
queryConfig.getExpressionOverrideMap().entrySet()) {
+        try {
+          Expression srcExp = 
CalciteSqlParser.compileToExpression(entry.getKey());
+          Expression destExp = 
CalciteSqlParser.compileToExpression(entry.getValue());
+          expressionOverrideMap.put(srcExp, destExp);
+        } catch (Exception e) {
+          LOGGER.warn("Caught exception while compiling expression override: 
{} -> {} for table: {}, skipping it",
+              entry.getKey(), entry.getValue(), physicalOrLogicalTableName);
+        }
+      }
+      int mapSize = expressionOverrideMap.size();
+      if (mapSize == 1) {
+        Map.Entry<Expression, Expression> entry = 
expressionOverrideMap.entrySet().iterator().next();
+        return Collections.singletonMap(entry.getKey(), entry.getValue());
+      } else if (mapSize > 1) {
+        return expressionOverrideMap;
+      }
+    }
+    return null;
+  }
+
   private static class TableConfigInfo {
     final TableConfig _tableConfig;
     final Map<Expression, Expression> _expressionOverrideMap;
@@ -652,31 +680,7 @@ public class TableCache implements PinotConfigProvider {
 
     private TableConfigInfo(TableConfig tableConfig) {
       _tableConfig = tableConfig;
-      QueryConfig queryConfig = tableConfig.getQueryConfig();
-      if (queryConfig != null && 
MapUtils.isNotEmpty(queryConfig.getExpressionOverrideMap())) {
-        Map<Expression, Expression> expressionOverrideMap = new TreeMap<>();
-        for (Map.Entry<String, String> entry : 
queryConfig.getExpressionOverrideMap().entrySet()) {
-          try {
-            Expression srcExp = 
CalciteSqlParser.compileToExpression(entry.getKey());
-            Expression destExp = 
CalciteSqlParser.compileToExpression(entry.getValue());
-            expressionOverrideMap.put(srcExp, destExp);
-          } catch (Exception e) {
-            LOGGER.warn("Caught exception while compiling expression override: 
{} -> {} for table: {}, skipping it",
-                entry.getKey(), entry.getValue(), tableConfig.getTableName());
-          }
-        }
-        int mapSize = expressionOverrideMap.size();
-        if (mapSize == 0) {
-          _expressionOverrideMap = null;
-        } else if (mapSize == 1) {
-          Map.Entry<Expression, Expression> entry = 
expressionOverrideMap.entrySet().iterator().next();
-          _expressionOverrideMap = Collections.singletonMap(entry.getKey(), 
entry.getValue());
-        } else {
-          _expressionOverrideMap = expressionOverrideMap;
-        }
-      } else {
-        _expressionOverrideMap = null;
-      }
+      _expressionOverrideMap = 
createExpressionOverrideMap(tableConfig.getTableName(), 
tableConfig.getQueryConfig());
       _timestampIndexColumns = 
TimestampIndexUtils.extractColumnsWithGranularity(tableConfig);
     }
   }
@@ -693,10 +697,12 @@ public class TableCache implements PinotConfigProvider {
 
   private static class LogicalTableConfigInfo {
     final LogicalTableConfig _logicalTableConfig;
-    // TODO : Add expression override map for logical table, issue #15607
+    final Map<Expression, Expression> _expressionOverrideMap;
 
     private LogicalTableConfigInfo(LogicalTableConfig logicalTableConfig) {
       _logicalTableConfig = logicalTableConfig;
+      _expressionOverrideMap = 
createExpressionOverrideMap(logicalTableConfig.getTableName(),
+          logicalTableConfig.getQueryConfig());
     }
   }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index e1f21ceea4..c465f11b13 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -40,7 +40,7 @@ import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.LogicalTableUtils;
+import org.apache.pinot.common.utils.LogicalTableConfigUtils;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
@@ -837,7 +837,7 @@ public class ZKMetadataProvider {
   public static void setLogicalTableConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore,
       LogicalTableConfig logicalTableConfig) {
     try {
-      ZNRecord znRecord = LogicalTableUtils.toZNRecord(logicalTableConfig);
+      ZNRecord znRecord = 
LogicalTableConfigUtils.toZNRecord(logicalTableConfig);
       String path = 
constructPropertyStorePathForLogical(logicalTableConfig.getTableName());
       propertyStore.set(path, znRecord, AccessOption.PERSISTENT);
     } catch (JsonProcessingException e) {
@@ -851,7 +851,7 @@ public class ZKMetadataProvider {
     if (znRecords != null) {
       return znRecords.stream().map(znRecord -> {
         try {
-          return LogicalTableUtils.fromZNRecord(znRecord);
+          return LogicalTableConfigUtils.fromZNRecord(znRecord);
         } catch (IOException e) {
           LOGGER.error("Caught exception while converting ZNRecord to 
LogicalTable: {}", znRecord.getId(), e);
           return null;
@@ -870,7 +870,7 @@ public class ZKMetadataProvider {
       if (logicalTableZNRecord == null) {
         return null;
       }
-      return LogicalTableUtils.fromZNRecord(logicalTableZNRecord);
+      return LogicalTableConfigUtils.fromZNRecord(logicalTableZNRecord);
     } catch (Exception e) {
       LOGGER.error("Caught exception while getting logical table: {}", 
tableName, e);
       return null;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
similarity index 58%
rename from 
pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
rename to 
pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
index be92ffdeb0..d9b5e69b1e 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java
@@ -21,12 +21,16 @@ package org.apache.pinot.common.utils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Predicate;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -34,9 +38,9 @@ import 
org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
-public class LogicalTableUtils {
+public class LogicalTableConfigUtils {
 
-  private LogicalTableUtils() {
+  private LogicalTableConfigUtils() {
     // Utility class
   }
 
@@ -46,6 +50,21 @@ public class LogicalTableUtils {
         
.setTableName(record.getSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY))
         
.setBrokerTenant(record.getSimpleField(LogicalTableConfig.BROKER_TENANT_KEY));
 
+    if (record.getSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY) != null) {
+      
builder.setQueryConfig(JsonUtils.stringToObject(record.getSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY),
+          QueryConfig.class));
+    }
+    if (record.getSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY) != null) {
+      
builder.setQuotaConfig(JsonUtils.stringToObject(record.getSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY),
+          QuotaConfig.class));
+    }
+    if (record.getSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY) 
!= null) {
+      
builder.setRefOfflineTableName(record.getSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY));
+    }
+    if (record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY) 
!= null) {
+      
builder.setRefRealtimeTableName(record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY));
+    }
+
     Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
     for (Map.Entry<String, String> entry : 
record.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY)
         .entrySet()) {
@@ -71,10 +90,25 @@ public class LogicalTableUtils {
     record.setSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY, 
logicalTableConfig.getTableName());
     record.setSimpleField(LogicalTableConfig.BROKER_TENANT_KEY, 
logicalTableConfig.getBrokerTenant());
     record.setMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY, 
physicalTableConfigMap);
+
+    if (logicalTableConfig.getQueryConfig() != null) {
+      record.setSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY, 
logicalTableConfig.getQueryConfig().toJsonString());
+    }
+    if (logicalTableConfig.getQuotaConfig() != null) {
+      record.setSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY, 
logicalTableConfig.getQuotaConfig().toJsonString());
+    }
+    if (logicalTableConfig.getRefOfflineTableName() != null) {
+      record.setSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY,
+          logicalTableConfig.getRefOfflineTableName());
+    }
+    if (logicalTableConfig.getRefRealtimeTableName() != null) {
+      record.setSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY,
+          logicalTableConfig.getRefRealtimeTableName());
+    }
     return record;
   }
 
-  public static void validateLogicalTableName(
+  public static void validateLogicalTableConfig(
       LogicalTableConfig logicalTableConfig,
       Predicate<String> physicalTableExistsPredicate,
       Predicate<String> brokerTenantExistsPredicate,
@@ -95,6 +129,9 @@ public class LogicalTableUtils {
           "Invalid logical table. Reason: 'physicalTableConfigMap' should not 
be null or empty");
     }
 
+    Set<String> offlineTableNames = new HashSet<>();
+    Set<String> realtimeTableNames = new HashSet<>();
+
     for (Map.Entry<String, PhysicalTableConfig> entry : 
logicalTableConfig.getPhysicalTableConfigMap().entrySet()) {
       String physicalTableName = entry.getKey();
       PhysicalTableConfig physicalTableConfig = entry.getValue();
@@ -110,6 +147,44 @@ public class LogicalTableUtils {
             "Invalid logical table. Reason: 'physicalTableConfig' should not 
be null for physical table: "
                 + physicalTableName);
       }
+
+      if (TableNameBuilder.isOfflineTableResource(physicalTableName)) {
+        offlineTableNames.add(physicalTableName);
+      } else if (TableNameBuilder.isRealtimeTableResource(physicalTableName)) {
+        realtimeTableNames.add(physicalTableName);
+      }
+    }
+
+    // validate ref offline table name is not null or empty when offline 
tables exists
+    if (!offlineTableNames.isEmpty() && 
StringUtils.isEmpty(logicalTableConfig.getRefOfflineTableName())) {
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: 'refOfflineTableName' should not be 
null or empty when offline table exists");
+    }
+
+    // validate ref realtime table name is not null or empty when realtime 
tables exists
+    if (!realtimeTableNames.isEmpty() && 
StringUtils.isEmpty(logicalTableConfig.getRefRealtimeTableName())) {
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: 'refRealtimeTableName' should not be 
null or empty when realtime table "
+              + "exists");
+    }
+
+    // validate ref offline table name is present in the offline tables
+    if (!offlineTableNames.isEmpty() && 
!offlineTableNames.contains(logicalTableConfig.getRefOfflineTableName())) {
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: 'refOfflineTableName' should be one 
of the provided offline tables");
+    }
+
+    // validate ref realtime table name is present in the realtime tables
+    if (!realtimeTableNames.isEmpty() && 
!realtimeTableNames.contains(logicalTableConfig.getRefRealtimeTableName())) {
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: 'refRealtimeTableName' should be one 
of the provided realtime tables");
+    }
+
+    // validate quota.storage is not set
+    QuotaConfig quotaConfig = logicalTableConfig.getQuotaConfig();
+    if (quotaConfig != null && !StringUtils.isEmpty(quotaConfig.getStorage())) 
{
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: 'quota.storage' should not be set 
for logical table");
     }
 
     // validate broker tenant exists
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ad62a01227..de7b6bba80 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -127,7 +127,7 @@ import org.apache.pinot.common.utils.BcryptUtils;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.LogicalTableUtils;
+import org.apache.pinot.common.utils.LogicalTableConfigUtils;
 import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
@@ -541,19 +541,22 @@ public class PinotHelixResourceManager {
   }
 
   @Nullable
-  private String getBrokerTenantName(String tableName) {
-    TableConfig offlineTableConfig = 
ZKMetadataProvider.getOfflineTableConfig(_propertyStore, tableName);
+  private String getBrokerTenantName(String physicalOrLogicalTableName) {
+    TableConfig offlineTableConfig =
+        ZKMetadataProvider.getOfflineTableConfig(_propertyStore, 
physicalOrLogicalTableName);
     if (offlineTableConfig != null) {
       return offlineTableConfig.getTenantConfig().getBroker();
     }
 
-    TableConfig realtimeTableConfig = 
ZKMetadataProvider.getRealtimeTableConfig(_propertyStore, tableName);
+    TableConfig realtimeTableConfig =
+        ZKMetadataProvider.getRealtimeTableConfig(_propertyStore, 
physicalOrLogicalTableName);
     if (realtimeTableConfig != null) {
       return realtimeTableConfig.getTenantConfig().getBroker();
     }
 
     // If the table is not found, check if it is a logical table
-    LogicalTableConfig logicalTableConfig = 
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
+    LogicalTableConfig logicalTableConfig =
+        ZKMetadataProvider.getLogicalTableConfig(_propertyStore, 
physicalOrLogicalTableName);
     if (logicalTableConfig != null) {
       return logicalTableConfig.getBrokerTenant();
     }
@@ -1752,6 +1755,12 @@ public class PinotHelixResourceManager {
           + " already exists. If this is unexpected, try deleting the table to 
remove all metadata associated"
           + " with it.");
     }
+
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, rawTableName)) 
{
+      throw new TableAlreadyExistsException("Logical table '" + rawTableName
+          + "' already exists. Please use a different name for the physical 
table.");
+    }
     if (_helixAdmin.getResourceExternalView(_helixClusterName, 
tableNameWithType) != null) {
       throw new TableAlreadyExistsException("External view for " + 
tableNameWithType
           + " still exists. If the table is just deleted, please wait for the 
clean up to finish before recreating it. "
@@ -1772,7 +1781,7 @@ public class PinotHelixResourceManager {
             _enableBatchMessageMode);
     TableType tableType = tableConfig.getTableType();
     // Ensure that table is not created if schema is not present
-    if (ZKMetadataProvider.getSchema(_propertyStore, 
TableNameBuilder.extractRawTableName(tableNameWithType)) == null) {
+    if (ZKMetadataProvider.getSchema(_propertyStore, rawTableName) == null) {
       throw new InvalidTableConfigException("No schema defined for table: " + 
tableNameWithType);
     }
     Preconditions.checkState(tableType == TableType.OFFLINE || tableType == 
TableType.REALTIME,
@@ -1836,9 +1845,11 @@ public class PinotHelixResourceManager {
       logicalTableConfig.setBrokerTenant("DefaultTenant");
     }
 
-    LogicalTableUtils.validateLogicalTableName(
+    PinotHelixPropertyStoreZnRecordProvider 
pinotHelixPropertyStoreZnRecordProvider =
+        PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore);
+    LogicalTableConfigUtils.validateLogicalTableConfig(
         logicalTableConfig,
-        
PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore)::exist,
+        pinotHelixPropertyStoreZnRecordProvider::exist,
         getAllBrokerTenantNames()::contains,
         _propertyStore
     );
@@ -1849,10 +1860,10 @@ public class PinotHelixResourceManager {
     }
 
     // Check if the table name is already used by a physical table
-    
getAllTables().stream().map(TableNameBuilder::extractRawTableName).distinct().filter(tableName::equals)
-        .findFirst().ifPresent(tableNameWithType -> {
-          throw new TableAlreadyExistsException("Table name: " + tableName + " 
already exists");
-        });
+    if 
(pinotHelixPropertyStoreZnRecordProvider.exist(TableNameBuilder.OFFLINE.tableNameWithType(tableName))
+    || 
pinotHelixPropertyStoreZnRecordProvider.exist(TableNameBuilder.REALTIME.tableNameWithType(tableName)))
 {
+      throw new TableAlreadyExistsException("Table name: " + tableName + " 
already exists");
+    }
 
     LOGGER.info("Adding logical table {}: Creating logical table config in the 
property store", tableName);
     ZKMetadataProvider.setLogicalTableConfig(_propertyStore, 
logicalTableConfig);
@@ -2124,22 +2135,25 @@ public class PinotHelixResourceManager {
       logicalTableConfig.setBrokerTenant("DefaultTenant");
     }
 
-    LogicalTableUtils.validateLogicalTableName(
+    LogicalTableConfigUtils.validateLogicalTableConfig(
         logicalTableConfig,
         
PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore)::exist,
         getAllBrokerTenantNames()::contains,
         _propertyStore
     );
 
-    if (!ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) {
+    LogicalTableConfig oldLogicalTableConfig = 
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
+    if (oldLogicalTableConfig == null) {
       throw new TableNotFoundException("Logical table: " + tableName + " does 
not exist");
     }
 
     LOGGER.info("Updating logical table {}: Updating logical table config in 
the property store", tableName);
     ZKMetadataProvider.setLogicalTableConfig(_propertyStore, 
logicalTableConfig);
 
-    LOGGER.info("Updating logical table {}: Updating BrokerResource for 
table", tableName);
-    updateBrokerResourceForLogicalTable(logicalTableConfig, tableName);
+    if 
(!oldLogicalTableConfig.getBrokerTenant().equals(logicalTableConfig.getBrokerTenant()))
 {
+      LOGGER.info("Updating logical table {}: Updating BrokerResource for 
table", tableName);
+      updateBrokerResourceForLogicalTable(logicalTableConfig, tableName);
+    }
 
     LOGGER.info("Updated logical table {}: Successfully updated table", 
tableName);
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 5777bae1bf..fc21084328 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -46,6 +46,7 @@ import 
org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -865,6 +866,35 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
     }
   }
 
+  @Test
+  public void testTableWithSameNameAsLogicalTableIsNotAllowed()
+      throws IOException {
+    // Create physical table
+    String tableName = "testTable";
+    DEFAULT_INSTANCE.addDummySchema(tableName);
+    TableConfig offlineTableConfig = 
_offlineBuilder.setTableName(tableName).build();
+    String creationResponse = sendPostRequest(_createTableUrl, 
offlineTableConfig.toJsonString());
+    assertEquals(creationResponse,
+        "{\"unrecognizedProperties\":{},\"status\":\"Table testTable_OFFLINE 
successfully added\"}");
+
+    // create logical table with above physical table
+    String logicalTableName = "testTable_LOGICAL";
+    DEFAULT_INSTANCE.addDummySchema(logicalTableName);
+    LogicalTableConfig logicalTableConfig = 
ControllerTest.getDummyLogicalTableConfig(
+        logicalTableName, List.of(offlineTableConfig.getTableName()), 
"DefaultTenant");
+    String addLogicalTableUrl = 
DEFAULT_INSTANCE.getControllerRequestURLBuilder().forLogicalTableCreate();
+    String logicalTableResponse = sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString());
+    assertEquals(logicalTableResponse,
+        "{\"unrecognizedProperties\":{},\"status\":\"testTable_LOGICAL logical 
table successfully added.\"}");
+
+    // create table with same as logical table and should fail
+    TableConfig offlineTableConfig2 = 
_offlineBuilder.setTableName(logicalTableName).build();
+    IOException aThrows = expectThrows(IOException.class,
+        () -> sendPostRequest(_createTableUrl, 
offlineTableConfig2.toJsonString()));
+    assertTrue(aThrows.getMessage().contains("Logical table '" + 
logicalTableName + "' already exists"),
+        aThrows.getMessage());
+  }
+
   /**
    * Updating existing REALTIME table with invalid replication factor should 
throw exception.
    */
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
index d6df95e5cb..ced4a3f6c9 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
@@ -22,7 +22,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.config.table.QuotaConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
 import org.testng.annotations.AfterClass;
@@ -32,7 +39,9 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 import static org.testng.Assert.fail;
 
 
@@ -40,20 +49,26 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
 
   private static final String LOGICAL_TABLE_NAME = "test_logical_table";
   public static final String BROKER_TENANT = "DefaultTenant";
+  public static final String NEW_BROKER_TENANT = "NewBrokerTenant";
   protected ControllerRequestURLBuilder _controllerRequestURLBuilder;
+  private String _addLogicalTableUrl;
 
   @BeforeClass
   public void setUpClass()
       throws Exception {
     startZk();
     startController();
-    addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(2, false);
     addFakeServerInstancesToAutoJoinHelixCluster(1, true);
     _controllerRequestURLBuilder = getControllerRequestURLBuilder();
+    _addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate();
+    createBrokerTenant(BROKER_TENANT, 1);
+    createBrokerTenant(NEW_BROKER_TENANT, 1);
   }
 
   @AfterClass
   public void tearDownClass() {
+    stopFakeInstances();
     stopController();
     stopZk();
   }
@@ -86,7 +101,6 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
       throws IOException {
     addDummySchema(logicalTableName);
     // verify logical table does not exist
-    String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
     String getLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableGet(logicalTableName);
     String updateLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableName);
     String deleteLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName);
@@ -101,7 +115,7 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
 
     // create logical table
     String resp =
-        ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+        ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
     assertEquals(resp,
         "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName + " 
logical table successfully added.\"}");
 
@@ -129,122 +143,211 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
     verifyLogicalTableDoesNotExists(getLogicalTableUrl);
   }
 
-  @Test
-  public void testLogicalTableValidationTests()
+  @Test(expectedExceptions = IOException.class,
+      expectedExceptionsMessageRegExp = ".*Reason: 'quota.storage' should not 
be set for logical table.*")
+  public void testLogicalTableQuotaConfigValidation()
       throws IOException {
-    String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
+    List<String> physicalTableNamesWithType = 
createHybridTables(List.of("test_table_1"));
+    LogicalTableConfig logicalTableConfig =
+        getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, BROKER_TENANT);
+    logicalTableConfig.setQuotaConfig(new QuotaConfig("10G", "999"));
+    ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+  }
 
-    // create physical tables
-    List<String> physicalTableNames = List.of("test_table_1", "test_table_2");
-    List<String> physicalTableNamesWithType = 
createHybridTables(physicalTableNames);
+  @Test
+  public void testLogicalTableReferenceTableValidation()
+      throws IOException {
+    final List<String> physicalTableNamesWithType = 
createHybridTables(List.of("test_table_7"));
+
+    // Test ref offline table name is null validation
+    IOException aThrows = expectThrows(
+        IOException.class, () -> {
+          LogicalTableConfig logicalTableConfig =
+              getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, BROKER_TENANT);
+          logicalTableConfig.setRefOfflineTableName(null);
+          ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(),
+              getHeaders());
+        }
+    );
+    assertTrue(aThrows.getMessage()
+            .contains("Reason: 'refOfflineTableName' should not be null or 
empty when offline table exists"),
+        aThrows.getMessage());
+
+    // Test ref realtime table name is null validation
+    aThrows = expectThrows(
+        IOException.class, () -> {
+          LogicalTableConfig logicalTableConfig =
+              getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, BROKER_TENANT);
+          logicalTableConfig.setRefRealtimeTableName(null);
+          ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(),
+              getHeaders());
+        }
+    );
+    assertTrue(aThrows.getMessage()
+            .contains("Reason: 'refRealtimeTableName' should not be null or 
empty when realtime table exists"),
+        aThrows.getMessage());
+
+    // Test ref offline table is present in the offline tables validation
+    aThrows = expectThrows(
+        IOException.class, () -> {
+          LogicalTableConfig logicalTableConfig =
+              getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, BROKER_TENANT);
+          logicalTableConfig.setRefOfflineTableName("random_table_OFFLINE");
+          ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(),
+              getHeaders());
+        }
+    );
+    assertTrue(aThrows.getMessage()
+            .contains("Reason: 'refOfflineTableName' should be one of the 
provided offline tables"),
+        aThrows.getMessage());
+
+    // Test ref realtime table is present in the realtime tables validation
+    aThrows = expectThrows(
+        IOException.class, () -> {
+          LogicalTableConfig logicalTableConfig =
+              getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, BROKER_TENANT);
+          logicalTableConfig.setRefRealtimeTableName("random_table_REALTIME");
+          ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(),
+              getHeaders());
+        }
+    );
+    assertTrue(aThrows.getMessage()
+            .contains("Reason: 'refRealtimeTableName' should be one of the 
provided realtime tables"),
+        aThrows.getMessage());
+  }
 
-    // Test logical table name with _OFFLINE and _REALTIME is not allowed
+  @Test(expectedExceptions = IOException.class,
+      expectedExceptionsMessageRegExp = ".*Reason: 'InvalidTenant' should be 
one of the existing broker tenants.*")
+  public void testLogicalTableBrokerTenantValidation()
+      throws IOException {
+    List<String> physicalTableNamesWithType = 
createHybridTables(List.of("test_table_3"));
     LogicalTableConfig logicalTableConfig =
-        getDummyLogicalTableConfig("testLogicalTable_OFFLINE", 
physicalTableNamesWithType, BROKER_TENANT);
-    try {
-      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
-      fail("Logical Table POST request should have failed");
-    } catch (IOException e) {
-      assertTrue(e.getMessage().contains("Reason: 'tableName' should not end 
with _OFFLINE or _REALTIME"),
-          e.getMessage());
-    }
+        getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, "InvalidTenant");
 
-    logicalTableConfig =
-        getDummyLogicalTableConfig("testLogicalTable_REALTIME", 
physicalTableNamesWithType, BROKER_TENANT);
-    try {
-      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
-      fail("Logical Table POST request should have failed");
-    } catch (IOException e) {
-      assertTrue(e.getMessage().contains("Reason: 'tableName' should not end 
with _OFFLINE or _REALTIME"),
-          e.getMessage());
-    }
-
-    // Test logical table name can not be same as existing physical table name
-    logicalTableConfig =
-        getDummyLogicalTableConfig("test_table_1", physicalTableNamesWithType, 
BROKER_TENANT);
-    try {
-      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
-      fail("Logical Table POST request should have failed");
-    } catch (IOException e) {
-      assertTrue(e.getMessage().contains("Table name: test_table_1 already 
exists"), e.getMessage());
-    }
+    ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+  }
 
+  @Test
+  public void testLogicalTablePhysicalTableConfigValidation() {
     // Test empty physical table names is not allowed
-    logicalTableConfig =
-        getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(), 
BROKER_TENANT);
-    try {
-      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
-      fail("Logical Table POST request should have failed");
-    } catch (IOException e) {
-      assertTrue(e.getMessage().contains("'physicalTableConfigMap' should not 
be null or empty"), e.getMessage());
-    }
+    Throwable throwable = expectThrows(IOException.class, () -> {
+      LogicalTableConfig tableConfig = 
getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(), BROKER_TENANT);
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
tableConfig.toSingleLineJsonString(), getHeaders());
+    });
+    assertTrue(throwable.getMessage().contains("Reason: 
'physicalTableConfigMap' should not be null or empty"),
+        throwable.getMessage());
 
     // Test all table names are physical table names and none is hybrid table 
name
-    logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNames, BROKER_TENANT);
-    try {
-      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
-      fail("Logical Table POST request should have failed");
-    } catch (IOException e) {
-      assertTrue(e.getMessage().contains("Reason: 'test_table_1' should be one 
of the existing tables"),
-          e.getMessage());
-    }
+    throwable = expectThrows(IOException.class, () -> {
+      List<String> physicalTableNames = List.of("test_table_1");
+      LogicalTableConfig logicalTableConfig =
+          getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNames, 
BROKER_TENANT);
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    });
+    assertTrue(throwable.getMessage().contains("Reason: 'test_table_1' should 
be one of the existing tables"),
+        throwable.getMessage());
+  }
 
-    // Test valid broker tenant is provided
-    logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, "InvalidTenant");
-    try {
-      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
-      fail("Logical Table POST request should have failed");
-    } catch (IOException e) {
-      assertTrue(e.getMessage().contains("Reason: 'InvalidTenant' should be 
one of the existing broker tenants"),
-          e.getMessage());
-    }
+  @Test(expectedExceptions = IOException.class,
+      expectedExceptionsMessageRegExp = ".*Table name: test_table already 
exists.*")
+  public void testLogicalTableNameCannotSameAsPhysicalTableNameValidation()
+      throws IOException {
+    String tableName = "test_table";
+    List<String> physicalTableNamesWithType = 
createHybridTables(List.of(tableName));
+    LogicalTableConfig logicalTableConfig =
+        getDummyLogicalTableConfig(tableName, physicalTableNamesWithType, 
BROKER_TENANT);
+    ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+  }
+
+  @Test
+  public void testLogicalTableNameSuffixValidation()
+      throws IOException {
+    List<String> physicalTableNamesWithType = 
createHybridTables(List.of("test_table_4"));
+
+    // Test logical table name with _OFFLINE suffix validation
+    Throwable throwable = expectThrows(IOException.class, () -> {
+      LogicalTableConfig logicalTableConfig =
+          getDummyLogicalTableConfig("test_logical_table_OFFLINE", 
physicalTableNamesWithType, BROKER_TENANT);
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    });
+    assertTrue(throwable.getMessage().contains("Reason: 'tableName' should not 
end with _OFFLINE or _REALTIME"),
+        throwable.getMessage());
+
+    // Test logical table name with _REALTIME suffix validation
+    throwable = expectThrows(IOException.class, () -> {
+      LogicalTableConfig logicalTableConfig =
+          getDummyLogicalTableConfig("test_logical_table_REALTIME", 
physicalTableNamesWithType, BROKER_TENANT);
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    });
+    assertTrue(throwable.getMessage().contains("Reason: 'tableName' should not 
end with _OFFLINE or _REALTIME"),
+        throwable.getMessage());
+  }
+
+  @DataProvider
+  public Object[][] tableTypeProvider() {
+    return new Object[][]{
+        {TableType.OFFLINE},
+        {TableType.REALTIME}
+    };
+  }
+
+  @Test(dataProvider = "tableTypeProvider")
+  public void testCreateLogicalTable(TableType tableType)
+      throws IOException {
+    // Test logical table with only realtime table
+    String tableName = "test_table";
+    addDummySchema(tableName);
+    TableConfig tableConfig = createDummyTableConfig(tableName, tableType);
+    addTableConfig(tableConfig);
+    addDummySchema(LOGICAL_TABLE_NAME);
+    String getLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME);
+    LogicalTableConfig logicalTableConfig =
+        getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
List.of(tableConfig.getTableName()), BROKER_TENANT);
+    ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
   }
 
   @Test
   public void testLogicalTableSchemaValidation()
       throws IOException {
-    String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
-    List<String> physicalTableNamesWithType = 
createHybridTables(List.of("test_table_3"));
+    final List<String> physicalTableNamesWithType = 
createHybridTables(List.of("test_table_6"));
 
     // Test logical table schema does not exist
-    LogicalTableConfig logicalTableConfig =
-        getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, BROKER_TENANT);
-    try {
-      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
-      fail("Logical Table POST request should have failed");
-    } catch (IOException e) {
-      assertTrue(e.getMessage().contains("Reason: Schema with same name as 
logical table '" + LOGICAL_TABLE_NAME
-          + "' does not exist"), e.getMessage());
-    }
+    Throwable throwable = expectThrows(IOException.class, () -> {
+      LogicalTableConfig logicalTableConfig =
+          getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, BROKER_TENANT);
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    });
+    assertTrue(throwable.getMessage().contains("Reason: Schema with same name 
as logical table '" + LOGICAL_TABLE_NAME
+        + "' does not exist"), throwable.getMessage());
 
     // Test logical table with db prefix but schema without db prefix
-    addDummySchema(LOGICAL_TABLE_NAME);
-    logicalTableConfig = getDummyLogicalTableConfig("db." + 
LOGICAL_TABLE_NAME, physicalTableNamesWithType,
-        BROKER_TENANT);
-    try {
-      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
-      fail("Logical Table POST request should have failed");
-    } catch (IOException e) {
-      assertTrue(e.getMessage().contains("Reason: Schema with same name as 
logical table 'db." + LOGICAL_TABLE_NAME
-          + "' does not exist"), e.getMessage());
-    }
+    throwable = expectThrows(IOException.class, () -> {
+      addDummySchema(LOGICAL_TABLE_NAME);
+      LogicalTableConfig logicalTableConfig =
+          getDummyLogicalTableConfig("db." + LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, BROKER_TENANT);
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    });
+    assertTrue(throwable.getMessage()
+            .contains("Reason: Schema with same name as logical table 'db." + 
LOGICAL_TABLE_NAME + "' does not exist"),
+        throwable.getMessage());
   }
 
   @Test
   public void testLogicalTableWithSameNameNotAllowed()
       throws IOException {
-    String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
     String getLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME);
-    List<String> physicalTableNamesWithType = 
createHybridTables(List.of("test_table_2"));
+    addDummySchema(LOGICAL_TABLE_NAME);
+    List<String> physicalTableNamesWithType = 
createHybridTables(List.of("test_table_5"));
 
     LogicalTableConfig
         logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, BROKER_TENANT);
-    addDummySchema(LOGICAL_TABLE_NAME);
-    ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+
     verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
     try {
       // create the same logical table again
-      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
       fail("Logical Table POST request should have failed");
     } catch (IOException e) {
       assertTrue(e.getMessage().contains("Logical table: test_logical_table 
already exists"), e.getMessage());
@@ -265,8 +368,6 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
   public void testPhysicalTableShouldExist(String logicalTableName, 
List<String> physicalTableNames,
       String unknownTableName)
       throws IOException {
-    String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
-
     // setup physical tables
     List<String> physicalTableNamesWithType = 
createHybridTables(physicalTableNames);
     physicalTableNamesWithType.add(unknownTableName);
@@ -275,7 +376,7 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
     LogicalTableConfig
         logicalTableConfig = getDummyLogicalTableConfig(logicalTableName, 
physicalTableNamesWithType, BROKER_TENANT);
     try {
-      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
       fail("Logical Table POST request should have failed");
     } catch (IOException e) {
       assertTrue(e.getMessage().contains("'" + unknownTableName + "' should be 
one of the existing tables"),
@@ -301,8 +402,7 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
       LogicalTableConfig logicalTableConfig = 
getDummyLogicalTableConfig(logicalTableNames.get(i), List.of(
           physicalTableNamesWithType.get(2 * i), 
physicalTableNamesWithType.get(2 * i + 1)), BROKER_TENANT);
 
-      
ControllerTest.sendPostRequest(_controllerRequestURLBuilder.forLogicalTableCreate(),
-          logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
     }
 
     // verify logical table names
@@ -310,6 +410,49 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
     assertEquals(getLogicalTableNamesResponse, 
objectMapper.writeValueAsString(logicalTableNames));
   }
 
+  @Test
+  public void testLogicalTableUpdateBrokerTenantUpdate()
+      throws Exception {
+    PinotHelixResourceManager helixResourceManager = getHelixResourceManager();
+    String logicalTableName = "test_logical_table";
+    String getLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableGet(logicalTableName);
+    String updateLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableName);
+
+    // Create a logical table
+    addDummySchema(logicalTableName);
+    List<String> physicalTables = 
createHybridTables(List.of("physical_table"));
+    LogicalTableConfig logicalTableConfig =
+        getDummyLogicalTableConfig(logicalTableName, physicalTables, 
BROKER_TENANT);
+    String addLogicalTableResponse =
+        ControllerTest.sendPostRequest(_addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    assertEquals(addLogicalTableResponse,
+        "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName + " 
logical table successfully added.\"}");
+    verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
+
+    // verify table broker node and broker tenant node is same
+    IdealState brokerIdealStates = 
HelixHelper.getBrokerIdealStates(helixResourceManager.getHelixAdmin(),
+        helixResourceManager.getHelixClusterName());
+    Map<String, String> instanceStateMap = 
brokerIdealStates.getInstanceStateMap(logicalTableName);
+    Set<String> brokerForTenant = 
helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT);
+    assertEquals(brokerForTenant, instanceStateMap.keySet());
+
+    //verify broker tenant node sets are different
+    Set<String> allInstancesForBrokerTenant = 
helixResourceManager.getAllInstancesForBrokerTenant(NEW_BROKER_TENANT);
+    assertNotEquals(brokerForTenant, allInstancesForBrokerTenant);
+
+    // update logical table with new broker tenant
+    logicalTableConfig.setBrokerTenant(NEW_BROKER_TENANT);
+    sendPutRequest(updateLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
+
+    // verify the broker node set is updated in IS
+    brokerIdealStates = 
HelixHelper.getBrokerIdealStates(helixResourceManager.getHelixAdmin(),
+        helixResourceManager.getHelixClusterName());
+    instanceStateMap = brokerIdealStates.getInstanceStateMap(logicalTableName);
+    Set<String> brokerForNewTenant = 
helixResourceManager.getAllInstancesForBrokerTenant(NEW_BROKER_TENANT);
+    assertEquals(brokerForNewTenant, instanceStateMap.keySet());
+  }
+
   private void verifyLogicalTableExists(String getLogicalTableUrl, 
LogicalTableConfig logicalTableConfig)
       throws IOException {
     LogicalTableConfig remoteLogicalTableConfig =
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
index 62632f1a6a..7219a75402 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
@@ -88,6 +88,7 @@ public class PinotUserWithAccessLogicalTableResourceTest 
extends ControllerTest
     } catch (Exception e) {
       // ignore
     }
+    stopFakeInstances();
     stopController();
     stopZk();
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 6f7c06260a..9f775d3201 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -70,6 +70,8 @@ import 
org.apache.pinot.controller.api.resources.PauseStatusDetails;
 import org.apache.pinot.controller.api.resources.TableViews;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
@@ -395,9 +397,17 @@ public class ControllerTest {
     for (String physicalTableName : physicalTableNames) {
       physicalTableConfigMap.put(physicalTableName, new PhysicalTableConfig());
     }
+    String offlineTableName =
+        
physicalTableNames.stream().filter(TableNameBuilder::isOfflineTableResource).findFirst().orElse(null);
+    String realtimeTableName =
+        
physicalTableNames.stream().filter(TableNameBuilder::isRealtimeTableResource).findFirst().orElse(null);
     LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder()
         .setTableName(tableName)
         .setBrokerTenant(brokerTenant)
+        .setRefOfflineTableName(offlineTableName)
+        .setRefRealtimeTableName(realtimeTableName)
+        .setQuotaConfig(new QuotaConfig(null, "999"))
+        .setQueryConfig(new QueryConfig(1L, true, false, null, 1L, 1L))
         .setPhysicalTableConfigMap(physicalTableConfigMap);
     return builder.build();
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index a10b2f0aea..0f21afc305 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -28,15 +28,14 @@ import 
org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
 import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
 import org.apache.pinot.spi.config.provider.SchemaChangeListener;
 import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
+import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.LogicalTableConfig;
-import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
-import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
@@ -45,7 +44,11 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
 
 public class TableCacheTest {
@@ -106,8 +109,9 @@ public class TableCacheTest {
     assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), 
expectedColumnMap);
 
     // Add logical table
+    LogicalTableConfig logicalTableConfig =
+        ControllerTest.getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
List.of(OFFLINE_TABLE_NAME), "DefaultTenant");
     addSchema(LOGICAL_TABLE_NAME, tableCache);
-    LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME));
     
TEST_INSTANCE.getHelixResourceManager().addLogicalTableConfig(logicalTableConfig);
     // Wait for at most 10 seconds for the callback to add the logical table 
to the cache
     TestUtils.waitForCondition(aVoid -> 
tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME) != null, 10_000L,
@@ -115,12 +119,12 @@ public class TableCacheTest {
     // Logical table can be accessed by the logical table name
     if (isCaseInsensitive) {
       
assertEquals(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME), 
LOGICAL_TABLE_NAME);
-      
assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), 
logicalTableConfig);
     } else {
       
assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME));
     }
     assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), 
logicalTableConfig);
     assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), 
getExpectedSchema(LOGICAL_TABLE_NAME));
+    assertNull(tableCache.getExpressionOverrideMap(LOGICAL_TABLE_NAME));
 
     // Register the change listeners
     TestTableConfigChangeListener tableConfigChangeListener = new 
TestTableConfigChangeListener();
@@ -210,21 +214,25 @@ public class TableCacheTest {
         aVoid -> 
anotherTableConfig.equals(tableCache.getTableConfig(ANOTHER_TABLE_OFFLINE)), 
10_000L,
         "Failed to add the table config to the cache");
     // update the logical table
-    logicalTableConfig = getLogicalTableConfig(LOGICAL_TABLE_NAME, 
List.of(OFFLINE_TABLE_NAME, ANOTHER_TABLE_OFFLINE));
+    logicalTableConfig = 
ControllerTest.getDummyLogicalTableConfig(LOGICAL_TABLE_NAME,
+        List.of(OFFLINE_TABLE_NAME, ANOTHER_TABLE_OFFLINE), "DefaultTenant");
+    logicalTableConfig.setQueryConfig(new QueryConfig(
+        1L, false, false, Map.of("DaysSinceEpoch * 24", 
"NewAddedDerivedHoursSinceEpoch"), 1L, 1L
+    ));
     
TEST_INSTANCE.getHelixResourceManager().updateLogicalTableConfig(logicalTableConfig);
-    // Wait for at most 10 seconds for the callback to update the logical 
table in the cache
     TestUtils.waitForCondition(
         aVoid -> 
Objects.requireNonNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME))
             .getPhysicalTableConfigMap().size() == 2, 10_000L,
-        "Failed to update the logical table in the cache");
-
+        "Failed to update the logical table in the cache"
+    );
     if (isCaseInsensitive) {
-      
assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), 
logicalTableConfig);
+      
assertEquals(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME), 
LOGICAL_TABLE_NAME);
     } else {
       
assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME));
     }
     assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), 
logicalTableConfig);
     assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), 
getExpectedSchema(LOGICAL_TABLE_NAME));
+    assertNotNull(tableCache.getExpressionOverrideMap(LOGICAL_TABLE_NAME));
 
     // Remove the table config
     TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME);
@@ -241,6 +249,15 @@ public class TableCacheTest {
     assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
     assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), 
expectedColumnMap);
 
+    // Remove logical table
+    
TEST_INSTANCE.getHelixResourceManager().deleteLogicalTableConfig(LOGICAL_TABLE_NAME);
+    // Wait for at most 10 seconds for the callback to remove the logical 
table from the cache
+    // NOTE:
+    // - Verify if the callback is fully done by checking the logical table 
change lister because it is the last step of
+    //   the callback handling
+    TestUtils.waitForCondition(aVoid -> 
logicalTableConfigChangeListener._logicalTableConfigList.isEmpty(), 10_000L,
+        "Failed to remove the logical table from the cache");
+
     // Remove the schema
     TEST_INSTANCE.getHelixResourceManager().deleteSchema(RAW_TABLE_NAME);
     TEST_INSTANCE.getHelixResourceManager().deleteSchema(ANOTHER_TABLE);
@@ -252,15 +269,6 @@ public class TableCacheTest {
     TestUtils.waitForCondition(aVoid -> 
schemaChangeListener._schemaList.isEmpty(), 10_000L,
         "Failed to remove the schema from the cache");
 
-    // Remove logical table
-    
TEST_INSTANCE.getHelixResourceManager().deleteLogicalTableConfig(LOGICAL_TABLE_NAME);
-    // Wait for at most 10 seconds for the callback to remove the logical 
table from the cache
-    // NOTE:
-    // - Verify if the callback is fully done by checking the logical table 
change lister because it is the last step of
-    //   the callback handling
-    TestUtils.waitForCondition(aVoid -> 
logicalTableConfigChangeListener._logicalTableConfigList.isEmpty(), 10_000L,
-        "Failed to remove the logical table from the cache");
-
     assertNull(tableCache.getSchema(RAW_TABLE_NAME));
     assertNull(tableCache.getSchema(ANOTHER_TABLE));
     assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
@@ -304,18 +312,6 @@ public class TableCacheTest {
         .addSingleValueDimension(BuiltInVirtualColumn.SEGMENTNAME, 
DataType.STRING).build();
   }
 
-  private static LogicalTableConfig getLogicalTableConfig(String tableName, 
List<String> physicalTableNames) {
-    Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
-    for (String physicalTableName : physicalTableNames) {
-      physicalTableConfigMap.put(physicalTableName, new PhysicalTableConfig());
-    }
-    LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder()
-        .setTableName(tableName)
-        .setBrokerTenant("DefaultTenant")
-        .setPhysicalTableConfigMap(physicalTableConfigMap);
-    return builder.build();
-  }
-
   @DataProvider(name = "testTableCacheDataProvider")
   public Object[][] provideCaseInsensitiveSetting() {
     return new Object[][]{new Object[]{true}, new Object[]{false}};
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
index 4d477691d1..2a427ce9dc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
@@ -18,18 +18,19 @@
  */
 package org.apache.pinot.spi.data;
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.util.Map;
-import java.util.Objects;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
-@JsonIgnoreProperties(ignoreUnknown = true)
 public class LogicalTableConfig extends BaseJsonConfig {
 
   private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
@@ -37,10 +38,20 @@ public class LogicalTableConfig extends BaseJsonConfig {
   public static final String LOGICAL_TABLE_NAME_KEY = "tableName";
   public static final String PHYSICAL_TABLE_CONFIG_KEY = 
"physicalTableConfigMap";
   public static final String BROKER_TENANT_KEY = "brokerTenant";
+  public static final String QUERY_CONFIG_KEY = "query";
+  public static final String QUOTA_CONFIG_KEY = "quota";
+  public static final String REF_OFFLINE_TABLE_NAME_KEY = 
"refOfflineTableName";
+  public static final String REF_REALTIME_TABLE_NAME_KEY = 
"refRealtimeTableName";
 
   private String _tableName;
   private String _brokerTenant;
   private Map<String, PhysicalTableConfig> _physicalTableConfigMap;
+  @JsonProperty(QUERY_CONFIG_KEY)
+  private QueryConfig _queryConfig;
+  @JsonProperty(QUOTA_CONFIG_KEY)
+  private QuotaConfig _quotaConfig;
+  private String _refOfflineTableName;
+  private String _refRealtimeTableName;
 
   public static LogicalTableConfig fromString(String logicalTableString)
       throws IOException {
@@ -72,6 +83,42 @@ public class LogicalTableConfig extends BaseJsonConfig {
     _brokerTenant = brokerTenant;
   }
 
+  @JsonProperty(QUERY_CONFIG_KEY)
+  @Nullable
+  public QueryConfig getQueryConfig() {
+    return _queryConfig;
+  }
+
+  public void setQueryConfig(QueryConfig queryConfig) {
+    _queryConfig = queryConfig;
+  }
+
+  @JsonProperty(QUOTA_CONFIG_KEY)
+  @Nullable
+  public QuotaConfig getQuotaConfig() {
+    return _quotaConfig;
+  }
+
+  public void setQuotaConfig(QuotaConfig quotaConfig) {
+    _quotaConfig = quotaConfig;
+  }
+
+  public String getRefOfflineTableName() {
+    return _refOfflineTableName;
+  }
+
+  public void setRefOfflineTableName(String refOfflineTableName) {
+    _refOfflineTableName = refOfflineTableName;
+  }
+
+  public String getRefRealtimeTableName() {
+    return _refRealtimeTableName;
+  }
+
+  public void setRefRealtimeTableName(String refRealtimeTableName) {
+    _refRealtimeTableName = refRealtimeTableName;
+  }
+
   private JsonNode toJsonObject() {
     return DEFAULT_MAPPER.valueToTree(this);
   }
@@ -94,23 +141,6 @@ public class LogicalTableConfig extends BaseJsonConfig {
     }
   }
 
-  @Override
-  public boolean equals(Object object) {
-    if (this == object) {
-      return true;
-    }
-    if (object == null || getClass() != object.getClass()) {
-      return false;
-    }
-    LogicalTableConfig that = (LogicalTableConfig) object;
-    return Objects.equals(getTableName(), that.getTableName());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(getTableName());
-  }
-
   @Override
   public String toString() {
     return toSingleLineJsonString();
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
index eff47c5af6..54ef55d1f0 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.spi.utils.builder;
 
 import java.util.Map;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
 
@@ -27,6 +29,11 @@ public class LogicalTableConfigBuilder {
   private String _tableName;
   private Map<String, PhysicalTableConfig> _physicalTableConfigMap;
   private String _brokerTenant;
+  private QueryConfig _queryConfig;
+  private QuotaConfig _quotaConfig;
+  private String _refOfflineTableName;
+  private String _refRealtimeTableName;
+
 
   public LogicalTableConfigBuilder setTableName(String tableName) {
     _tableName = tableName;
@@ -43,11 +50,35 @@ public class LogicalTableConfigBuilder {
     return this;
   }
 
+  public LogicalTableConfigBuilder setQueryConfig(QueryConfig queryConfig) {
+    _queryConfig = queryConfig;
+    return this;
+  }
+
+  public LogicalTableConfigBuilder setQuotaConfig(QuotaConfig quotaConfig) {
+    _quotaConfig = quotaConfig;
+    return this;
+  }
+
+  public LogicalTableConfigBuilder setRefOfflineTableName(String 
refOfflineTableName) {
+    _refOfflineTableName = refOfflineTableName;
+    return this;
+  }
+
+  public LogicalTableConfigBuilder setRefRealtimeTableName(String 
refRealtimeTableName) {
+    _refRealtimeTableName = refRealtimeTableName;
+    return this;
+  }
+
   public LogicalTableConfig build() {
     LogicalTableConfig logicalTableConfig = new LogicalTableConfig();
     logicalTableConfig.setTableName(_tableName);
     logicalTableConfig.setPhysicalTableConfigMap(_physicalTableConfigMap);
     logicalTableConfig.setBrokerTenant(_brokerTenant);
+    logicalTableConfig.setQueryConfig(_queryConfig);
+    logicalTableConfig.setQuotaConfig(_quotaConfig);
+    logicalTableConfig.setRefOfflineTableName(_refOfflineTableName);
+    logicalTableConfig.setRefRealtimeTableName(_refRealtimeTableName);
     return logicalTableConfig;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to