This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 08769d2d7b Logical table CRUD operations. (#15515)
08769d2d7b is described below

commit 08769d2d7b678119ef9622b95cc5307ef7663dcf
Author: Abhishek Bafna <aba...@startree.ai>
AuthorDate: Tue May 6 14:47:46 2025 +0530

    Logical table CRUD operations. (#15515)
    
    * Logical table CRUD operations.
    
    * Logical tables znRecord update, validations and more unit tests.
    
    * Update test comment messages
    
    * Code refactoring and more unit tests.
    
    * more unit tests.
    
    * Add logical table to table cache.
    
    * Code refactoring to handle code duplication.
    
    * Store logical table json using jackson.
    
    * remove sout statement.
    
    * Code refactoring - physical tables config as map.
    
    * Fix checkstyle violations.
    
    * Review comments addressed.
    
    * Code refactoring and address review comments.
    
    * Addressing review comments.
    
    * Refactor LogicalTable to LogicalTableConfig.
    
    * Refactor LogicalTable to LogicalTableConfig.
    
    * Refactor LogicalTable to LogicalTableConfig.
    
    * more unit tests.
    
    ---------
    
    Co-authored-by: abhishekbafna <abhishek.ba...@startree.ai>
---
 .../BaseSingleStageBrokerRequestHandler.java       |   7 +-
 .../pinot/common/config/provider/TableCache.java   | 198 ++++++++++++-
 .../pinot/common/metadata/ZKMetadataProvider.java  |  55 +++-
 .../pinot/common/utils/LogicalTableUtils.java      | 118 ++++++++
 .../api/resources/PinotLogicalTableResource.java   | 263 +++++++++++++++++
 .../helix/core/PinotHelixResourceManager.java      |  59 ++++
 .../PinotAdminUserLogicalTableResourceTest.java    |  51 ++++
 .../resources/PinotLogicalTableResourceTest.java   | 311 +++++++++++++++++++++
 ...inotUserWithAccessLogicalTableResourceTest.java | 173 ++++++++++++
 .../pinot/controller/helix/ControllerTest.java     |  52 ++++
 .../pinot/controller/helix/TableCacheTest.java     |  97 +++++++
 .../provider/LogicalTableConfigChangeListener.java |  31 ++
 .../spi/config/provider/PinotConfigProvider.java   |  16 ++
 .../apache/pinot/spi/data/LogicalTableConfig.java  | 118 ++++++++
 .../apache/pinot/spi/data/PhysicalTableConfig.java |  29 ++
 .../utils/builder/ControllerRequestURLBuilder.java |  20 ++
 .../utils/builder/LogicalTableConfigBuilder.java   |  53 ++++
 17 files changed, 1646 insertions(+), 5 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 97510585b8..3f923ba0ef 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -1178,10 +1178,11 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
   @VisibleForTesting
   static String getActualTableName(String tableName, TableCache tableCache) {
     String actualTableName = tableCache.getActualTableName(tableName);
-    if (actualTableName != null) {
-      return actualTableName;
+    // If actual table name is not found for physical table, check in the logical tables
+    if (actualTableName == null) {
+      actualTableName = tableCache.getActualLogicalTableName(tableName);
     }
-    return tableName;
+    return actualTableName != null ? actualTableName : tableName;
   }
 
   /**
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 17d953abec..726df76930 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -25,9 +25,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
@@ -37,8 +39,10 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.utils.LogicalTableUtils;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
 import org.apache.pinot.spi.config.provider.PinotConfigProvider;
 import org.apache.pinot.spi.config.provider.SchemaChangeListener;
 import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
@@ -46,6 +50,7 @@ import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
 import org.apache.pinot.spi.utils.TimestampIndexUtils;
@@ -66,6 +71,8 @@ public class TableCache implements PinotConfigProvider {
   private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
   private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
   private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
+  private static final String LOGICAL_TABLE_PARENT_PATH = "/LOGICAL/TABLE";
+  private static final String LOGICAL_TABLE_PATH_PREFIX = "/LOGICAL/TABLE/";
   private static final String OFFLINE_TABLE_SUFFIX = "_OFFLINE";
   private static final String REALTIME_TABLE_SUFFIX = "_REALTIME";
   private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX = OFFLINE_TABLE_SUFFIX.toLowerCase();
@@ -74,6 +81,7 @@ public class TableCache implements PinotConfigProvider {
   // NOTE: No need to use concurrent set because it is always accessed within the ZK change listener lock
   private final Set<TableConfigChangeListener> _tableConfigChangeListeners = new HashSet<>();
   private final Set<SchemaChangeListener> _schemaChangeListeners = new HashSet<>();
+  private final Set<LogicalTableConfigChangeListener> _logicalTableConfigChangeListeners = new HashSet<>();
 
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final boolean _ignoreCase;
@@ -89,6 +97,14 @@ public class TableCache implements PinotConfigProvider {
   // Key is schema name, value is schema info
   private final Map<String, SchemaInfo> _schemaInfoMap = new ConcurrentHashMap<>();
 
+  private final ZkLogicalTableConfigChangeListener
+      _zkLogicalTableConfigChangeListener = new ZkLogicalTableConfigChangeListener();
+  // Key is table name, value is logical table info
+  private final Map<String, LogicalTableConfigInfo> _logicalTableConfigInfoMap = new ConcurrentHashMap<>();
+  // Key is lower case logical table name, value is actual logical table name
+  // For case-insensitive mode only
+  private final Map<String, String> _logicalTableNameMap = new ConcurrentHashMap<>();
+
   public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean ignoreCase) {
     _propertyStore = propertyStore;
     _ignoreCase = ignoreCase;
@@ -121,6 +137,19 @@ public class TableCache implements PinotConfigProvider {
       }
     }
 
+    synchronized (_zkLogicalTableConfigChangeListener) {
+      // Subscribe child changes before reading the data to avoid missing changes
+      _propertyStore.subscribeChildChanges(LOGICAL_TABLE_PARENT_PATH, _zkLogicalTableConfigChangeListener);
+
+      List<String> tables = _propertyStore.getChildNames(LOGICAL_TABLE_PARENT_PATH, AccessOption.PERSISTENT);
+      if (CollectionUtils.isNotEmpty(tables)) {
+        List<String> pathsToAdd = tables.stream()
+            .map(rawTableName -> LOGICAL_TABLE_PATH_PREFIX + rawTableName)
+            .collect(Collectors.toList());
+        addLogicalTableConfigs(pathsToAdd);
+      }
+    }
+
     LOGGER.info("Initialized TableCache with IgnoreCase: {}", ignoreCase);
   }
 
@@ -144,6 +173,18 @@ public class TableCache implements PinotConfigProvider {
     }
   }
 
+  /**
+   * Returns the actual logical table name for the given table name, or {@code null} if table does not exist.
+   * @param logicalTableName Logical table name
+   * @return Actual logical table name
+   */
+  @Nullable
+  public String getActualLogicalTableName(String logicalTableName) {
+    return _ignoreCase
+        ? _logicalTableNameMap.get(logicalTableName.toLowerCase())
+        : _logicalTableNameMap.get(logicalTableName);
+  }
+
   /**
    * Returns a map from table name to actual table name. For case-insensitive case, the keys of the map are in lower
    * case.
@@ -152,6 +193,15 @@ public class TableCache implements PinotConfigProvider {
     return _tableNameMap;
   }
 
+  /**
+   * Returns a map from logical table name to actual logical table name. For case-insensitive case, the keys of the map
+   * are in lower case.
+   * @return Map from logical table name to actual logical table name
+   */
+  public Map<String, String> getLogicalTableNameMap() {
+    return _logicalTableNameMap;
+  }
+
   /**
    * Get all dimension table names.
    * @return List of dimension table names
@@ -204,6 +254,14 @@ public class TableCache implements PinotConfigProvider {
     return tableConfigInfo != null ? tableConfigInfo._tableConfig : null;
   }
 
+  @Nullable
+  @Override
+  public LogicalTableConfig getLogicalTableConfig(String logicalTableName) {
+    logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : logicalTableName;
+    LogicalTableConfigInfo logicalTableConfigInfo = _logicalTableConfigInfoMap.get(logicalTableName);
+    return logicalTableConfigInfo != null ? logicalTableConfigInfo._logicalTableConfig : null;
+  }
+
   @Override
   public boolean registerTableConfigChangeListener(TableConfigChangeListener tableConfigChangeListener) {
     synchronized (_zkTableConfigChangeListener) {
@@ -216,15 +274,33 @@ public class TableCache implements PinotConfigProvider {
   }
 
   /**
-   * Returns the schema for the given table, or {@code null} if it does not exist.
+   * Returns the schema for the given logical or physical table, or {@code null} if it does not exist.
    */
   @Nullable
   @Override
   public Schema getSchema(String rawTableName) {
+    if (_schemaInfoMap.containsKey(rawTableName)) {
+      return getPhysicalTableSchema(rawTableName);
+    } else {
+      return getLogicalTableSchema(rawTableName);
+    }
+  }
+
+  private Schema getPhysicalTableSchema(String rawTableName) {
     SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
     return schemaInfo != null ? schemaInfo._schema : null;
   }
 
+  @Nullable
+  private Schema getLogicalTableSchema(String logicalTableName) {
+    LogicalTableConfig logicalTableConfig = getLogicalTableConfig(logicalTableName);
+    if (logicalTableConfig == null) {
+      return null;
+    }
+    Optional<String> physicalTableName = logicalTableConfig.getPhysicalTableConfigMap().keySet().stream().findFirst();
+    return getPhysicalTableSchema(TableNameBuilder.extractRawTableName(physicalTableName.orElse(null)));
+  }
+
   @Override
   public boolean registerSchemaChangeListener(SchemaChangeListener schemaChangeListener) {
     synchronized (_zkSchemaChangeListener) {
@@ -253,6 +329,23 @@ public class TableCache implements PinotConfigProvider {
     }
   }
 
+  private void addLogicalTableConfigs(List<String> paths) {
+    // Subscribe data changes before reading the data to avoid missing changes
+    for (String path : paths) {
+      _propertyStore.subscribeDataChanges(path, _zkLogicalTableConfigChangeListener);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(paths, null, AccessOption.PERSISTENT);
+    for (ZNRecord znRecord : znRecords) {
+      if (znRecord != null) {
+        try {
+          putLogicalTableConfig(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while adding logical table for ZNRecord: {}", znRecord.getId(), e);
+        }
+      }
+    }
+  }
+
   private void putTableConfig(ZNRecord znRecord)
       throws IOException {
     TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
@@ -268,6 +361,19 @@ public class TableCache implements PinotConfigProvider {
     }
   }
 
+  private void putLogicalTableConfig(ZNRecord znRecord)
+      throws IOException {
+    LogicalTableConfig logicalTableConfig = LogicalTableUtils.fromZNRecord(znRecord);
+    String logicalTableName = logicalTableConfig.getTableName();
+    if (_ignoreCase) {
+      _logicalTableNameMap.put(logicalTableName.toLowerCase(), logicalTableName);
+      _logicalTableConfigInfoMap.put(logicalTableName.toLowerCase(), new LogicalTableConfigInfo(logicalTableConfig));
+    } else {
+      _logicalTableNameMap.put(logicalTableName, logicalTableName);
+      _logicalTableConfigInfoMap.put(logicalTableName, new LogicalTableConfigInfo(logicalTableConfig));
+    }
+  }
+
   private void removeTableConfig(String path) {
     _propertyStore.unsubscribeDataChanges(path, _zkTableConfigChangeListener);
     String tableNameWithType = path.substring(TABLE_CONFIG_PATH_PREFIX.length());
@@ -299,6 +405,14 @@ public class TableCache implements PinotConfigProvider {
     }
   }
 
+  private void removeLogicalTableConfig(String path) {
+    _propertyStore.unsubscribeDataChanges(path, _zkLogicalTableConfigChangeListener);
+    String logicalTableName = path.substring(LOGICAL_TABLE_PATH_PREFIX.length());
+    logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : logicalTableName;
+    _logicalTableConfigInfoMap.remove(logicalTableName);
+    _logicalTableNameMap.remove(logicalTableName);
+  }
+
   private void addSchemas(List<String> paths) {
     // Subscribe data changes before reading the data to avoid missing changes
     for (String path : paths) {
@@ -365,6 +479,15 @@ public class TableCache implements PinotConfigProvider {
     }
   }
 
+  private void notifyLogicalTableConfigChangeListeners() {
+    if (!_logicalTableConfigChangeListeners.isEmpty()) {
+      List<LogicalTableConfig> logicalTableConfigs = getLogicalTableConfigs();
+      for (LogicalTableConfigChangeListener listener : _logicalTableConfigChangeListeners) {
+        listener.onChange(logicalTableConfigs);
+      }
+    }
+  }
+
   private List<TableConfig> getTableConfigs() {
     List<TableConfig> tableConfigs = new ArrayList<>(_tableConfigInfoMap.size());
     for (TableConfigInfo tableConfigInfo : _tableConfigInfoMap.values()) {
@@ -373,6 +496,10 @@ public class TableCache implements PinotConfigProvider {
     return tableConfigs;
   }
 
+  public List<LogicalTableConfig> getLogicalTableConfigs() {
+    return _logicalTableConfigInfoMap.values().stream().map(o -> o._logicalTableConfig).collect(Collectors.toList());
+  }
+
   private void notifySchemaChangeListeners() {
     if (!_schemaChangeListeners.isEmpty()) {
       List<Schema> schemas = getSchemas();
@@ -390,6 +517,23 @@ public class TableCache implements PinotConfigProvider {
     return schemas;
   }
 
+  public boolean isLogicalTable(String logicalTableName) {
+    logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : logicalTableName;
+    return _logicalTableConfigInfoMap.containsKey(logicalTableName);
+  }
+
+  @Override
+  public boolean registerLogicalTableConfigChangeListener(
+      LogicalTableConfigChangeListener logicalTableConfigChangeListener) {
+    synchronized (_zkLogicalTableConfigChangeListener) {
+      boolean added = _logicalTableConfigChangeListeners.add(logicalTableConfigChangeListener);
+      if (added) {
+        logicalTableConfigChangeListener.onChange(getLogicalTableConfigs());
+      }
+      return added;
+    }
+  }
+
   private class ZkTableConfigChangeListener implements IZkChildListener, IZkDataListener {
 
     @Override
@@ -476,6 +620,49 @@ public class TableCache implements PinotConfigProvider {
     }
   }
 
+  private class ZkLogicalTableConfigChangeListener implements IZkChildListener, IZkDataListener {
+
+    @Override
+    public synchronized void handleChildChange(String path, List<String> logicalTableNames) {
+      if (CollectionUtils.isEmpty(logicalTableNames)) {
+        return;
+      }
+
+      // Only process new added logical tables. Changed/removed logical tables are handled by other callbacks.
+      List<String> pathsToAdd = new ArrayList<>();
+      for (String logicalTableName : logicalTableNames) {
+        if (!_logicalTableConfigInfoMap.containsKey(logicalTableName)) {
+          pathsToAdd.add(LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
+        }
+      }
+      if (!pathsToAdd.isEmpty()) {
+        addLogicalTableConfigs(pathsToAdd);
+      }
+      notifyLogicalTableConfigChangeListeners();
+    }
+
+    @Override
+    public synchronized void handleDataChange(String path, Object data) {
+      if (data != null) {
+        ZNRecord znRecord = (ZNRecord) data;
+        try {
+          putLogicalTableConfig(znRecord);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while refreshing logical table for ZNRecord: {}", znRecord.getId(), e);
+        }
+        notifyLogicalTableConfigChangeListeners();
+      }
+    }
+
+    @Override
+    public synchronized void handleDataDeleted(String path) {
+      // NOTE: The path here is the absolute ZK path instead of the relative path to the property store.
+      String logicalTableName = path.substring(path.lastIndexOf('/') + 1);
+      removeLogicalTableConfig(LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
+      notifyLogicalTableConfigChangeListeners();
+    }
+  }
+
   private static class TableConfigInfo {
     final TableConfig _tableConfig;
     final Map<Expression, Expression> _expressionOverrideMap;
@@ -522,4 +709,13 @@ public class TableCache implements PinotConfigProvider {
       _columnNameMap = columnNameMap;
     }
   }
+
+  private static class LogicalTableConfigInfo {
+    final LogicalTableConfig _logicalTableConfig;
+    // TODO : Add expression override map for logical table, issue #15607
+
+    private LogicalTableConfigInfo(LogicalTableConfig logicalTableConfig) {
+      _logicalTableConfig = logicalTableConfig;
+    }
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 096de78884..3025b1f5d9 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pinot.common.metadata;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -37,6 +40,7 @@ import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.LogicalTableUtils;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
@@ -45,6 +49,7 @@ import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.user.UserConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -67,6 +72,7 @@ public class ZKMetadataProvider {
   private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS";
   private static final String PROPERTYSTORE_PAUSELESS_DEBUG_METADATA_PREFIX = "/PAUSELESS_DEBUG_METADATA";
   private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
+  private static final String PROPERTYSTORE_LOGICAL_PREFIX = "/LOGICAL/TABLE";
   private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX = "/INSTANCE_PARTITIONS";
   private static final String PROPERTYSTORE_DATABASE_CONFIGS_PREFIX = "/CONFIGS/DATABASE";
   private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE";
@@ -304,6 +310,10 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, taskType, tableNameWithType);
   }
 
+  public static String constructPropertyStorePathForLogical(String tableName) {
+    return StringUtil.join("/", PROPERTYSTORE_LOGICAL_PREFIX, tableName);
+  }
+
   public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource,
       String segmentName) {
     return propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName),
@@ -376,6 +386,7 @@ public class ZKMetadataProvider {
     return propertyStore.remove(constructPropertyStorePathForSegment(tableNameWithType, segmentName),
         AccessOption.PERSISTENT);
   }
+
   public static boolean removePauselessDebugMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore,
       String tableNameWithType) {
     String pauselessDebugMetadataPath = constructPropertyStorePathForPauselessDebugMetadata(tableNameWithType);
@@ -385,7 +396,6 @@ public class ZKMetadataProvider {
     return true;
   }
 
-
   @Nullable
   public static ZNRecord getZnRecord(ZkHelixPropertyStore<ZNRecord> propertyStore, String path) {
     Stat stat = new Stat();
@@ -809,4 +819,47 @@ public class ZKMetadataProvider {
       return result;
     }
   }
+
+  public static void setLogicalTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      LogicalTableConfig logicalTableConfig) {
+    try {
+      ZNRecord znRecord = LogicalTableUtils.toZNRecord(logicalTableConfig);
+      String path = constructPropertyStorePathForLogical(logicalTableConfig.getTableName());
+      propertyStore.set(path, znRecord, AccessOption.PERSISTENT);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to convert logical table to ZNRecord", e);
+    }
+  }
+
+  public static List<LogicalTableConfig> getAllLogicalTableConfigs(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    List<ZNRecord> znRecords =
+        propertyStore.getChildren(PROPERTYSTORE_LOGICAL_PREFIX, null, AccessOption.PERSISTENT, 0, 0);
+    if (znRecords != null) {
+      return znRecords.stream().map(znRecord -> {
+        try {
+          return LogicalTableUtils.fromZNRecord(znRecord);
+        } catch (IOException e) {
+          LOGGER.error("Caught exception while converting ZNRecord to LogicalTable: {}", znRecord.getId(), e);
+          return null;
+        }
+      }).filter(Objects::nonNull).collect(Collectors.toList());
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  public static LogicalTableConfig getLogicalTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableName) {
+    try {
+      ZNRecord logicalTableZNRecord =
+          propertyStore.get(constructPropertyStorePathForLogical(tableName), null, AccessOption.PERSISTENT);
+      if (logicalTableZNRecord == null) {
+        return null;
+      }
+      return LogicalTableUtils.fromZNRecord(logicalTableZNRecord);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while getting logical table: {}", tableName, e);
+      return null;
+    }
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
new file mode 100644
index 0000000000..0b5d715bd9
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+public class LogicalTableUtils {
+
+  private LogicalTableUtils() {
+    // Utility class
+  }
+
+  public static LogicalTableConfig fromZNRecord(ZNRecord record)
+      throws IOException {
+    LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder()
+        .setTableName(record.getSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY))
+        .setBrokerTenant(record.getSimpleField(LogicalTableConfig.BROKER_TENANT_KEY));
+
+    Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+    for (Map.Entry<String, String> entry : record.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY)
+        .entrySet()) {
+      String physicalTableName = entry.getKey();
+      String physicalTableConfigJson = entry.getValue();
+      physicalTableConfigMap.put(physicalTableName,
+          JsonUtils.stringToObject(physicalTableConfigJson, PhysicalTableConfig.class));
+    }
+    builder.setPhysicalTableConfigMap(physicalTableConfigMap);
+    return builder.build();
+  }
+
+  public static ZNRecord toZNRecord(LogicalTableConfig logicalTableConfig)
+      throws JsonProcessingException {
+    Map<String, String> physicalTableConfigMap = new HashMap<>();
+    for (Map.Entry<String, PhysicalTableConfig> entry : logicalTableConfig.getPhysicalTableConfigMap().entrySet()) {
+      String physicalTableName = entry.getKey();
+      PhysicalTableConfig physicalTableConfig = entry.getValue();
+      physicalTableConfigMap.put(physicalTableName, physicalTableConfig.toJsonString());
+    }
+
+    ZNRecord record = new ZNRecord(logicalTableConfig.getTableName());
+    record.setSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY, logicalTableConfig.getTableName());
+    record.setSimpleField(LogicalTableConfig.BROKER_TENANT_KEY, logicalTableConfig.getBrokerTenant());
+    record.setMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY, physicalTableConfigMap);
+    return record;
+  }
+
+  public static void validateLogicalTableName(LogicalTableConfig logicalTableConfig, List<String> allPhysicalTables,
+      Set<String> allBrokerTenantNames) {
+    String tableName = logicalTableConfig.getTableName();
+    if (StringUtils.isEmpty(tableName)) {
+      throw new IllegalArgumentException("Invalid logical table name. Reason: 'tableName' should not be null or empty");
+    }
+
+    if (TableNameBuilder.isOfflineTableResource(tableName) || TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new IllegalArgumentException(
+          "Invalid logical table name. Reason: 'tableName' should not end with _OFFLINE or _REALTIME");
+    }
+
+    if (logicalTableConfig.getPhysicalTableConfigMap() == null || logicalTableConfig.getPhysicalTableConfigMap()
+        .isEmpty()) {
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: 'physicalTableConfigMap' should not be null or empty");
+    }
+
+    for (Map.Entry<String, PhysicalTableConfig> entry : logicalTableConfig.getPhysicalTableConfigMap().entrySet()) {
+      String physicalTableName = entry.getKey();
+      PhysicalTableConfig physicalTableConfig = entry.getValue();
+
+      // validate physical table exists
+      if (!allPhysicalTables.contains(physicalTableName)) {
+        throw new IllegalArgumentException(
+            "Invalid logical table. Reason: '" + physicalTableName + "' should be one of the existing tables");
+      }
+      // validate physical table config is not null
+      if (physicalTableConfig == null) {
+        throw new IllegalArgumentException(
+            "Invalid logical table. Reason: 'physicalTableConfig' should not be null for physical table: "
+                + physicalTableName);
+      }
+    }
+
+    // validate broker tenant
+    String brokerTenant = logicalTableConfig.getBrokerTenant();
+    if (!allBrokerTenantNames.contains(brokerTenant)) {
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: '" + brokerTenant + "' should be one of the existing broker tenants");
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
new file mode 100644
index 0000000000..2e65ab9378
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.arrow.util.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.common.utils.LogicalTableUtils;
+import org.apache.pinot.controller.api.access.AccessControlFactory;
+import org.apache.pinot.controller.api.access.AccessType;
+import org.apache.pinot.controller.api.access.Authenticate;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.ManualAuthorization;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.glassfish.grizzly.http.server.Request;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.spi.utils.CommonConstants.DATABASE;
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+@Api(tags = "LogicalTable", authorizations = {
+    @Authorization(value = SWAGGER_AUTHORIZATION_KEY), @Authorization(value = 
DATABASE)
+})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = {
+    @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
+        key = SWAGGER_AUTHORIZATION_KEY,
+        description = "The format of the key is  ```\"Basic <token>\" or 
\"Bearer <token>\"```"),
+    @ApiKeyAuthDefinition(name = DATABASE, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE,
+        description = "Database context passed through http header. If no 
context is provided 'default' "
+            + "database context will be considered.")
+}))
+@Path("/")
+public class PinotLogicalTableResource {
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(PinotLogicalTableResource.class);
+  private static final String DEFAULT_BROKER_TENANT = "DefaultTenant";
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  AccessControlFactory _accessControlFactory;
+
+  /**
+   * Lists the names of all logical tables.
+   *
+   * @param headers HTTP headers of the incoming request (not consulted by this method)
+   * @return the names reported by {@code PinotHelixResourceManager#getAllLogicalTableNames()}
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/logicalTables")
+  @Authorize(targetType = TargetType.CLUSTER, paramName = "tableName", action = Actions.Cluster.GET_TABLE)
+  @ApiOperation(value = "List all logical table names", notes = "Lists all logical table names")
+  public List<String> listLogicalTableNames(@Context HttpHeaders headers) {
+    List<String> logicalTableNames = _pinotHelixResourceManager.getAllLogicalTableNames();
+    return logicalTableNames;
+  }
+
+  /**
+   * Fetches a single logical table config and renders it as pretty-printed JSON.
+   *
+   * @param tableName logical table name from the URL; translated with the request's database context before lookup
+   * @param headers HTTP headers of the incoming request, used for the table name translation
+   * @return the pretty-printed JSON form of the stored config
+   * @throws ControllerApplicationException with status 404 when no config is stored under the translated name
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/logicalTables/{tableName}")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_TABLE_CONFIG)
+  @ApiOperation(value = "Get a logical table", notes = "Gets a logical table by name")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Logical table not found"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  public String getLogicalTable(
+      @ApiParam(value = "Logical table name", required = true) @PathParam("tableName") String tableName,
+      @Context HttpHeaders headers) {
+    String translatedTableName = DatabaseUtils.translateTableName(tableName, headers);
+    LOGGER.info("Looking for logical table {}", translatedTableName);
+    LogicalTableConfig storedConfig = _pinotHelixResourceManager.getLogicalTableConfig(translatedTableName);
+    if (storedConfig != null) {
+      return storedConfig.toPrettyJsonString();
+    }
+    throw new ControllerApplicationException(LOGGER, "Logical table not found", Response.Status.NOT_FOUND);
+  }
+
+  /**
+   * Creates a logical table from the JSON config carried in the request body.
+   *
+   * <p>The table name found in the body is translated with the request's database context and written back into
+   * the config. Authorization is manual: CREATE access is checked against the translated name before the table
+   * is added.
+   *
+   * @param logicalTableJsonString JSON form of the logical table config
+   * @param httpHeaders HTTP headers of the incoming request
+   * @param request the underlying HTTP request, passed to the permission check
+   * @return the creation status together with any properties of the body that were not recognized
+   */
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/logicalTables")
+  @ApiOperation(value = "Add a new logical table", notes = "Adds a new logical table")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully created logical table"),
+      @ApiResponse(code = 409, message = "Logical table already exists"),
+      @ApiResponse(code = 400, message = "Missing or invalid request body"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  @ManualAuthorization
+  public SuccessResponse addLogicalTable(
+      String logicalTableJsonString, @Context HttpHeaders httpHeaders,
+      @Context Request request) {
+    Pair<LogicalTableConfig, Map<String, Object>> parsedBody =
+        getLogicalAndUnrecognizedPropertiesFromJson(logicalTableJsonString);
+    LogicalTableConfig config = parsedBody.getLeft();
+    Map<String, Object> unrecognizedProperties = parsedBody.getRight();
+
+    String translatedTableName = DatabaseUtils.translateTableName(config.getTableName(), httpHeaders);
+    config.setTableName(translatedTableName);
+
+    // validate permission
+    ResourceUtils.checkPermissionAndAccess(translatedTableName, request, httpHeaders, AccessType.CREATE,
+        Actions.Table.CREATE_TABLE, _accessControlFactory, LOGGER);
+
+    SuccessResponse added = addLogicalTable(config);
+    return new ConfigSuccessResponse(added.getStatus(), unrecognizedProperties);
+  }
+
+  /**
+   * Replaces the config of an existing logical table with the JSON config carried in the request body.
+   *
+   * <p>The table name in the body must equal the table name in the URL (compared before database translation).
+   * A body without a table name, or with a different one, is rejected as a bad request.
+   *
+   * @param tableName logical table name from the URL
+   * @param headers HTTP headers of the incoming request, used for the table name translation
+   * @param logicalTableJsonString JSON form of the new logical table config
+   * @return the update status together with any properties of the body that were not recognized
+   * @throws ControllerApplicationException with status 400 when the body cannot be parsed or its table name does
+   *     not match the URL
+   */
+  @PUT
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Path("/logicalTables/{tableName}")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.UPDATE_TABLE_CONFIG)
+  @Authenticate(AccessType.UPDATE)
+  @ApiOperation(value = "Update a logical table", notes = "Updates a logical table")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully updated logical table"),
+      @ApiResponse(code = 404, message = "Logical table not found"),
+      @ApiResponse(code = 400, message = "Missing or invalid request body"),
+      @ApiResponse(code = 500, message = "Internal error")
+  })
+  public SuccessResponse updateLogicalTable(
+      @ApiParam(value = "Name of the logical table", required = true) @PathParam("tableName") String tableName,
+      @Context HttpHeaders headers, String logicalTableJsonString) {
+    Pair<LogicalTableConfig, Map<String, Object>> logicalTableConfigAndUnrecognizedProps =
+        getLogicalAndUnrecognizedPropertiesFromJson(logicalTableJsonString);
+    LogicalTableConfig logicalTableConfig = logicalTableConfigAndUnrecognizedProps.getLeft();
+
+    // Compare from the URL side so a body without 'tableName' fails the check instead of throwing an NPE, and
+    // convert the failure into a 400 rather than letting a bare IllegalArgumentException escape the resource.
+    try {
+      Preconditions.checkArgument(tableName.equals(logicalTableConfig.getTableName()),
+          "Logical table name in the request body should match the table name in the URL");
+    } catch (IllegalArgumentException e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST, e);
+    }
+
+    tableName = DatabaseUtils.translateTableName(tableName, headers);
+    logicalTableConfig.setTableName(tableName);
+
+    SuccessResponse successResponse = updateLogicalTable(tableName, logicalTableConfig);
+    return new ConfigSuccessResponse(successResponse.getStatus(), logicalTableConfigAndUnrecognizedProps.getRight());
+  }
+
+  /**
+   * Deletes a logical table by name.
+   *
+   * @param tableName logical table name from the URL; translated with the request's database context
+   * @param headers HTTP headers of the incoming request, used for the table name translation
+   * @return a success message naming the deleted table
+   * @throws ControllerApplicationException with status 500 when the resource manager reports that the delete
+   *     did not succeed
+   */
+  @DELETE
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/logicalTables/{tableName}")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.DELETE_TABLE)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation(value = "Delete a logical table", notes = "Deletes a logical table by name")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully deleted logical table"),
+      @ApiResponse(code = 404, message = "Logical table not found"),
+      @ApiResponse(code = 500, message = "Error deleting logical table")
+  })
+  public SuccessResponse deleteLogicalTable(
+      @ApiParam(value = "Logical table name", required = true) @PathParam("tableName") String tableName,
+      @Context HttpHeaders headers) {
+    String translatedTableName = DatabaseUtils.translateTableName(tableName, headers);
+    boolean deleted = _pinotHelixResourceManager.deleteLogicalTable(translatedTableName);
+    if (!deleted) {
+      throw new ControllerApplicationException(LOGGER, "Failed to delete logical table",
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    return new SuccessResponse(translatedTableName + " logical table successfully deleted.");
+  }
+
+  /**
+   * Parses a logical table config from JSON, also collecting the properties that were not recognized.
+   *
+   * @param logicalTableConfigJsonString JSON form of the logical table config
+   * @return the parsed config paired with the unrecognized properties
+   * @throws ControllerApplicationException with status 400 when parsing fails for any reason; the message echoes
+   *     the offending JSON and the parser's reason
+   */
+  private Pair<LogicalTableConfig, Map<String, Object>> getLogicalAndUnrecognizedPropertiesFromJson(
+      String logicalTableConfigJsonString)
+      throws ControllerApplicationException {
+    try {
+      return JsonUtils.stringToObjectAndUnrecognizedProperties(logicalTableConfigJsonString, LogicalTableConfig.class);
+    } catch (Exception parseFailure) {
+      String reason = parseFailure.getMessage();
+      String badRequestMessage =
+          "Invalid logical table json config: " + logicalTableConfigJsonString + ". Reason: " + reason;
+      throw new ControllerApplicationException(LOGGER, badRequestMessage, Response.Status.BAD_REQUEST, parseFailure);
+    }
+  }
+
+  /**
+   * Validates the given config and stores it as a new logical table.
+   *
+   * <p>An empty broker tenant is replaced with {@code DEFAULT_BROKER_TENANT} before validation. Failures are
+   * translated to HTTP statuses: an already existing table becomes 409, an invalid config becomes 400 and
+   * anything else becomes 500.
+   *
+   * @param logicalTableConfig config to add; its broker tenant may be filled in by this method
+   * @return a success message naming the added table
+   */
+  private SuccessResponse addLogicalTable(LogicalTableConfig logicalTableConfig) {
+    String tableName = logicalTableConfig.getTableName();
+    try {
+      boolean brokerTenantMissing = StringUtils.isEmpty(logicalTableConfig.getBrokerTenant());
+      if (brokerTenantMissing) {
+        logicalTableConfig.setBrokerTenant(DEFAULT_BROKER_TENANT);
+      }
+
+      LogicalTableUtils.validateLogicalTableName(logicalTableConfig, _pinotHelixResourceManager.getAllTables(),
+          _pinotHelixResourceManager.getAllBrokerTenantNames());
+      _pinotHelixResourceManager.addLogicalTable(logicalTableConfig);
+
+      String status = tableName + " logical table successfully added.";
+      return new SuccessResponse(status);
+    } catch (TableAlreadyExistsException e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.CONFLICT, e);
+    } catch (IllegalArgumentException e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST, e);
+    } catch (Exception e) {
+      String failure = "Failed to add new logical table " + tableName + ". Reason: " + e.getMessage();
+      throw new ControllerApplicationException(LOGGER, failure, Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  /**
+   * Validates the given config and stores it over an existing logical table.
+   *
+   * <p>An empty broker tenant is replaced with {@code DEFAULT_BROKER_TENANT} before validation. Failures are
+   * translated to HTTP statuses: a missing table becomes 404, an invalid config becomes 400 and anything else
+   * becomes 500.
+   *
+   * @param tableName table name used in the error messages
+   * @param logicalTableConfig replacement config; its broker tenant may be filled in by this method
+   * @return a success message naming the updated table
+   */
+  private SuccessResponse updateLogicalTable(String tableName, LogicalTableConfig logicalTableConfig) {
+    try {
+      boolean brokerTenantMissing = StringUtils.isEmpty(logicalTableConfig.getBrokerTenant());
+      if (brokerTenantMissing) {
+        logicalTableConfig.setBrokerTenant(DEFAULT_BROKER_TENANT);
+      }
+
+      LogicalTableUtils.validateLogicalTableName(logicalTableConfig, _pinotHelixResourceManager.getAllTables(),
+          _pinotHelixResourceManager.getAllBrokerTenantNames());
+      _pinotHelixResourceManager.updateLogicalTable(logicalTableConfig);
+
+      String status = logicalTableConfig.getTableName() + " logical table successfully updated.";
+      return new SuccessResponse(status);
+    } catch (TableNotFoundException e) {
+      String notFound = "Failed to find logical table " + tableName;
+      throw new ControllerApplicationException(LOGGER, notFound, Response.Status.NOT_FOUND, e);
+    } catch (IllegalArgumentException e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST, e);
+    } catch (Exception e) {
+      String failure = "Failed to update logical table " + tableName + ". Reason: " + e.getMessage();
+      throw new ControllerApplicationException(LOGGER, failure, Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 830fd88055..9bdfb8a254 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -176,6 +176,7 @@ import org.apache.pinot.spi.config.user.ComponentType;
 import org.apache.pinot.spi.config.user.RoleType;
 import org.apache.pinot.spi.config.user.UserConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
@@ -2205,6 +2206,64 @@ public class PinotHelixResourceManager {
     }
   }
 
+  /**
+   * Stores a new logical table config in the property store.
+   *
+   * @param logicalTableConfig config of the logical table to add
+   * @throws TableAlreadyExistsException if a logical table config is already stored under the same name, or if
+   *     the name equals the raw name of any table returned by {@code getAllTables()}
+   */
+  public void addLogicalTable(LogicalTableConfig logicalTableConfig)
+      throws TableAlreadyExistsException {
+    String tableName = logicalTableConfig.getTableName();
+    LOGGER.info("Adding logical table: {}", tableName);
+
+    // Reject a name that is already taken by another logical table
+    if (ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName) != null) {
+      throw new TableAlreadyExistsException("Logical table: " + tableName + " already exists");
+    }
+
+    // Reject a name that collides with the raw name of a physical table
+    boolean usedByPhysicalTable =
+        getAllTables().stream().map(TableNameBuilder::extractRawTableName).anyMatch(tableName::equals);
+    if (usedByPhysicalTable) {
+      throw new TableAlreadyExistsException("Table name: " + tableName + " already exists");
+    }
+
+    ZKMetadataProvider.setLogicalTableConfig(_propertyStore, logicalTableConfig);
+    LOGGER.info("Added logical table: {}", tableName);
+  }
+
+  /**
+   * Overwrites the stored config of an existing logical table.
+   *
+   * @param logicalTableConfig replacement config; its table name selects the entry to overwrite
+   * @throws TableNotFoundException if no logical table config is stored under that name
+   */
+  public void updateLogicalTable(LogicalTableConfig logicalTableConfig)
+      throws TableNotFoundException {
+    String tableName = logicalTableConfig.getTableName();
+    LOGGER.info("Updating logical table: {}", tableName);
+
+    // Only an existing logical table can be updated
+    boolean logicalTableExists = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName) != null;
+    if (!logicalTableExists) {
+      throw new TableNotFoundException("Logical table: " + tableName + " does not exist");
+    }
+
+    ZKMetadataProvider.setLogicalTableConfig(_propertyStore, logicalTableConfig);
+    LOGGER.info("Updated logical table: {}", tableName);
+  }
+
+  /**
+   * Removes the stored config of a logical table from the property store.
+   *
+   * @param tableName name of the logical table to delete
+   * @return the result of the property store removal: {@code true} when the entry was removed
+   * @throws ControllerApplicationException with status 404 when no entry exists for the table
+   */
+  public boolean deleteLogicalTable(String tableName) {
+    LOGGER.info("Deleting logical table: {}", tableName);
+    String propertyStorePath = ZKMetadataProvider.constructPropertyStorePathForLogical(tableName);
+    if (!_propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
+      throw new ControllerApplicationException(LOGGER,
+          "Logical table: " + tableName + " does not exists.", Response.Status.NOT_FOUND);
+    }
+
+    boolean removed = _propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT);
+    // Report success only when the removal actually happened; previously "Deleted" was logged unconditionally.
+    if (removed) {
+      LOGGER.info("Deleted logical table: {}", tableName);
+    } else {
+      LOGGER.warn("Failed to delete logical table: {}", tableName);
+    }
+    return removed;
+  }
+
+  /**
+   * Looks up the stored config of a logical table.
+   *
+   * @param tableName name of the logical table
+   * @return whatever {@code ZKMetadataProvider.getLogicalTableConfig} returns for the name
+   */
+  public LogicalTableConfig getLogicalTableConfig(String tableName) {
+    LogicalTableConfig storedConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
+    return storedConfig;
+  }
+
+  /**
+   * Collects the table names of all stored logical table configs.
+   *
+   * @return one name per config returned by {@code ZKMetadataProvider.getAllLogicalTableConfigs}
+   */
+  public List<String> getAllLogicalTableNames() {
+    return ZKMetadataProvider.getAllLogicalTableConfigs(_propertyStore)
+        .stream()
+        .map(logicalTableConfig -> logicalTableConfig.getTableName())
+        .collect(Collectors.toList());
+  }
+
   /**
    * Returns the ZK metdata for the given jobId and jobType
    * @param jobId the id of the job
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotAdminUserLogicalTableResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotAdminUserLogicalTableResourceTest.java
new file mode 100644
index 0000000000..573e2984ef
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotAdminUserLogicalTableResourceTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.util.Map;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
+
+
+/**
+ * Runs the tests inherited from {@link PinotLogicalTableResourceTest} against a controller configured with basic
+ * auth, sending the admin principal's credentials with every request.
+ */
+public class PinotAdminUserLogicalTableResourceTest extends PinotLogicalTableResourceTest {
+
+  public static final String AUTH_TOKEN = "Basic YWRtaW46dmVyeXNlY3JldA=====";
+  public static final Map<String, String> AUTH_HEADER = Map.of("Authorization", AUTH_TOKEN);
+
+  /** Switches the controller to basic auth with a single principal named "admin". */
+  @Override
+  protected void overrideControllerConf(Map<String, Object> properties) {
+    properties.put("controller.admin.access.control.factory.class",
+        "org.apache.pinot.controller.api.access.BasicAuthAccessControlFactory");
+    properties.put("controller.admin.access.control.principals", "admin");
+    properties.put("controller.admin.access.control.principals.admin.password", "verysecret");
+  }
+
+  /** Supplies the admin authorization header for the requests sent by the inherited tests. */
+  @Override
+  protected Map<String, String> getHeaders() {
+    return AUTH_HEADER;
+  }
+
+  /** Returns the request client, creating it with the admin authorization header on first use. */
+  @Override
+  public ControllerRequestClient getControllerRequestClient() {
+    if (_controllerRequestClient != null) {
+      return _controllerRequestClient;
+    }
+    _controllerRequestClient = new ControllerRequestClient(_controllerRequestURLBuilder, getHttpClient(), AUTH_HEADER);
+    return _controllerRequestClient;
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
new file mode 100644
index 0000000000..1f2d3a8204
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+
+public class PinotLogicalTableResourceTest extends ControllerTest {
+
+  private static final String LOGICAL_TABLE_NAME = "test_logical_table";
+  public static final String BROKER_TENANT = "DefaultTenant";
+  protected ControllerRequestURLBuilder _controllerRequestURLBuilder;
+
+  @BeforeClass
+  public void setUpClass()
+      throws Exception {
+    startZk();
+    startController();
+    addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+    _controllerRequestURLBuilder = getControllerRequestURLBuilder();
+  }
+
+  @AfterClass
+  public void tearDownClass() {
+    stopController();
+    stopZk();
+  }
+
+  @AfterMethod
+  public void tearDown() {
+    // cleans up the physical tables after each testcase
+    cleanup();
+  }
+
+  @DataProvider
+  public Object[][] tableNamesProvider() {
+    return new Object[][]{
+        {"test_logical_table", List.of("test_table_1", "test_table_2"), 
List.of("test_table_3")},
+        {"test_logical_table", List.of("test_table_1", "db.test_table_2"), 
List.of("test_table_3")},
+        {"test_logical_table", List.of("test_table_1", "test_table_2"), 
List.of("db.test_table_3")},
+        {"test_logical_table", List.of("db.test_table_1", "db.test_table_2"), 
List.of("db.test_table_3")},
+        {"test_table", List.of("db1.test_table", "db2.test_table"), 
List.of("db3.test_table")},
+        {"db0.test_table", List.of("db1.test_table", "db2.test_table"), 
List.of("db3.test_table")},
+        {"db.test_logical_table", List.of("test_table_1", "test_table_2"), 
List.of("test_table_3")},
+        {"db.test_logical_table", List.of("test_table_1", "db.test_table_2"), 
List.of("test_table_3")},
+        {"db.test_logical_table", List.of("test_table_1", "test_table_2"), 
List.of("db.test_table_3")},
+        {"db.test_logical_table", List.of("db.test_table_1", 
"db.test_table_2"), List.of("db.test_table_3")},
+    };
+  }
+
+  @Test(dataProvider = "tableNamesProvider")
+  public void testCreateUpdateDeleteLogicalTables(String logicalTableName, 
List<String> physicalTableNames,
+      List<String> physicalTablesToUpdate)
+      throws IOException {
+    // verify logical table does not exist
+    String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
+    String getLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableGet(logicalTableName);
+    String updateLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableName);
+    String deleteLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName);
+
+    // verify logical table does not exist
+    verifyLogicalTableDoesNotExists(getLogicalTableUrl);
+
+    // setup physical and logical tables
+    List<String> physicalTableNamesWithType = 
createHybridTables(physicalTableNames);
+    LogicalTableConfig
+        logicalTableConfig = getDummyLogicalTableConfig(logicalTableName, 
physicalTableNamesWithType, BROKER_TENANT);
+
+    // create logical table
+    String resp =
+        ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    assertEquals(resp,
+        "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName + " 
logical table successfully added.\"}");
+
+    // verify logical table
+    verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
+
+    // update logical table and setup new physical tables
+    List<String> tableNameToUpdateWithType = 
createHybridTables(physicalTablesToUpdate);
+    tableNameToUpdateWithType.addAll(physicalTableNamesWithType);
+    logicalTableConfig = getDummyLogicalTableConfig(logicalTableName, 
tableNameToUpdateWithType, BROKER_TENANT);
+
+    String response =
+        ControllerTest.sendPutRequest(updateLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    assertEquals(response,
+        "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName + " 
logical table successfully updated.\"}");
+
+    // verify updated logical table
+    verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
+
+    // delete logical table
+    String deleteResponse = 
ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
+    assertEquals(deleteResponse, "{\"status\":\"" + logicalTableName + " 
logical table successfully deleted.\"}");
+
+    // verify logical table is deleted
+    verifyLogicalTableDoesNotExists(getLogicalTableUrl);
+  }
+
+  @Test
+  public void testLogicalTableValidationTests()
+      throws IOException {
+    String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
+
+    // create physical tables
+    List<String> physicalTableNames = List.of("test_table_1", "test_table_2");
+    List<String> physicalTableNamesWithType = 
createHybridTables(physicalTableNames);
+
+    // Test logical table name with _OFFLINE and _REALTIME is not allowed
+    LogicalTableConfig logicalTableConfig =
+        getDummyLogicalTableConfig("testLogicalTable_OFFLINE", 
physicalTableNamesWithType, BROKER_TENANT);
+    try {
+      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      fail("Logical Table POST request should have failed");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Reason: 'tableName' should not end 
with _OFFLINE or _REALTIME"),
+          e.getMessage());
+    }
+
+    logicalTableConfig =
+        getDummyLogicalTableConfig("testLogicalTable_REALTIME", 
physicalTableNamesWithType, BROKER_TENANT);
+    try {
+      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      fail("Logical Table POST request should have failed");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Reason: 'tableName' should not end 
with _OFFLINE or _REALTIME"),
+          e.getMessage());
+    }
+
+    // Test logical table name can not be same as existing physical table name
+    logicalTableConfig =
+        getDummyLogicalTableConfig("test_table_1", physicalTableNamesWithType, 
BROKER_TENANT);
+    try {
+      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      fail("Logical Table POST request should have failed");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Table name: test_table_1 already 
exists"), e.getMessage());
+    }
+
+    // Test empty physical table names is not allowed
+    logicalTableConfig =
+        getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(), 
BROKER_TENANT);
+    try {
+      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      fail("Logical Table POST request should have failed");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("'physicalTableConfigMap' should not 
be null or empty"), e.getMessage());
+    }
+
+    // Test all table names are physical table names and none is hybrid table 
name
+    logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNames, BROKER_TENANT);
+    try {
+      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      fail("Logical Table POST request should have failed");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Reason: 'test_table_1' should be one 
of the existing tables"),
+          e.getMessage());
+    }
+
+    // Test valid broker tenant is provided
+    logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, "InvalidTenant");
+    try {
+      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      fail("Logical Table POST request should have failed");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Reason: 'InvalidTenant' should be 
one of the existing broker tenants"),
+          e.getMessage());
+    }
+  }
+
+  /**
+   * Verifies that creating a logical table under a name that is already used by a logical table is rejected.
+   *
+   * <p>The logical table is removed in a {@code finally} block so that a failing assertion does not leave it
+   * behind for the tests that run afterwards.
+   */
+  @Test
+  public void testLogicalTableWithSameNameNotAllowed()
+      throws IOException {
+    String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate();
+    String getLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME);
+    String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(LOGICAL_TABLE_NAME);
+    List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_1", "test_table_2"));
+
+    LogicalTableConfig logicalTableConfig =
+        getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT);
+    ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    try {
+      verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
+      try {
+        // create the same logical table again
+        ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders());
+        fail("Logical Table POST request should have failed");
+      } catch (IOException e) {
+        assertTrue(e.getMessage().contains("Logical table: test_logical_table already exists"), e.getMessage());
+      }
+    } finally {
+      // clean up the logical table even when an assertion above failed
+      ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
+    }
+    verifyLogicalTableDoesNotExists(getLogicalTableUrl);
+  }
+
+  @DataProvider
+  public Object[][] physicalTableShouldExistProvider() {
+    return new Object[][]{
+        {LOGICAL_TABLE_NAME, List.of("test_table_1"), "unknown_table_OFFLINE"},
+        {LOGICAL_TABLE_NAME, List.of("test_table_2"), 
"unknown_table_REALTIME"},
+        {LOGICAL_TABLE_NAME, List.of("test_table_1"), 
"db.test_table_1_OFFLINE"},
+        {LOGICAL_TABLE_NAME, List.of("test_table_2"), 
"db.test_table_2_REALTIME"},
+    };
+  }
+
+  @Test(dataProvider = "physicalTableShouldExistProvider")
+  public void testPhysicalTableShouldExist(String logicalTableName, 
List<String> physicalTableNames,
+      String unknownTableName)
+      throws IOException {
+    String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
+
+    // setup physical tables
+    List<String> physicalTableNamesWithType = 
createHybridTables(physicalTableNames);
+    physicalTableNamesWithType.add(unknownTableName);
+
+    // Test physical table should exist
+    LogicalTableConfig
+        logicalTableConfig = getDummyLogicalTableConfig(logicalTableName, 
physicalTableNamesWithType, BROKER_TENANT);
+    try {
+      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      fail("Logical Table POST request should have failed");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("'" + unknownTableName + "' should be 
one of the existing tables"),
+          e.getMessage());
+    }
+  }
+
+  @Test
+  public void testGetLogicalTableNames()
+      throws IOException {
+    ObjectMapper objectMapper = new ObjectMapper();
+    String getLogicalTableNamesUrl = 
_controllerRequestURLBuilder.forLogicalTableNamesGet();
+    String response = ControllerTest.sendGetRequest(getLogicalTableNamesUrl, 
getHeaders());
+    assertEquals(response, objectMapper.writeValueAsString(List.of()));
+
+    // setup physical tables and logical tables
+    List<String> logicalTableNames = List.of("db.test_logical_table_1", 
"test_logical_table_2", "test_logical_table_3");
+    List<String> physicalTableNames = List.of("test_table_1", "test_table_2", 
"db.test_table_3");
+    List<String> physicalTableNamesWithType = 
createHybridTables(physicalTableNames);
+
+    for (int i = 0; i < logicalTableNames.size(); i++) {
+      LogicalTableConfig logicalTableConfig = 
getDummyLogicalTableConfig(logicalTableNames.get(i), List.of(
+          physicalTableNamesWithType.get(2 * i), 
physicalTableNamesWithType.get(2 * i + 1)), BROKER_TENANT);
+
+      
ControllerTest.sendPostRequest(_controllerRequestURLBuilder.forLogicalTableCreate(),
+          logicalTableConfig.toSingleLineJsonString(), getHeaders());
+    }
+
+    // verify logical table names
+    String getLogicalTableNamesResponse = 
ControllerTest.sendGetRequest(getLogicalTableNamesUrl, getHeaders());
+    assertEquals(getLogicalTableNamesResponse, 
objectMapper.writeValueAsString(logicalTableNames));
+
+    // cleanup: delete logical tables
+    for (String logicalTableName : logicalTableNames) {
+      String deleteLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName);
+      String deleteResponse = 
ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
+      assertEquals(deleteResponse, "{\"status\":\"" + logicalTableName + " 
logical table successfully deleted.\"}");
+    }
+  }
+
+  private void verifyLogicalTableExists(String getLogicalTableUrl, 
LogicalTableConfig logicalTableConfig)
+      throws IOException {
+    LogicalTableConfig remoteLogicalTableConfig =
+        
LogicalTableConfig.fromString(ControllerTest.sendGetRequest(getLogicalTableUrl, 
getHeaders()));
+    assertEquals(remoteLogicalTableConfig, logicalTableConfig);
+  }
+
+  private void verifyLogicalTableDoesNotExists(String getLogicalTableUrl) {
+    try {
+      ControllerTest.sendGetRequest(getLogicalTableUrl, getHeaders());
+      fail("Logical Table GET request should have failed");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Logical table not found"), 
e.getMessage());
+    }
+  }
+
+  protected Map<String, String> getHeaders() {
+    return Map.of();
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
new file mode 100644
index 0000000000..b8f0473267
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+
+/**
+ * Verifies logical table create, update and delete REST calls made with the credentials of the {@code user}
+ * principal. The controller is started with basic-auth access control and the permission set under test.
+ */
+public class PinotUserWithAccessLogicalTableResourceTest extends ControllerTest {
+
+  public static final String AUTH_TOKEN = "Basic YWRtaW46dmVyeXNlY3JldA=====";
+  public static final String AUTH_TOKEN_USER = "Basic dXNlcjpzZWNyZXQ==";
+  public static final Map<String, String> AUTH_HEADER = Map.of("Authorization", AUTH_TOKEN);
+  public static final Map<String, String> AUTH_HEADER_USER = Map.of("Authorization", AUTH_TOKEN_USER);
+  public static final String LOGICAL_TABLE_NAME = "test_logical_table";
+
+  /**
+   * Builds the controller access-control properties: basic auth with an {@code admin} and a {@code user} principal,
+   * where {@code user} is granted the given permissions.
+   *
+   * @param permissions value stored under the {@code user} principal's permissions key
+   * @return a mutable map of controller properties
+   */
+  private Map<String, Object> getControllerConf(Object permissions) {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put("controller.admin.access.control.factory.class",
+        "org.apache.pinot.controller.api.access.BasicAuthAccessControlFactory");
+    properties.put("controller.admin.access.control.principals", "admin,user");
+    properties.put("controller.admin.access.control.principals.admin.password", "verysecret");
+    properties.put("controller.admin.access.control.principals.user.password", "secret");
+    properties.put("controller.admin.access.control.principals.user.permissions", permissions);
+    return properties;
+  }
+
+  /**
+   * Returns the headers used for the requests under test, i.e. the {@code user} credentials.
+   */
+  protected Map<String, String> getHeaders() {
+    return AUTH_HEADER_USER;
+  }
+
+  /**
+   * Lazily creates the controller request client with the {@link #AUTH_HEADER} (admin) credentials, so that the
+   * helper calls going through this client do not depend on the permissions of the user under test.
+   */
+  @Override
+  public ControllerRequestClient getControllerRequestClient() {
+    if (_controllerRequestClient == null) {
+      _controllerRequestClient =
+          new ControllerRequestClient(_controllerRequestURLBuilder, getHttpClient(), AUTH_HEADER);
+    }
+    return _controllerRequestClient;
+  }
+
+  /**
+   * Starts ZK and a controller configured with the default configuration overlaid by the given properties, then
+   * adds one fake broker and one fake server instance.
+   */
+  private void setup(Map<String, Object> properties)
+      throws Exception {
+    startZk();
+    Map<String, Object> configuration = getDefaultControllerConfiguration();
+    configuration.putAll(properties);
+    startController(configuration);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+    _controllerRequestURLBuilder = getControllerRequestURLBuilder();
+  }
+
+  /**
+   * Cleans up the cluster state and stops the controller and ZK that {@link #setup(Map)} started for the test.
+   */
+  @AfterMethod
+  private void tearDown() {
+    cleanup();
+    String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(LOGICAL_TABLE_NAME);
+    try {
+      ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, AUTH_HEADER);
+    } catch (Exception ignored) {
+      // Best-effort delete: the logical table may already be gone (deleted by the test or by cleanup() above),
+      // in which case the request fails and there is nothing left to do.
+    }
+    stopController();
+    stopZk();
+  }
+
+  /**
+   * Permission sets granted to the {@code user} principal; every set contains {@code create}.
+   */
+  @DataProvider
+  public Object[][] permissionsProvider() {
+    return new Object[][]{
+        {"read,create"},
+        {"read,create,update"},
+        {"read,create,update,delete"}
+    };
+  }
+
+  /**
+   * Runs create, update and delete of a logical table as {@code user}. Each operation must succeed when the
+   * corresponding permission is granted and must otherwise be rejected with a "Permission is denied" error.
+   *
+   * NOTE: {@code fail()} and the assertions throw {@link AssertionError}, which the {@code catch (Exception e)}
+   * blocks below do not catch, so a failed expectation inside a {@code try} still fails the test.
+   */
+  @Test(dataProvider = "permissionsProvider")
+  public void testUserWithCreateAccess(String permissions)
+      throws Exception {
+    Map<String, Object> properties = getControllerConf(permissions);
+
+    setup(properties);
+
+    String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate();
+    String getLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME);
+    String updateLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableUpdate(LOGICAL_TABLE_NAME);
+    String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(LOGICAL_TABLE_NAME);
+
+    List<String> physicalTableNames = List.of("test_table_1");
+    List<String> physicalTablesWithType = createHybridTables(physicalTableNames);
+    LogicalTableConfig logicalTableConfig;
+
+    // create logical table
+    try {
+      logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTablesWithType, "DefaultTenant");
+      String resp =
+          ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      if (permissions.contains("create")) {
+        assertEquals(resp,
+            "{\"unrecognizedProperties\":{},\"status\":\"" + LOGICAL_TABLE_NAME
+                + " logical table successfully added.\"}");
+        verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
+      } else {
+        fail("Logical Table POST request should have failed");
+      }
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Permission is denied for CREATE"), e.getMessage());
+    }
+
+    // update logical table
+    try {
+      physicalTablesWithType.addAll(createHybridTables(List.of("test_table_2")));
+      logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTablesWithType, "DefaultTenant");
+      String respUpdate = ControllerTest.sendPutRequest(
+          updateLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()
+      );
+      if (permissions.contains("update")) {
+        assertEquals(respUpdate,
+            "{\"unrecognizedProperties\":{},\"status\":\"" + LOGICAL_TABLE_NAME
+                + " logical table successfully updated.\"}");
+        verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
+      } else {
+        // The update goes through PUT, so report the right verb when it unexpectedly succeeds
+        fail("Logical Table PUT request should have failed");
+      }
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Permission is denied for UPDATE"), e.getMessage());
+    }
+
+    // delete logical table
+    try {
+      String respDelete = ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
+      if (permissions.contains("delete")) {
+        assertEquals(respDelete, "{\"status\":\"" + LOGICAL_TABLE_NAME + " logical table successfully deleted.\"}");
+      } else {
+        fail("Logical Table DELETE request should have failed");
+      }
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Permission is denied for DELETE"), e.getMessage());
+    }
+  }
+
+  /**
+   * Fetches the logical table config from the given GET url (as {@code user}) and asserts that it equals the
+   * expected config.
+   */
+  private void verifyLogicalTableExists(String logicalTableNamesGet, LogicalTableConfig logicalTableConfig)
+      throws IOException {
+    String respGet = ControllerTest.sendGetRequest(logicalTableNamesGet, getHeaders());
+    LogicalTableConfig remoteTable = LogicalTableConfig.fromString(respGet);
+    assertEquals(remoteTable, logicalTableConfig);
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index a5b05031c6..effcf823b7 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -69,12 +69,15 @@ import 
org.apache.pinot.controller.api.access.AllowAllAccessFactory;
 import org.apache.pinot.controller.api.resources.PauseStatusDetails;
 import org.apache.pinot.controller.api.resources.TableViews;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -82,6 +85,8 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.NetUtils;
 import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.mockito.MockedStatic;
@@ -159,6 +164,21 @@ public class ControllerTest {
     return DEFAULT_INSTANCE;
   }
 
+  /**
+   * Adds a dummy schema plus an OFFLINE and a REALTIME dummy table for every given raw table name.
+   *
+   * @param tableNames raw table names to create hybrid tables for
+   * @return the names with type of the created tables; for each raw name the OFFLINE name precedes the REALTIME one
+   * @throws IOException propagated from the schema / table creation helpers
+   */
+  public List<String> createHybridTables(List<String> tableNames)
+      throws IOException {
+    List<String> createdTableNames = new ArrayList<>();
+    for (String rawTableName : tableNames) {
+      addDummySchema(rawTableName);
+      TableConfig offlineConfig = createDummyTableConfig(rawTableName, TableType.OFFLINE);
+      TableConfig realtimeConfig = createDummyTableConfig(rawTableName, TableType.REALTIME);
+      addTableConfig(offlineConfig);
+      addTableConfig(realtimeConfig);
+      createdTableNames.add(offlineConfig.getTableName());
+      createdTableNames.add(realtimeConfig.getTableName());
+    }
+    return createdTableNames;
+  }
+
+
   public String getHelixClusterName() {
     return _clusterName;
   }
@@ -369,6 +389,19 @@ public class ControllerTest {
     }
   }
 
