This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 980b5d04b3 Logical table schema enforcement. (#15733)
980b5d04b3 is described below

commit 980b5d04b3219c933fc18f13d3a9ed8c2fb63cdb
Author: Abhishek Bafna <aba...@startree.ai>
AuthorDate: Fri May 9 11:35:05 2025 +0530

    Logical table schema enforcement. (#15733)
---
 .../pinot/common/config/provider/TableCache.java   | 19 -------
 .../pinot/common/metadata/ZKMetadataProvider.java  | 11 ++++
 .../pinot/common/utils/LogicalTableUtils.java      | 24 ++++++---
 .../PinotHelixPropertyStoreZnRecordProvider.java   |  6 ++-
 .../api/resources/PinotLogicalTableResource.java   | 26 ++--------
 .../helix/core/PinotHelixResourceManager.java      | 35 ++++++++++++-
 .../resources/PinotLogicalTableResourceTest.java   | 47 ++++++++++++-----
 ...inotUserWithAccessLogicalTableResourceTest.java |  2 +
 .../pinot/controller/helix/ControllerTest.java     |  2 +-
 .../pinot/controller/helix/TableCacheTest.java     | 60 +++++++++++++---------
 10 files changed, 142 insertions(+), 90 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 726df76930..0b12a64282 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -279,28 +278,10 @@ public class TableCache implements PinotConfigProvider {
   @Nullable
   @Override
   public Schema getSchema(String rawTableName) {
-    if (_schemaInfoMap.containsKey(rawTableName)) {
-      return getPhysicalTableSchema(rawTableName);
-    } else {
-      return getLogicalTableSchema(rawTableName);
-    }
-  }
-
-  private Schema getPhysicalTableSchema(String rawTableName) {
     SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
     return schemaInfo != null ? schemaInfo._schema : null;
   }
 
-  @Nullable
-  private Schema getLogicalTableSchema(String logicalTableName) {
-    LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(logicalTableName);
-    if (logicalTableConfig == null) {
-      return null;
-    }
-    Optional<String> physicalTableName = 
logicalTableConfig.getPhysicalTableConfigMap().keySet().stream().findFirst();
-    return 
getPhysicalTableSchema(TableNameBuilder.extractRawTableName(physicalTableName.orElse(null)));
-  }
-
   @Override
   public boolean registerSchemaChangeListener(SchemaChangeListener 
schemaChangeListener) {
     synchronized (_zkSchemaChangeListener) {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 04a31f4bf4..e1f21ceea4 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -642,6 +642,17 @@ public class ZKMetadataProvider {
     }
   }
 
+  /**
+   * Check if the schema exists in the property store.
+   *
+   * @param propertyStore Helix property store
+   * @param schemaName Schema name
+   * @return true if the schema exists, false otherwise
+   */
+  public static boolean isSchemaExists(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String schemaName) {
+    return 
propertyStore.exists(constructPropertyStorePathForSchema(schemaName), 
AccessOption.PERSISTENT);
+  }
+
   /**
    * Get the schema associated with the given table name.
    *
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
index 0b5d715bd9..be92ffdeb0 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
@@ -21,11 +21,12 @@ package org.apache.pinot.common.utils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.function.Predicate;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -73,8 +74,11 @@ public class LogicalTableUtils {
     return record;
   }
 
-  public static void validateLogicalTableName(LogicalTableConfig 
logicalTableConfig, List<String> allPhysicalTables,
-      Set<String> allBrokerTenantNames) {
+  public static void validateLogicalTableName(
+      LogicalTableConfig logicalTableConfig,
+      Predicate<String> physicalTableExistsPredicate,
+      Predicate<String> brokerTenantExistsPredicate,
+      ZkHelixPropertyStore<ZNRecord> propertyStore) {
     String tableName = logicalTableConfig.getTableName();
     if (StringUtils.isEmpty(tableName)) {
       throw new IllegalArgumentException("Invalid logical table name. Reason: 
'tableName' should not be null or empty");
@@ -96,7 +100,7 @@ public class LogicalTableUtils {
       PhysicalTableConfig physicalTableConfig = entry.getValue();
 
       // validate physical table exists
-      if (!allPhysicalTables.contains(physicalTableName)) {
+      if (!physicalTableExistsPredicate.test(physicalTableName)) {
         throw new IllegalArgumentException(
             "Invalid logical table. Reason: '" + physicalTableName + "' should 
be one of the existing tables");
       }
@@ -108,11 +112,17 @@ public class LogicalTableUtils {
       }
     }
 
-    // validate broker tenant
+    // validate broker tenant exists
     String brokerTenant = logicalTableConfig.getBrokerTenant();
-    if (!allBrokerTenantNames.contains(brokerTenant)) {
+    if (!brokerTenantExistsPredicate.test(brokerTenant)) {
       throw new IllegalArgumentException(
           "Invalid logical table. Reason: '" + brokerTenant + "' should be one 
of the existing broker tenants");
     }
+
+    // Validate schema with same name as logical table exists
+    if (!ZKMetadataProvider.isSchemaExists(propertyStore, tableName)) {
+      throw new IllegalArgumentException(
+          "Invalid logical table. Reason: Schema with same name as logical 
table '" + tableName + "' does not exist");
+    }
   }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
index 7ff2d6da2c..fc25ef4dd3 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
@@ -44,13 +44,17 @@ public class PinotHelixPropertyStoreZnRecordProvider {
   }
 
   public static PinotHelixPropertyStoreZnRecordProvider 
forTable(ZkHelixPropertyStore<ZNRecord> propertyStore) {
-    return new PinotHelixPropertyStoreZnRecordProvider(propertyStore, 
"/CONFIGS/TABLES");
+    return new PinotHelixPropertyStoreZnRecordProvider(propertyStore, 
"/CONFIGS/TABLE");
   }
 
   public static PinotHelixPropertyStoreZnRecordProvider 
forSegments(ZkHelixPropertyStore<ZNRecord> propertyStore) {
     return new PinotHelixPropertyStoreZnRecordProvider(propertyStore, 
"/SEGMENTS");
   }
 
+  public static PinotHelixPropertyStoreZnRecordProvider 
forLogicalTable(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    return new PinotHelixPropertyStoreZnRecordProvider(propertyStore, 
"/LOGICAL/TABLE");
+  }
+
   public ZNRecord get(String name) {
     return _propertyStore.get(_pathPrefix + "/" + name, null, 
AccessOption.PERSISTENT);
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
index 844d9aea24..cf9cf3b2d4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
@@ -43,11 +43,9 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.arrow.util.Preconditions;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.exception.TableNotFoundException;
 import org.apache.pinot.common.utils.DatabaseUtils;
-import org.apache.pinot.common.utils.LogicalTableUtils;
 import org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.access.AccessType;
 import org.apache.pinot.controller.api.access.Authenticate;
@@ -82,7 +80,6 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
 @Path("/")
 public class PinotLogicalTableResource {
   public static final Logger LOGGER = 
LoggerFactory.getLogger(PinotLogicalTableResource.class);
-  private static final String DEFAULT_BROKER_TENANT = "DefaultTenant";
 
   @Inject
   PinotHelixResourceManager _pinotHelixResourceManager;
@@ -172,7 +169,7 @@ public class PinotLogicalTableResource {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     logicalTableConfig.setTableName(tableName);
 
-    SuccessResponse successResponse = updateLogicalTable(tableName, 
logicalTableConfig);
+    SuccessResponse successResponse = updateLogicalTable(logicalTableConfig);
     return new ConfigSuccessResponse(successResponse.getStatus(), 
logicalTableConfigAndUnrecognizedProps.getRight());
   }
 
@@ -214,15 +211,6 @@ public class PinotLogicalTableResource {
   private SuccessResponse addLogicalTable(LogicalTableConfig 
logicalTableConfig) {
     String tableName = logicalTableConfig.getTableName();
     try {
-      if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
-        logicalTableConfig.setBrokerTenant(DEFAULT_BROKER_TENANT);
-      }
-
-      LogicalTableUtils.validateLogicalTableName(
-          logicalTableConfig,
-          _pinotHelixResourceManager.getAllTables(),
-          _pinotHelixResourceManager.getAllBrokerTenantNames()
-      );
       _pinotHelixResourceManager.addLogicalTableConfig(logicalTableConfig);
       return new SuccessResponse(tableName + " logical table successfully 
added.");
     } catch (TableAlreadyExistsException e) {
@@ -236,17 +224,9 @@ public class PinotLogicalTableResource {
     }
   }
 
-  private SuccessResponse updateLogicalTable(String tableName, 
LogicalTableConfig logicalTableConfig) {
+  private SuccessResponse updateLogicalTable(LogicalTableConfig 
logicalTableConfig) {
+    String tableName = logicalTableConfig.getTableName();
     try {
-      if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
-        logicalTableConfig.setBrokerTenant(DEFAULT_BROKER_TENANT);
-      }
-
-      LogicalTableUtils.validateLogicalTableName(
-          logicalTableConfig,
-          _pinotHelixResourceManager.getAllTables(),
-          _pinotHelixResourceManager.getAllBrokerTenantNames()
-      );
       _pinotHelixResourceManager.updateLogicalTableConfig(logicalTableConfig);
       return new SuccessResponse(logicalTableConfig.getTableName() + " logical 
table successfully updated.");
     } catch (TableNotFoundException e) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8ff276c20b..ad62a01227 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -127,6 +127,7 @@ import org.apache.pinot.common.utils.BcryptUtils;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.LogicalTableUtils;
 import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
@@ -1649,6 +1650,12 @@ public class PinotHelixResourceManager {
     return ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
   }
 
+  public List<String> getAllSchemaNames() {
+    return _propertyStore.getChildNames(
+        
PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(),
 AccessOption.PERSISTENT
+    );
+  }
+
   public List<String> getSchemaNames() {
     return getSchemaNames(null);
   }
@@ -1825,6 +1832,17 @@ public class PinotHelixResourceManager {
     String tableName = logicalTableConfig.getTableName();
     LOGGER.info("Adding logical table {}: Start", tableName);
 
+    if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
+      logicalTableConfig.setBrokerTenant("DefaultTenant");
+    }
+
+    LogicalTableUtils.validateLogicalTableName(
+        logicalTableConfig,
+        
PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore)::exist,
+        getAllBrokerTenantNames()::contains,
+        _propertyStore
+    );
+
     // Check if the logical table name is already used
     if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) {
       throw new TableAlreadyExistsException("Logical table: " + tableName + " 
already exists");
@@ -2102,6 +2120,17 @@ public class PinotHelixResourceManager {
     String tableName = logicalTableConfig.getTableName();
     LOGGER.info("Updating logical table {}: Start", tableName);
 
+    if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
+      logicalTableConfig.setBrokerTenant("DefaultTenant");
+    }
+
+    LogicalTableUtils.validateLogicalTableName(
+        logicalTableConfig,
+        
PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore)::exist,
+        getAllBrokerTenantNames()::contains,
+        _propertyStore
+    );
+
     if (!ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) {
       throw new TableNotFoundException("Logical table: " + tableName + " does 
not exist");
     }
@@ -2314,8 +2343,10 @@ public class PinotHelixResourceManager {
   }
 
   public List<String> getAllLogicalTableNames() {
-    return 
ZKMetadataProvider.getAllLogicalTableConfigs(_propertyStore).stream().map(LogicalTableConfig::getTableName)
-        .collect(Collectors.toList());
+    List<String> logicalTableNames = _propertyStore.getChildNames(
+        
PinotHelixPropertyStoreZnRecordProvider.forLogicalTable(_propertyStore).getRelativePath(),
+        AccessOption.PERSISTENT);
+    return logicalTableNames != null ? logicalTableNames : 
Collections.emptyList();
   }
 
   /**
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
index 1f2d3a8204..d6df95e5cb 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
@@ -84,6 +84,7 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
   public void testCreateUpdateDeleteLogicalTables(String logicalTableName, 
List<String> physicalTableNames,
       List<String> physicalTablesToUpdate)
       throws IOException {
+    addDummySchema(logicalTableName);
     // verify logical table does not exist
     String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
     String getLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableGet(logicalTableName);
@@ -199,15 +200,46 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
     }
   }
 
+  @Test
+  public void testLogicalTableSchemaValidation()
+      throws IOException {
+    String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
+    List<String> physicalTableNamesWithType = 
createHybridTables(List.of("test_table_3"));
+
+    // Test logical table schema does not exist
+    LogicalTableConfig logicalTableConfig =
+        getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, BROKER_TENANT);
+    try {
+      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      fail("Logical Table POST request should have failed");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Reason: Schema with same name as 
logical table '" + LOGICAL_TABLE_NAME
+          + "' does not exist"), e.getMessage());
+    }
+
+    // Test logical table with db prefix but schema without db prefix
+    addDummySchema(LOGICAL_TABLE_NAME);
+    logicalTableConfig = getDummyLogicalTableConfig("db." + 
LOGICAL_TABLE_NAME, physicalTableNamesWithType,
+        BROKER_TENANT);
+    try {
+      ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+      fail("Logical Table POST request should have failed");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Reason: Schema with same name as 
logical table 'db." + LOGICAL_TABLE_NAME
+          + "' does not exist"), e.getMessage());
+    }
+  }
+
   @Test
   public void testLogicalTableWithSameNameNotAllowed()
       throws IOException {
     String addLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableCreate();
     String getLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME);
-    List<String> physicalTableNamesWithType = 
createHybridTables(List.of("test_table_1", "test_table_2"));
+    List<String> physicalTableNamesWithType = 
createHybridTables(List.of("test_table_2"));
 
     LogicalTableConfig
         logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, 
physicalTableNamesWithType, BROKER_TENANT);
+    addDummySchema(LOGICAL_TABLE_NAME);
     ControllerTest.sendPostRequest(addLogicalTableUrl, 
logicalTableConfig.toSingleLineJsonString(), getHeaders());
     verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
     try {
@@ -217,11 +249,6 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
     } catch (IOException e) {
       assertTrue(e.getMessage().contains("Logical table: test_logical_table 
already exists"), e.getMessage());
     }
-
-    // clean up the logical table
-    String deleteLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableDelete(LOGICAL_TABLE_NAME);
-    ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
-    verifyLogicalTableDoesNotExists(getLogicalTableUrl);
   }
 
   @DataProvider
@@ -270,6 +297,7 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
     List<String> physicalTableNamesWithType = 
createHybridTables(physicalTableNames);
 
     for (int i = 0; i < logicalTableNames.size(); i++) {
+      addDummySchema(logicalTableNames.get(i));
       LogicalTableConfig logicalTableConfig = 
getDummyLogicalTableConfig(logicalTableNames.get(i), List.of(
           physicalTableNamesWithType.get(2 * i), 
physicalTableNamesWithType.get(2 * i + 1)), BROKER_TENANT);
 
@@ -280,13 +308,6 @@ public class PinotLogicalTableResourceTest extends 
ControllerTest {
     // verify logical table names
     String getLogicalTableNamesResponse = 
ControllerTest.sendGetRequest(getLogicalTableNamesUrl, getHeaders());
     assertEquals(getLogicalTableNamesResponse, 
objectMapper.writeValueAsString(logicalTableNames));
-
-    // cleanup: delete logical tables
-    for (String logicalTableName : logicalTableNames) {
-      String deleteLogicalTableUrl = 
_controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName);
-      String deleteResponse = 
ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
-      assertEquals(deleteResponse, "{\"status\":\"" + logicalTableName + " 
logical table successfully deleted.\"}");
-    }
   }
 
   private void verifyLogicalTableExists(String getLogicalTableUrl, 
LogicalTableConfig logicalTableConfig)
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
index b8f0473267..62632f1a6a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
@@ -75,6 +75,8 @@ public class PinotUserWithAccessLogicalTableResourceTest 
extends ControllerTest
     addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
     addFakeServerInstancesToAutoJoinHelixCluster(1, true);
     _controllerRequestURLBuilder = getControllerRequestURLBuilder();
+    // create schema for logical table
+    addDummySchema(LOGICAL_TABLE_NAME);
   }
 
   @AfterMethod
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 119c774fde..6f7c06260a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -1254,7 +1254,7 @@ public class ControllerTest {
     }, 60_000L, "Failed to clean up all the external views");
 
     // Delete all schemas.
-    List<String> schemaNames = _helixResourceManager.getSchemaNames();
+    List<String> schemaNames = _helixResourceManager.getAllSchemaNames();
     if (CollectionUtils.isNotEmpty(schemaNames)) {
       for (String schemaName : schemaNames) {
         getHelixResourceManager().deleteSchema(schemaName);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index 9690fd6df2..a10b2f0aea 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
+import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
 import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
 import org.apache.pinot.spi.config.provider.SchemaChangeListener;
 import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
@@ -77,20 +79,10 @@ public class TableCacheTest {
     assertNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME));
 
     // Add a schema
-    Schema schema =
-        new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("testColumn",
 DataType.INT)
-            .build();
-    TEST_INSTANCE.getHelixResourceManager().addSchema(schema, false, false);
-    // Wait for at most 10 seconds for the callback to add the schema to the 
cache
-    TestUtils.waitForCondition(aVoid -> tableCache.getSchema(RAW_TABLE_NAME) 
!= null, 10_000L,
-        "Failed to add the schema to the cache");
+    Schema schema = addSchema(RAW_TABLE_NAME, tableCache);
     // Schema can be accessed by the schema name, but not by the table name 
because table config is not added yet
     Schema expectedSchema = getExpectedSchema(RAW_TABLE_NAME);
-    Map<String, String> expectedColumnMap = new HashMap<>();
-    expectedColumnMap.put(isCaseInsensitive ? "testcolumn" : "testColumn", 
"testColumn");
-    expectedColumnMap.put(isCaseInsensitive ? "$docid" : "$docId", "$docId");
-    expectedColumnMap.put(isCaseInsensitive ? "$hostname" : "$hostName", 
"$hostName");
-    expectedColumnMap.put(isCaseInsensitive ? "$segmentname" : "$segmentName", 
"$segmentName");
+    Map<String, String> expectedColumnMap = 
getExpectedColumnMap(isCaseInsensitive);
     assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
     assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), 
expectedColumnMap);
     // Case-insensitive table name are handled based on the table config 
instead of the schema
@@ -114,6 +106,7 @@ public class TableCacheTest {
     assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), 
expectedColumnMap);
 
     // Add logical table
+    addSchema(LOGICAL_TABLE_NAME, tableCache);
     LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME));
     
TEST_INSTANCE.getHelixResourceManager().addLogicalTableConfig(logicalTableConfig);
     // Wait for at most 10 seconds for the callback to add the logical table 
to the cache
@@ -123,12 +116,11 @@ public class TableCacheTest {
     if (isCaseInsensitive) {
       
assertEquals(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME), 
LOGICAL_TABLE_NAME);
       
assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), 
logicalTableConfig);
-      assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME), 
expectedSchema);
     } else {
       
assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME));
     }
     assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), 
logicalTableConfig);
-    assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), expectedSchema);
+    assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), 
getExpectedSchema(LOGICAL_TABLE_NAME));
 
     // Register the change listeners
     TestTableConfigChangeListener tableConfigChangeListener = new 
TestTableConfigChangeListener();
@@ -138,8 +130,9 @@ public class TableCacheTest {
 
     TestSchemaChangeListener schemaChangeListener = new 
TestSchemaChangeListener();
     assertTrue(tableCache.registerSchemaChangeListener(schemaChangeListener));
-    assertEquals(schemaChangeListener._schemaList.size(), 1);
-    assertEquals(schemaChangeListener._schemaList.get(0), expectedSchema);
+    assertEquals(schemaChangeListener._schemaList.size(), 2);
+    assertTrue(schemaChangeListener._schemaList.get(0).equals(expectedSchema)
+    || schemaChangeListener._schemaList.get(1).equals(expectedSchema));
 
     TestLogicalTableConfigChangeListener logicalTableConfigChangeListener = 
new TestLogicalTableConfigChangeListener();
     
assertTrue(tableCache.registerLogicalTableConfigChangeListener(logicalTableConfigChangeListener));
@@ -164,8 +157,9 @@ public class TableCacheTest {
     expectedColumnMap.put(isCaseInsensitive ? "newcolumn" : "newColumn", 
"newColumn");
     TestUtils.waitForCondition(aVoid -> {
       assertNotNull(tableCache.getSchema(RAW_TABLE_NAME));
-      assertEquals(schemaChangeListener._schemaList.size(), 1);
-      return schemaChangeListener._schemaList.get(0).equals(expectedSchema);
+      assertEquals(schemaChangeListener._schemaList.size(), 2);
+      return schemaChangeListener._schemaList.get(0).equals(expectedSchema)
+          || schemaChangeListener._schemaList.get(1).equals(expectedSchema);
     }, 10_000L, "Failed to update the schema in the cache");
     // Schema can be accessed by both the schema name and the raw table name
     assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
@@ -206,12 +200,9 @@ public class TableCacheTest {
     TEST_INSTANCE.waitForEVToAppear(OFFLINE_TABLE_NAME);
 
     // Update logical table config (create schema and table config for 
anotherTable)
-    Schema anotherTableSchema =
-        new 
Schema.SchemaBuilder().setSchemaName(ANOTHER_TABLE).addSingleValueDimension("testColumn",
 DataType.INT)
-            .build();
+    addSchema(ANOTHER_TABLE, tableCache);
     TableConfig anotherTableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(ANOTHER_TABLE_OFFLINE).build();
-    TEST_INSTANCE.getHelixResourceManager().addSchema(anotherTableSchema, 
false, false);
     TEST_INSTANCE.getHelixResourceManager().addTable(anotherTableConfig);
     TEST_INSTANCE.waitForEVToAppear(ANOTHER_TABLE_OFFLINE);
     // Wait for at most 10 seconds for the callback to add the table config to 
the cache
@@ -229,12 +220,11 @@ public class TableCacheTest {
 
     if (isCaseInsensitive) {
       
assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), 
logicalTableConfig);
-      assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME), 
getExpectedSchema(ANOTHER_TABLE));
     } else {
       
assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME));
     }
     assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), 
logicalTableConfig);
-    assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), 
getExpectedSchema(ANOTHER_TABLE));
+    assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), 
getExpectedSchema(LOGICAL_TABLE_NAME));
 
     // Remove the table config
     TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME);
@@ -254,6 +244,7 @@ public class TableCacheTest {
     // Remove the schema
     TEST_INSTANCE.getHelixResourceManager().deleteSchema(RAW_TABLE_NAME);
     TEST_INSTANCE.getHelixResourceManager().deleteSchema(ANOTHER_TABLE);
+    TEST_INSTANCE.getHelixResourceManager().deleteSchema(LOGICAL_TABLE_NAME);
     // Wait for at most 10 seconds for the callback to remove the schema from 
the cache
     // NOTE:
     // - Verify if the callback is fully done by checking the schema change 
lister because it is the last step of the
@@ -285,6 +276,27 @@ public class TableCacheTest {
     TEST_INSTANCE.waitForEVToDisappear(ANOTHER_TABLE_OFFLINE);
   }
 
+  private static Schema addSchema(String rawTableName, TableCache tableCache)
+      throws SchemaAlreadyExistsException, SchemaBackwardIncompatibleException 
{
+    Schema schema =
+        new 
Schema.SchemaBuilder().setSchemaName(rawTableName).addSingleValueDimension("testColumn",
 DataType.INT)
+            .build();
+    TEST_INSTANCE.getHelixResourceManager().addSchema(schema, false, false);
+    // Wait for at most 10 seconds for the callback to add the schema to the 
cache
+    TestUtils.waitForCondition(aVoid -> tableCache.getSchema(rawTableName) != 
null,
+        10_000L, "Failed to add the schema to the cache");
+    return schema;
+  }
+
+  private static Map<String, String> getExpectedColumnMap(boolean 
isCaseInsensitive) {
+    Map<String, String> expectedColumnMap = new HashMap<>();
+    expectedColumnMap.put(isCaseInsensitive ? "testcolumn" : "testColumn", 
"testColumn");
+    expectedColumnMap.put(isCaseInsensitive ? "$docid" : "$docId", "$docId");
+    expectedColumnMap.put(isCaseInsensitive ? "$hostname" : "$hostName", 
"$hostName");
+    expectedColumnMap.put(isCaseInsensitive ? "$segmentname" : "$segmentName", 
"$segmentName");
+    return expectedColumnMap;
+  }
+
   private static Schema getExpectedSchema(String tableName) {
     return new 
Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension("testColumn",
 DataType.INT)
         .addSingleValueDimension(BuiltInVirtualColumn.DOCID, DataType.INT)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to