This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 980b5d04b3 Logical table schema enforcement. (#15733) 980b5d04b3 is described below commit 980b5d04b3219c933fc18f13d3a9ed8c2fb63cdb Author: Abhishek Bafna <aba...@startree.ai> AuthorDate: Fri May 9 11:35:05 2025 +0530 Logical table schema enforcement. (#15733) --- .../pinot/common/config/provider/TableCache.java | 19 ------- .../pinot/common/metadata/ZKMetadataProvider.java | 11 ++++ .../pinot/common/utils/LogicalTableUtils.java | 24 ++++++--- .../PinotHelixPropertyStoreZnRecordProvider.java | 6 ++- .../api/resources/PinotLogicalTableResource.java | 26 ++-------- .../helix/core/PinotHelixResourceManager.java | 35 ++++++++++++- .../resources/PinotLogicalTableResourceTest.java | 47 ++++++++++++----- ...inotUserWithAccessLogicalTableResourceTest.java | 2 + .../pinot/controller/helix/ControllerTest.java | 2 +- .../pinot/controller/helix/TableCacheTest.java | 60 +++++++++++++--------- 10 files changed, 142 insertions(+), 90 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java index 726df76930..0b12a64282 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -279,28 +278,10 @@ public class TableCache implements PinotConfigProvider { @Nullable @Override public Schema getSchema(String rawTableName) { - if (_schemaInfoMap.containsKey(rawTableName)) { - return getPhysicalTableSchema(rawTableName); - } else { - return getLogicalTableSchema(rawTableName); - } - } - - private Schema 
getPhysicalTableSchema(String rawTableName) { SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName); return schemaInfo != null ? schemaInfo._schema : null; } - @Nullable - private Schema getLogicalTableSchema(String logicalTableName) { - LogicalTableConfig logicalTableConfig = getLogicalTableConfig(logicalTableName); - if (logicalTableConfig == null) { - return null; - } - Optional<String> physicalTableName = logicalTableConfig.getPhysicalTableConfigMap().keySet().stream().findFirst(); - return getPhysicalTableSchema(TableNameBuilder.extractRawTableName(physicalTableName.orElse(null))); - } - @Override public boolean registerSchemaChangeListener(SchemaChangeListener schemaChangeListener) { synchronized (_zkSchemaChangeListener) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 04a31f4bf4..e1f21ceea4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -642,6 +642,17 @@ public class ZKMetadataProvider { } } + /** + * Check if the schema exists in the property store. + * + * @param propertyStore Helix property store + * @param schemaName Schema name + * @return true if the schema exists, false otherwise + */ + public static boolean isSchemaExists(ZkHelixPropertyStore<ZNRecord> propertyStore, String schemaName) { + return propertyStore.exists(constructPropertyStorePathForSchema(schemaName), AccessOption.PERSISTENT); + } + /** * Get the schema associated with the given table name. 
* diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java index 0b5d715bd9..be92ffdeb0 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java @@ -21,11 +21,12 @@ package org.apache.pinot.common.utils; import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.function.Predicate; import org.apache.commons.lang3.StringUtils; +import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.PhysicalTableConfig; import org.apache.pinot.spi.utils.JsonUtils; @@ -73,8 +74,11 @@ public class LogicalTableUtils { return record; } - public static void validateLogicalTableName(LogicalTableConfig logicalTableConfig, List<String> allPhysicalTables, - Set<String> allBrokerTenantNames) { + public static void validateLogicalTableName( + LogicalTableConfig logicalTableConfig, + Predicate<String> physicalTableExistsPredicate, + Predicate<String> brokerTenantExistsPredicate, + ZkHelixPropertyStore<ZNRecord> propertyStore) { String tableName = logicalTableConfig.getTableName(); if (StringUtils.isEmpty(tableName)) { throw new IllegalArgumentException("Invalid logical table name. 
Reason: 'tableName' should not be null or empty"); @@ -96,7 +100,7 @@ public class LogicalTableUtils { PhysicalTableConfig physicalTableConfig = entry.getValue(); // validate physical table exists - if (!allPhysicalTables.contains(physicalTableName)) { + if (!physicalTableExistsPredicate.test(physicalTableName)) { throw new IllegalArgumentException( "Invalid logical table. Reason: '" + physicalTableName + "' should be one of the existing tables"); } @@ -108,11 +112,17 @@ public class LogicalTableUtils { } } - // validate broker tenant + // validate broker tenant exists String brokerTenant = logicalTableConfig.getBrokerTenant(); - if (!allBrokerTenantNames.contains(brokerTenant)) { + if (!brokerTenantExistsPredicate.test(brokerTenant)) { throw new IllegalArgumentException( "Invalid logical table. Reason: '" + brokerTenant + "' should be one of the existing broker tenants"); } + + // Validate schema with same name as logical table exists + if (!ZKMetadataProvider.isSchemaExists(propertyStore, tableName)) { + throw new IllegalArgumentException( + "Invalid logical table. 
Reason: Schema with same name as logical table '" + tableName + "' does not exist"); + } } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java index 7ff2d6da2c..fc25ef4dd3 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java @@ -44,13 +44,17 @@ public class PinotHelixPropertyStoreZnRecordProvider { } public static PinotHelixPropertyStoreZnRecordProvider forTable(ZkHelixPropertyStore<ZNRecord> propertyStore) { - return new PinotHelixPropertyStoreZnRecordProvider(propertyStore, "/CONFIGS/TABLES"); + return new PinotHelixPropertyStoreZnRecordProvider(propertyStore, "/CONFIGS/TABLE"); } public static PinotHelixPropertyStoreZnRecordProvider forSegments(ZkHelixPropertyStore<ZNRecord> propertyStore) { return new PinotHelixPropertyStoreZnRecordProvider(propertyStore, "/SEGMENTS"); } + public static PinotHelixPropertyStoreZnRecordProvider forLogicalTable(ZkHelixPropertyStore<ZNRecord> propertyStore) { + return new PinotHelixPropertyStoreZnRecordProvider(propertyStore, "/LOGICAL/TABLE"); + } + public ZNRecord get(String name) { return _propertyStore.get(_pathPrefix + "/" + name, null, AccessOption.PERSISTENT); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java index 844d9aea24..cf9cf3b2d4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java @@ -43,11 +43,9 @@ import 
javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.arrow.util.Preconditions; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.exception.TableNotFoundException; import org.apache.pinot.common.utils.DatabaseUtils; -import org.apache.pinot.common.utils.LogicalTableUtils; import org.apache.pinot.controller.api.access.AccessControlFactory; import org.apache.pinot.controller.api.access.AccessType; import org.apache.pinot.controller.api.access.Authenticate; @@ -82,7 +80,6 @@ import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K @Path("/") public class PinotLogicalTableResource { public static final Logger LOGGER = LoggerFactory.getLogger(PinotLogicalTableResource.class); - private static final String DEFAULT_BROKER_TENANT = "DefaultTenant"; @Inject PinotHelixResourceManager _pinotHelixResourceManager; @@ -172,7 +169,7 @@ public class PinotLogicalTableResource { tableName = DatabaseUtils.translateTableName(tableName, headers); logicalTableConfig.setTableName(tableName); - SuccessResponse successResponse = updateLogicalTable(tableName, logicalTableConfig); + SuccessResponse successResponse = updateLogicalTable(logicalTableConfig); return new ConfigSuccessResponse(successResponse.getStatus(), logicalTableConfigAndUnrecognizedProps.getRight()); } @@ -214,15 +211,6 @@ public class PinotLogicalTableResource { private SuccessResponse addLogicalTable(LogicalTableConfig logicalTableConfig) { String tableName = logicalTableConfig.getTableName(); try { - if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) { - logicalTableConfig.setBrokerTenant(DEFAULT_BROKER_TENANT); - } - - LogicalTableUtils.validateLogicalTableName( - logicalTableConfig, - _pinotHelixResourceManager.getAllTables(), - _pinotHelixResourceManager.getAllBrokerTenantNames() - ); 
_pinotHelixResourceManager.addLogicalTableConfig(logicalTableConfig); return new SuccessResponse(tableName + " logical table successfully added."); } catch (TableAlreadyExistsException e) { @@ -236,17 +224,9 @@ public class PinotLogicalTableResource { } } - private SuccessResponse updateLogicalTable(String tableName, LogicalTableConfig logicalTableConfig) { + private SuccessResponse updateLogicalTable(LogicalTableConfig logicalTableConfig) { + String tableName = logicalTableConfig.getTableName(); try { - if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) { - logicalTableConfig.setBrokerTenant(DEFAULT_BROKER_TENANT); - } - - LogicalTableUtils.validateLogicalTableName( - logicalTableConfig, - _pinotHelixResourceManager.getAllTables(), - _pinotHelixResourceManager.getAllBrokerTenantNames() - ); _pinotHelixResourceManager.updateLogicalTableConfig(logicalTableConfig); return new SuccessResponse(logicalTableConfig.getTableName() + " logical table successfully updated."); } catch (TableNotFoundException e) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 8ff276c20b..ad62a01227 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -127,6 +127,7 @@ import org.apache.pinot.common.utils.BcryptUtils; import org.apache.pinot.common.utils.DatabaseUtils; import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.LogicalTableUtils; import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils; import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.common.utils.config.TableConfigUtils; @@ -1649,6 +1650,12 @@ public 
class PinotHelixResourceManager { return ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig); } + public List<String> getAllSchemaNames() { + return _propertyStore.getChildNames( + PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(), AccessOption.PERSISTENT + ); + } + public List<String> getSchemaNames() { return getSchemaNames(null); } @@ -1825,6 +1832,17 @@ public class PinotHelixResourceManager { String tableName = logicalTableConfig.getTableName(); LOGGER.info("Adding logical table {}: Start", tableName); + if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) { + logicalTableConfig.setBrokerTenant("DefaultTenant"); + } + + LogicalTableUtils.validateLogicalTableName( + logicalTableConfig, + PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore)::exist, + getAllBrokerTenantNames()::contains, + _propertyStore + ); + // Check if the logical table name is already used if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) { throw new TableAlreadyExistsException("Logical table: " + tableName + " already exists"); @@ -2102,6 +2120,17 @@ public class PinotHelixResourceManager { String tableName = logicalTableConfig.getTableName(); LOGGER.info("Updating logical table {}: Start", tableName); + if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) { + logicalTableConfig.setBrokerTenant("DefaultTenant"); + } + + LogicalTableUtils.validateLogicalTableName( + logicalTableConfig, + PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore)::exist, + getAllBrokerTenantNames()::contains, + _propertyStore + ); + if (!ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) { throw new TableNotFoundException("Logical table: " + tableName + " does not exist"); } @@ -2314,8 +2343,10 @@ public class PinotHelixResourceManager { } public List<String> getAllLogicalTableNames() { - return 
ZKMetadataProvider.getAllLogicalTableConfigs(_propertyStore).stream().map(LogicalTableConfig::getTableName) - .collect(Collectors.toList()); + List<String> logicalTableNames = _propertyStore.getChildNames( + PinotHelixPropertyStoreZnRecordProvider.forLogicalTable(_propertyStore).getRelativePath(), + AccessOption.PERSISTENT); + return logicalTableNames != null ? logicalTableNames : Collections.emptyList(); } /** diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java index 1f2d3a8204..d6df95e5cb 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java @@ -84,6 +84,7 @@ public class PinotLogicalTableResourceTest extends ControllerTest { public void testCreateUpdateDeleteLogicalTables(String logicalTableName, List<String> physicalTableNames, List<String> physicalTablesToUpdate) throws IOException { + addDummySchema(logicalTableName); // verify logical table does not exist String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); String getLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableGet(logicalTableName); @@ -199,15 +200,46 @@ public class PinotLogicalTableResourceTest extends ControllerTest { } } + @Test + public void testLogicalTableSchemaValidation() + throws IOException { + String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); + List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_3")); + + // Test logical table schema does not exist + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); + try { + 
ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + fail("Logical Table POST request should have failed"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Reason: Schema with same name as logical table '" + LOGICAL_TABLE_NAME + + "' does not exist"), e.getMessage()); + } + + // Test logical table with db prefix but schema without db prefix + addDummySchema(LOGICAL_TABLE_NAME); + logicalTableConfig = getDummyLogicalTableConfig("db." + LOGICAL_TABLE_NAME, physicalTableNamesWithType, + BROKER_TENANT); + try { + ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + fail("Logical Table POST request should have failed"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Reason: Schema with same name as logical table 'db." + LOGICAL_TABLE_NAME + + "' does not exist"), e.getMessage()); + } + } + @Test public void testLogicalTableWithSameNameNotAllowed() throws IOException { String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); String getLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME); - List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_1", "test_table_2")); + List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_2")); LogicalTableConfig logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); + addDummySchema(LOGICAL_TABLE_NAME); ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig); try { @@ -217,11 +249,6 @@ public class PinotLogicalTableResourceTest extends ControllerTest { } catch (IOException e) { assertTrue(e.getMessage().contains("Logical table: test_logical_table already exists"), e.getMessage()); } - - // clean up the 
logical table - String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(LOGICAL_TABLE_NAME); - ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders()); - verifyLogicalTableDoesNotExists(getLogicalTableUrl); } @DataProvider @@ -270,6 +297,7 @@ public class PinotLogicalTableResourceTest extends ControllerTest { List<String> physicalTableNamesWithType = createHybridTables(physicalTableNames); for (int i = 0; i < logicalTableNames.size(); i++) { + addDummySchema(logicalTableNames.get(i)); LogicalTableConfig logicalTableConfig = getDummyLogicalTableConfig(logicalTableNames.get(i), List.of( physicalTableNamesWithType.get(2 * i), physicalTableNamesWithType.get(2 * i + 1)), BROKER_TENANT); @@ -280,13 +308,6 @@ public class PinotLogicalTableResourceTest extends ControllerTest { // verify logical table names String getLogicalTableNamesResponse = ControllerTest.sendGetRequest(getLogicalTableNamesUrl, getHeaders()); assertEquals(getLogicalTableNamesResponse, objectMapper.writeValueAsString(logicalTableNames)); - - // cleanup: delete logical tables - for (String logicalTableName : logicalTableNames) { - String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName); - String deleteResponse = ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders()); - assertEquals(deleteResponse, "{\"status\":\"" + logicalTableName + " logical table successfully deleted.\"}"); - } } private void verifyLogicalTableExists(String getLogicalTableUrl, LogicalTableConfig logicalTableConfig) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java index b8f0473267..62632f1a6a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java +++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java @@ -75,6 +75,8 @@ public class PinotUserWithAccessLogicalTableResourceTest extends ControllerTest addFakeBrokerInstancesToAutoJoinHelixCluster(1, true); addFakeServerInstancesToAutoJoinHelixCluster(1, true); _controllerRequestURLBuilder = getControllerRequestURLBuilder(); + // create schema for logical table + addDummySchema(LOGICAL_TABLE_NAME); } @AfterMethod diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 119c774fde..6f7c06260a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -1254,7 +1254,7 @@ public class ControllerTest { }, 60_000L, "Failed to clean up all the external views"); // Delete all schemas. 
- List<String> schemaNames = _helixResourceManager.getSchemaNames(); + List<String> schemaNames = _helixResourceManager.getAllSchemaNames(); if (CollectionUtils.isNotEmpty(schemaNames)) { for (String schemaName : schemaNames) { getHelixResourceManager().deleteSchema(schemaName); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java index 9690fd6df2..a10b2f0aea 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.exception.SchemaAlreadyExistsException; +import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException; import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener; import org.apache.pinot.spi.config.provider.SchemaChangeListener; import org.apache.pinot.spi.config.provider.TableConfigChangeListener; @@ -77,20 +79,10 @@ public class TableCacheTest { assertNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME)); // Add a schema - Schema schema = - new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("testColumn", DataType.INT) - .build(); - TEST_INSTANCE.getHelixResourceManager().addSchema(schema, false, false); - // Wait for at most 10 seconds for the callback to add the schema to the cache - TestUtils.waitForCondition(aVoid -> tableCache.getSchema(RAW_TABLE_NAME) != null, 10_000L, - "Failed to add the schema to the cache"); + Schema schema = addSchema(RAW_TABLE_NAME, tableCache); // Schema can be accessed by the schema name, but not by the table name because table config is not added yet Schema expectedSchema = getExpectedSchema(RAW_TABLE_NAME); - 
Map<String, String> expectedColumnMap = new HashMap<>(); - expectedColumnMap.put(isCaseInsensitive ? "testcolumn" : "testColumn", "testColumn"); - expectedColumnMap.put(isCaseInsensitive ? "$docid" : "$docId", "$docId"); - expectedColumnMap.put(isCaseInsensitive ? "$hostname" : "$hostName", "$hostName"); - expectedColumnMap.put(isCaseInsensitive ? "$segmentname" : "$segmentName", "$segmentName"); + Map<String, String> expectedColumnMap = getExpectedColumnMap(isCaseInsensitive); assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema); assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), expectedColumnMap); // Case-insensitive table name are handled based on the table config instead of the schema @@ -114,6 +106,7 @@ public class TableCacheTest { assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), expectedColumnMap); // Add logical table + addSchema(LOGICAL_TABLE_NAME, tableCache); LogicalTableConfig logicalTableConfig = getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME)); TEST_INSTANCE.getHelixResourceManager().addLogicalTableConfig(logicalTableConfig); // Wait for at most 10 seconds for the callback to add the logical table to the cache @@ -123,12 +116,11 @@ public class TableCacheTest { if (isCaseInsensitive) { assertEquals(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME), LOGICAL_TABLE_NAME); assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), logicalTableConfig); - assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME), expectedSchema); } else { assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME)); } assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), logicalTableConfig); - assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), expectedSchema); + assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), getExpectedSchema(LOGICAL_TABLE_NAME)); // Register the change listeners TestTableConfigChangeListener tableConfigChangeListener = new 
TestTableConfigChangeListener(); @@ -138,8 +130,9 @@ public class TableCacheTest { TestSchemaChangeListener schemaChangeListener = new TestSchemaChangeListener(); assertTrue(tableCache.registerSchemaChangeListener(schemaChangeListener)); - assertEquals(schemaChangeListener._schemaList.size(), 1); - assertEquals(schemaChangeListener._schemaList.get(0), expectedSchema); + assertEquals(schemaChangeListener._schemaList.size(), 2); + assertTrue(schemaChangeListener._schemaList.get(0).equals(expectedSchema) + || schemaChangeListener._schemaList.get(1).equals(expectedSchema)); TestLogicalTableConfigChangeListener logicalTableConfigChangeListener = new TestLogicalTableConfigChangeListener(); assertTrue(tableCache.registerLogicalTableConfigChangeListener(logicalTableConfigChangeListener)); @@ -164,8 +157,9 @@ public class TableCacheTest { expectedColumnMap.put(isCaseInsensitive ? "newcolumn" : "newColumn", "newColumn"); TestUtils.waitForCondition(aVoid -> { assertNotNull(tableCache.getSchema(RAW_TABLE_NAME)); - assertEquals(schemaChangeListener._schemaList.size(), 1); - return schemaChangeListener._schemaList.get(0).equals(expectedSchema); + assertEquals(schemaChangeListener._schemaList.size(), 2); + return schemaChangeListener._schemaList.get(0).equals(expectedSchema) + || schemaChangeListener._schemaList.get(1).equals(expectedSchema); }, 10_000L, "Failed to update the schema in the cache"); // Schema can be accessed by both the schema name and the raw table name assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema); @@ -206,12 +200,9 @@ public class TableCacheTest { TEST_INSTANCE.waitForEVToAppear(OFFLINE_TABLE_NAME); // Update logical table config (create schema and table config for anotherTable) - Schema anotherTableSchema = - new Schema.SchemaBuilder().setSchemaName(ANOTHER_TABLE).addSingleValueDimension("testColumn", DataType.INT) - .build(); + addSchema(ANOTHER_TABLE, tableCache); TableConfig anotherTableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(ANOTHER_TABLE_OFFLINE).build(); - TEST_INSTANCE.getHelixResourceManager().addSchema(anotherTableSchema, false, false); TEST_INSTANCE.getHelixResourceManager().addTable(anotherTableConfig); TEST_INSTANCE.waitForEVToAppear(ANOTHER_TABLE_OFFLINE); // Wait for at most 10 seconds for the callback to add the table config to the cache @@ -229,12 +220,11 @@ public class TableCacheTest { if (isCaseInsensitive) { assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), logicalTableConfig); - assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME), getExpectedSchema(ANOTHER_TABLE)); } else { assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME)); } assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), logicalTableConfig); - assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), getExpectedSchema(ANOTHER_TABLE)); + assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), getExpectedSchema(LOGICAL_TABLE_NAME)); // Remove the table config TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME); @@ -254,6 +244,7 @@ public class TableCacheTest { // Remove the schema TEST_INSTANCE.getHelixResourceManager().deleteSchema(RAW_TABLE_NAME); TEST_INSTANCE.getHelixResourceManager().deleteSchema(ANOTHER_TABLE); + TEST_INSTANCE.getHelixResourceManager().deleteSchema(LOGICAL_TABLE_NAME); // Wait for at most 10 seconds for the callback to remove the schema from the cache // NOTE: // - Verify if the callback is fully done by checking the schema change lister because it is the last step of the @@ -285,6 +276,27 @@ public class TableCacheTest { TEST_INSTANCE.waitForEVToDisappear(ANOTHER_TABLE_OFFLINE); } + private static Schema addSchema(String rawTableName, TableCache tableCache) + throws SchemaAlreadyExistsException, SchemaBackwardIncompatibleException { + Schema schema = + new Schema.SchemaBuilder().setSchemaName(rawTableName).addSingleValueDimension("testColumn", 
DataType.INT) + .build(); + TEST_INSTANCE.getHelixResourceManager().addSchema(schema, false, false); + // Wait for at most 10 seconds for the callback to add the schema to the cache + TestUtils.waitForCondition(aVoid -> tableCache.getSchema(rawTableName) != null, + 10_000L, "Failed to add the schema to the cache"); + return schema; + } + + private static Map<String, String> getExpectedColumnMap(boolean isCaseInsensitive) { + Map<String, String> expectedColumnMap = new HashMap<>(); + expectedColumnMap.put(isCaseInsensitive ? "testcolumn" : "testColumn", "testColumn"); + expectedColumnMap.put(isCaseInsensitive ? "$docid" : "$docId", "$docId"); + expectedColumnMap.put(isCaseInsensitive ? "$hostname" : "$hostName", "$hostName"); + expectedColumnMap.put(isCaseInsensitive ? "$segmentname" : "$segmentName", "$segmentName"); + return expectedColumnMap; + } + private static Schema getExpectedSchema(String tableName) { return new Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension("testColumn", DataType.INT) .addSingleValueDimension(BuiltInVirtualColumn.DOCID, DataType.INT) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org