+  /**
+   * Builds a logical table config that maps every given physical table name to an empty
+   * {@link PhysicalTableConfig}.
+   *
+   * @param tableName name of the logical table
+   * @param physicalTableNames names of the physical tables backing the logical table
+   * @param brokerTenant broker tenant to set on the config
+   * @return the built logical table config
+   */
+  public static LogicalTableConfig getDummyLogicalTableConfig(String tableName, List<String> physicalTableNames,
+      String brokerTenant) {
+    Map<String, PhysicalTableConfig> configsByPhysicalTable = new HashMap<>();
+    physicalTableNames.forEach(name -> configsByPhysicalTable.put(name, new PhysicalTableConfig()));
+    return new LogicalTableConfigBuilder()
+        .setTableName(tableName)
+        .setBrokerTenant(brokerTenant)
+        .setPhysicalTableConfigMap(configsByPhysicalTable)
+        .build();
+  }
+
+
   public static class FakeBrokerResourceOnlineOfflineStateModelFactory extends 
StateModelFactory<StateModel> {
     private static final String STATE_MODEL_DEF = 
"BrokerResourceOnlineOfflineStateModel";
 
@@ -649,6 +682,19 @@ public class ControllerTest {
     return schema;
   }
 
+  public static TableConfig createDummyTableConfig(String tableName, TableType 
tableType) {
+    TableConfigBuilder builder = new TableConfigBuilder(tableType);
+    if (tableType == TableType.REALTIME) {
+      
builder.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap());
+    }
+    return builder.setTableName(tableName)
+        .setTimeColumnName("timeColumn")
+        .setTimeType("DAYS")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("5")
+        .build();
+  }
+
   public static Schema createDummySchemaWithPrimaryKey(String tableName) {
     Schema schema = createDummySchema(tableName);
     schema.setPrimaryKeyColumns(Collections.singletonList("dimA"));
@@ -1184,6 +1230,12 @@ public class ControllerTest {
    * test functionality.
    */
   public void cleanup() {
+    // Delete logical tables
+    List<String> logicalTables = 
_helixResourceManager.getAllLogicalTableNames();
+    for (String logicalTableName : logicalTables) {
+      _helixResourceManager.deleteLogicalTable(logicalTableName);
+    }
+
     // Delete all tables
     List<String> tables = _helixResourceManager.getAllTables();
     for (String tableNameWithType : tables) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index 56fe2464cf..dd5e217417 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -22,14 +22,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
 import org.apache.pinot.spi.config.provider.SchemaChangeListener;
 import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
@@ -44,10 +48,14 @@ import static org.testng.Assert.*;
 public class TableCacheTest {
   private static final ControllerTest TEST_INSTANCE = 
ControllerTest.getInstance();
   private static final String RAW_TABLE_NAME = "cacheTestTable";
+  private static final String ANOTHER_TABLE = "anotherTable";
+  private static final String LOGICAL_TABLE_NAME = "cacheLogicalTestTable";
   private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
   private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+  private static final String ANOTHER_TABLE_OFFLINE = 
TableNameBuilder.OFFLINE.tableNameWithType(ANOTHER_TABLE);
 
   private static final String MANGLED_RAW_TABLE_NAME = "cAcHeTeStTaBlE";
+  private static final String MANGLED_LOGICAL_TABLE_NAME = 
"cAcHeLoGiCaLTeStTaBlE";
   private static final String MANGLED_OFFLINE_TABLE_NAME = 
MANGLED_RAW_TABLE_NAME + "_oFfLiNe";
 
   @BeforeClass
@@ -65,6 +73,7 @@ public class TableCacheTest {
     assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
     assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
     assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
+    assertNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME));
 
     // Add a schema
     Schema schema =
@@ -107,18 +116,43 @@ public class TableCacheTest {
     assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
     assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), 
expectedColumnMap);
 
+    // Add logical table
+    LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME));
+    
TEST_INSTANCE.getHelixResourceManager().addLogicalTable(logicalTableConfig);
+    // Wait for at most 10 seconds for the callback to add the logical table 
to the cache
+    TestUtils.waitForCondition(aVoid -> 
tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME) != null, 10_000L,
+        "Failed to add the logical table to the cache");
+    // Logical table can be accessed by the logical table name
+    if (isCaseInsensitive) {
+      
assertEquals(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME), 
LOGICAL_TABLE_NAME);
+      
assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), 
logicalTableConfig);
+      assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME), 
expectedSchema);
+    } else {
+      
assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME));
+    }
+    assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), 
logicalTableConfig);
+    assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), expectedSchema);
+
     // Register the change listeners
     TestTableConfigChangeListener tableConfigChangeListener = new 
TestTableConfigChangeListener();
     
assertTrue(tableCache.registerTableConfigChangeListener(tableConfigChangeListener));
     assertEquals(tableConfigChangeListener._tableConfigList.size(), 1);
     assertEquals(tableConfigChangeListener._tableConfigList.get(0), 
tableConfig);
+
     TestSchemaChangeListener schemaChangeListener = new 
TestSchemaChangeListener();
     assertTrue(tableCache.registerSchemaChangeListener(schemaChangeListener));
     assertEquals(schemaChangeListener._schemaList.size(), 1);
     assertEquals(schemaChangeListener._schemaList.get(0), expectedSchema);
+
+    TestLogicalTableConfigChangeListener logicalTableConfigChangeListener = 
new TestLogicalTableConfigChangeListener();
+    
assertTrue(tableCache.registerLogicalTableConfigChangeListener(logicalTableConfigChangeListener));
+    
assertEquals(logicalTableConfigChangeListener._logicalTableConfigList.size(), 
1);
+    
assertEquals(logicalTableConfigChangeListener._logicalTableConfigList.get(0), 
logicalTableConfig);
+
     // Re-register the change listener should fail
     
assertFalse(tableCache.registerTableConfigChangeListener(tableConfigChangeListener));
     assertFalse(tableCache.registerSchemaChangeListener(schemaChangeListener));
+    
assertFalse(tableCache.registerLogicalTableConfigChangeListener(logicalTableConfigChangeListener));
 
     // Update the schema
     schema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true));
@@ -174,8 +208,34 @@ public class TableCacheTest {
     // waitForEVToDisappear() call
     TEST_INSTANCE.waitForEVToAppear(OFFLINE_TABLE_NAME);
 
+    // Update logical table config (create schema and table config for 
anotherTable)
+    Schema anotherTableSchema =
+        new 
Schema.SchemaBuilder().setSchemaName(ANOTHER_TABLE).addSingleValueDimension("testColumn",
 DataType.INT)
+            .build();
+    TableConfig anotherTableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(ANOTHER_TABLE_OFFLINE).build();
+    TEST_INSTANCE.getHelixResourceManager().addSchema(anotherTableSchema, 
false, false);
+    TEST_INSTANCE.getHelixResourceManager().addTable(anotherTableConfig);
+    TEST_INSTANCE.waitForEVToAppear(ANOTHER_TABLE_OFFLINE);
+    // Wait for at most 10 seconds for the callback to add the table config to 
the cache
+    TestUtils.waitForCondition(
+        aVoid -> 
anotherTableConfig.equals(tableCache.getTableConfig(ANOTHER_TABLE_OFFLINE)), 
10_000L,
+        "Failed to add the table config to the cache");
+    // update the logical table
+    logicalTableConfig = getLogicalTableConfig(LOGICAL_TABLE_NAME, 
List.of(OFFLINE_TABLE_NAME, ANOTHER_TABLE_OFFLINE));
+    
TEST_INSTANCE.getHelixResourceManager().updateLogicalTable(logicalTableConfig);
+    if (isCaseInsensitive) {
+      
assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), 
logicalTableConfig);
+      assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME), 
expectedSchema);
+    } else {
+      
assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME));
+    }
+    assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), 
logicalTableConfig);
+    assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), expectedSchema);
+
     // Remove the table config
     TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME);
+    
TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(ANOTHER_TABLE_OFFLINE);
     // Wait for at most 10 seconds for the callback to remove the table config 
from the cache
     // NOTE:
     // - Verify if the callback is fully done by checking the table config 
change lister because it is the last step of
@@ -183,27 +243,55 @@ public class TableCacheTest {
     TestUtils.waitForCondition(aVoid -> 
tableConfigChangeListener._tableConfigList.isEmpty(), 10_000L,
         "Failed to remove the table config from the cache");
     assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
+    assertNull(tableCache.getTableConfig(ANOTHER_TABLE_OFFLINE));
     assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
     assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
     assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), 
expectedColumnMap);
 
     // Remove the schema
     TEST_INSTANCE.getHelixResourceManager().deleteSchema(RAW_TABLE_NAME);
+    TEST_INSTANCE.getHelixResourceManager().deleteSchema(ANOTHER_TABLE);
     // Wait for at most 10 seconds for the callback to remove the schema from 
the cache
     // NOTE:
     // - Verify if the callback is fully done by checking the schema change 
lister because it is the last step of the
     //   callback handling
     TestUtils.waitForCondition(aVoid -> 
schemaChangeListener._schemaList.isEmpty(), 10_000L,
         "Failed to remove the schema from the cache");
+
+    // Remove logical table
+    
TEST_INSTANCE.getHelixResourceManager().deleteLogicalTable(LOGICAL_TABLE_NAME);
+    // Wait for at most 10 seconds for the callback to remove the logical 
table from the cache
+    // NOTE:
+    // - Verify if the callback is fully done by checking the logical table change listener because it is the last
+    //   step of the callback handling
+    TestUtils.waitForCondition(aVoid -> 
logicalTableConfigChangeListener._logicalTableConfigList.isEmpty(), 10_000L,
+        "Failed to remove the logical table from the cache");
+
     assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+    assertNull(tableCache.getSchema(ANOTHER_TABLE));
     assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
     assertNull(tableCache.getSchema(RAW_TABLE_NAME));
     assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
+    assertNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME));
     assertEquals(schemaChangeListener._schemaList.size(), 0);
     assertEquals(tableConfigChangeListener._tableConfigList.size(), 0);
+    
assertEquals(logicalTableConfigChangeListener._logicalTableConfigList.size(), 
0);
 
     // Wait for external view to disappear to ensure a clean start for the 
next test
     TEST_INSTANCE.waitForEVToDisappear(OFFLINE_TABLE_NAME);
+    TEST_INSTANCE.waitForEVToDisappear(ANOTHER_TABLE_OFFLINE);
+  }
+
+  /**
+   * Builds a logical table config on the "DefaultTenant" broker tenant that maps every given physical table name
+   * to an empty {@link PhysicalTableConfig}.
+   */
+  private static LogicalTableConfig getLogicalTableConfig(String tableName, List<String> physicalTableNames) {
+    Map<String, PhysicalTableConfig> configsByPhysicalTable = new HashMap<>();
+    physicalTableNames.forEach(name -> configsByPhysicalTable.put(name, new PhysicalTableConfig()));
+    return new LogicalTableConfigBuilder()
+        .setTableName(tableName)
+        .setBrokerTenant("DefaultTenant")
+        .setPhysicalTableConfigMap(configsByPhysicalTable)
+        .build();
   }
 
   @DataProvider(name = "testTableCacheDataProvider")
@@ -229,6 +317,15 @@ public class TableCacheTest {
     }
   }
 
+  /**
+   * Test listener that keeps the list passed to the most recent {@code onChange} call so the test can inspect it.
+   */
+  private static class TestLogicalTableConfigChangeListener implements 
LogicalTableConfigChangeListener {
+    // Replaced wholesale on every onChange call; holds the latest list handed to the listener
+    private volatile List<LogicalTableConfig> _logicalTableConfigList;
+
+    @Override
+    public void onChange(List<LogicalTableConfig> logicalTableConfigList) {
+      _logicalTableConfigList = logicalTableConfigList;
+    }
+  }
+
   @AfterClass
   public void tearDown() {
     TEST_INSTANCE.cleanup();
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/LogicalTableConfigChangeListener.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/LogicalTableConfigChangeListener.java
new file mode 100644
index 0000000000..adf4b990db
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/LogicalTableConfigChangeListener.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.provider;
+
+import java.util.List;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+
+
+/**
+ * Listener that gets notified when the logical table configs change.
+ */
+public interface LogicalTableConfigChangeListener {
+  /**
+   * The callback to be invoked on logical table config changes.
+   * @param logicalTableConfigList the entire list of logical table configs in the cluster
+   */
+  void onChange(List<LogicalTableConfig> logicalTableConfigList);
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
index 0400fe0498..64de5ada56 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.config.provider;
 
 import java.util.List;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -57,4 +58,19 @@ public interface PinotConfigProvider {
    *         registered.
    */
   boolean registerSchemaChangeListener(SchemaChangeListener 
schemaChangeListener);
+
+  /**
+   * Returns the logical table config for the given logical table name.
+   * @param logicalTableName the name of the logical table
+   * @return the logical table config
+   */
+  LogicalTableConfig getLogicalTableConfig(String logicalTableName);
+
+  /**
+   * Registers the {@link LogicalTableConfigChangeListener} and notifies it whenever any changes (addition, update,
+   * removal) to any of the logical table configs are detected.
+   * @param logicalTableConfigChangeListener the listener to be registered
+   * @return {@code true} if the listener is successfully registered, {@code false} if the listener is already
+   *         registered.
+   */
+  boolean 
registerLogicalTableConfigChangeListener(LogicalTableConfigChangeListener 
logicalTableConfigChangeListener);
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
new file mode 100644
index 0000000000..4d477691d1
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * JSON-serializable config of a logical table: its name, its broker tenant and the physical tables it maps to
+ * (keyed by physical table name). Unknown JSON properties are ignored on deserialization.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class LogicalTableConfig extends BaseJsonConfig {
+
+  private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
+
+  public static final String LOGICAL_TABLE_NAME_KEY = "tableName";
+  public static final String PHYSICAL_TABLE_CONFIG_KEY = "physicalTableConfigMap";
+  public static final String BROKER_TENANT_KEY = "brokerTenant";
+
+  private String _tableName;
+  private String _brokerTenant;
+  private Map<String, PhysicalTableConfig> _physicalTableConfigMap;
+
+  /**
+   * Parses a logical table config from its json string representation.
+   *
+   * @param logicalTableString json string of the logical table config
+   * @return the parsed logical table config
+   * @throws IOException if the string cannot be parsed
+   */
+  public static LogicalTableConfig fromString(String logicalTableString)
+      throws IOException {
+    return JsonUtils.stringToObject(logicalTableString, LogicalTableConfig.class);
+  }
+
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public void setTableName(String tableName) {
+    _tableName = tableName;
+  }
+
+  public Map<String, PhysicalTableConfig> getPhysicalTableConfigMap() {
+    return _physicalTableConfigMap;
+  }
+
+  public void setPhysicalTableConfigMap(
+      Map<String, PhysicalTableConfig> physicalTableConfigMap) {
+    _physicalTableConfigMap = physicalTableConfigMap;
+  }
+
+  public String getBrokerTenant() {
+    return _brokerTenant;
+  }
+
+  public void setBrokerTenant(String brokerTenant) {
+    _brokerTenant = brokerTenant;
+  }
+
+  private JsonNode toJsonObject() {
+    return DEFAULT_MAPPER.valueToTree(this);
+  }
+
+  /**
+   * Returns a single-line json string representation of the logical table config.
+   */
+  public String toSingleLineJsonString() {
+    return toJsonObject().toString();
+  }
+
+  /**
+   * Returns a pretty json string representation of the logical table config.
+   */
+  public String toPrettyJsonString() {
+    try {
+      return JsonUtils.objectToPrettyString(toJsonObject());
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Two logical table configs are equal when their table name, broker tenant and physical table config map are
+   * all equal. Comparing the name alone would treat an updated config as equal to its previous version.
+   */
+  @Override
+  public boolean equals(Object object) {
+    if (this == object) {
+      return true;
+    }
+    if (object == null || getClass() != object.getClass()) {
+      return false;
+    }
+    LogicalTableConfig that = (LogicalTableConfig) object;
+    // NOTE(review): map values are compared with PhysicalTableConfig#equals, which is expected to be the
+    // value-based implementation inherited from BaseJsonConfig - confirm.
+    return Objects.equals(getTableName(), that.getTableName())
+        && Objects.equals(getBrokerTenant(), that.getBrokerTenant())
+        && Objects.equals(getPhysicalTableConfigMap(), that.getPhysicalTableConfigMap());
+  }
+
+  /**
+   * Hashes the same fields that {@link #equals(Object)} compares.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(getTableName(), getBrokerTenant(), getPhysicalTableConfigMap());
+  }
+
+  @Override
+  public String toString() {
+    return toSingleLineJsonString();
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java
new file mode 100644
index 0000000000..c86fcf97dc
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data;
+
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * This class represents the configuration for a physical table in {@link LogicalTableConfig}.
+ * It is empty by design; more docs will be added as features are added.
+ */
+public class PhysicalTableConfig extends BaseJsonConfig {
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 2804eac53e..eb1b7d3e17 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -633,4 +633,24 @@ public class ControllerRequestURLBuilder {
   public String forIdealState(String tableName) {
     return StringUtil.join("/", _baseUrl, "tables", tableName, "idealstate");
   }
+
+  public String forLogicalTableCreate() { // Joins _baseUrl and "logicalTables" with "/".
+    return StringUtil.join("/", _baseUrl, "logicalTables"); // Same parts as forLogicalTableNamesGet().
+  }
+
+  public String forLogicalTableUpdate(String logicalTableName) { // Joins base URL, "logicalTables" and the name.
+    return StringUtil.join("/", _baseUrl, "logicalTables", logicalTableName); // Name is passed in unmodified.
+  }
+
+  public String forLogicalTableGet(String logicalTableName) { // Joins base URL, "logicalTables" and the name.
+    return StringUtil.join("/", _baseUrl, "logicalTables", logicalTableName); // Name is passed in unmodified.
+  }
+
+  public String forLogicalTableNamesGet() { // Joins _baseUrl and "logicalTables" with "/".
+    return StringUtil.join("/", _baseUrl, "logicalTables"); // Same parts as forLogicalTableCreate().
+  }
+
+  public String forLogicalTableDelete(String logicalTableName) { // Joins base URL, "logicalTables" and the name.
+    return StringUtil.join("/", _baseUrl, "logicalTables", logicalTableName); // Name is passed in unmodified.
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
new file mode 100644
index 0000000000..eff47c5af6
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils.builder;
+
+import java.util.Map;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+
+
+public class LogicalTableConfigBuilder { // Chainable builder: setters return this, build() makes the config.
+  private String _tableName;
+  private Map<String, PhysicalTableConfig> _physicalTableConfigMap;
+  private String _brokerTenant;
+
+  public LogicalTableConfigBuilder setTableName(String tableName) {
+    _tableName = tableName;
+    return this; // Returning the builder allows call chaining.
+  }
+
+  public LogicalTableConfigBuilder setPhysicalTableConfigMap(Map<String, 
PhysicalTableConfig> physicalTableConfigMap) {
+    _physicalTableConfigMap = physicalTableConfigMap; // Keeps the caller's map reference (no copy).
+    return this;
+  }
+
+  public LogicalTableConfigBuilder setBrokerTenant(String brokerTenant) {
+    _brokerTenant = brokerTenant;
+    return this;
+  }
+
+  public LogicalTableConfig build() { // Creates a new LogicalTableConfig on every call; nothing is validated here.
+    LogicalTableConfig logicalTableConfig = new LogicalTableConfig();
+    logicalTableConfig.setTableName(_tableName); // Values never set on the builder are passed through as null.
+    logicalTableConfig.setPhysicalTableConfigMap(_physicalTableConfigMap);
+    logicalTableConfig.setBrokerTenant(_brokerTenant);
+    return logicalTableConfig;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to