This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7b12239104 Add configs to logical tables (#15720) 7b12239104 is described below commit 7b122391046f0a47c7cf69cd2595b9a9fc5cd186 Author: Abhishek Bafna <aba...@startree.ai> AuthorDate: Fri May 9 19:12:17 2025 +0530 Add configs to logical tables (#15720) --- .../pinot/broker/routing/BrokerRoutingManager.java | 10 +- .../pinot/common/config/provider/TableCache.java | 78 ++--- .../pinot/common/metadata/ZKMetadataProvider.java | 8 +- ...ableUtils.java => LogicalTableConfigUtils.java} | 81 ++++- .../helix/core/PinotHelixResourceManager.java | 46 ++- .../api/PinotTableRestletResourceTest.java | 30 ++ .../resources/PinotLogicalTableResourceTest.java | 329 +++++++++++++++------ ...inotUserWithAccessLogicalTableResourceTest.java | 1 + .../pinot/controller/helix/ControllerTest.java | 10 + .../pinot/controller/helix/TableCacheTest.java | 58 ++-- .../apache/pinot/spi/data/LogicalTableConfig.java | 70 +++-- .../utils/builder/LogicalTableConfigBuilder.java | 31 ++ 12 files changed, 545 insertions(+), 207 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index 360d05e5e7..ca3d42d436 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -414,15 +414,17 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle } /** - * Builds/rebuilds the routing for the given table. + * Builds/rebuilds the routing for the physical table, for logical tables it is skipped. + * @param physicalOrLogicalTable a physical table with type or logical table name */ - public synchronized void buildRouting(String tableNameWithType) { + public synchronized void buildRouting(String physicalOrLogicalTable) { // skip route building for logical tables - if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableNameWithType)) { - LOGGER.info("Skipping route building for logical table: {}", tableNameWithType); + if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, physicalOrLogicalTable)) { + LOGGER.info("Skipping route building for logical table: {}", physicalOrLogicalTable); return; } + String tableNameWithType = physicalOrLogicalTable; LOGGER.info("Building routing for table: {}", tableNameWithType); TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java index 0b12a64282..e25948fd61 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java @@ -38,7 +38,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.apache.pinot.common.request.Expression; -import org.apache.pinot.common.utils.LogicalTableUtils; +import org.apache.pinot.common.utils.LogicalTableConfigUtils; import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener; @@ -226,12 +226,17 @@ public class TableCache implements PinotConfigProvider { } /** - * Returns the expression override map for the given table, or {@code null} if no override is configured. + * Returns the expression override map for the given logical or physical table, or {@code null} if no override is + * configured. */ @Nullable - public Map<Expression, Expression> getExpressionOverrideMap(String tableNameWithType) { - TableConfigInfo tableConfigInfo = _tableConfigInfoMap.get(tableNameWithType); - return tableConfigInfo != null ? tableConfigInfo._expressionOverrideMap : null; + public Map<Expression, Expression> getExpressionOverrideMap(String physicalOrLogicalTableName) { + TableConfigInfo tableConfigInfo = _tableConfigInfoMap.get(physicalOrLogicalTableName); + if (tableConfigInfo != null) { + return tableConfigInfo._expressionOverrideMap; + } + LogicalTableConfigInfo logicalTableConfigInfo = _logicalTableConfigInfoMap.get(physicalOrLogicalTableName); + return logicalTableConfigInfo != null ? logicalTableConfigInfo._expressionOverrideMap : null; } /** @@ -256,7 +261,6 @@ public class TableCache implements PinotConfigProvider { @Nullable @Override public LogicalTableConfig getLogicalTableConfig(String logicalTableName) { - logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : logicalTableName; LogicalTableConfigInfo logicalTableConfigInfo = _logicalTableConfigInfoMap.get(logicalTableName); return logicalTableConfigInfo != null ? logicalTableConfigInfo._logicalTableConfig : null; } @@ -344,14 +348,13 @@ public class TableCache implements PinotConfigProvider { private void putLogicalTableConfig(ZNRecord znRecord) throws IOException { - LogicalTableConfig logicalTableConfig = LogicalTableUtils.fromZNRecord(znRecord); + LogicalTableConfig logicalTableConfig = LogicalTableConfigUtils.fromZNRecord(znRecord); String logicalTableName = logicalTableConfig.getTableName(); + _logicalTableConfigInfoMap.put(logicalTableName, new LogicalTableConfigInfo(logicalTableConfig)); if (_ignoreCase) { _logicalTableNameMap.put(logicalTableName.toLowerCase(), logicalTableName); - _logicalTableConfigInfoMap.put(logicalTableName.toLowerCase(), new LogicalTableConfigInfo(logicalTableConfig)); } else { _logicalTableNameMap.put(logicalTableName, logicalTableName); - _logicalTableConfigInfoMap.put(logicalTableName, new LogicalTableConfigInfo(logicalTableConfig)); } } @@ -389,8 +392,8 @@ public class TableCache implements PinotConfigProvider { private void removeLogicalTableConfig(String path) { _propertyStore.unsubscribeDataChanges(path, _zkLogicalTableConfigChangeListener); String logicalTableName = path.substring(LOGICAL_TABLE_PATH_PREFIX.length()); - logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : logicalTableName; _logicalTableConfigInfoMap.remove(logicalTableName); + logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() : logicalTableName; _logicalTableNameMap.remove(logicalTableName); } @@ -644,6 +647,31 @@ public class TableCache implements PinotConfigProvider { } } + private static Map<Expression, Expression> createExpressionOverrideMap(String physicalOrLogicalTableName, + QueryConfig queryConfig) { + Map<Expression, Expression> expressionOverrideMap = new TreeMap<>(); + if (queryConfig != null && MapUtils.isNotEmpty(queryConfig.getExpressionOverrideMap())) { + for (Map.Entry<String, String> entry : queryConfig.getExpressionOverrideMap().entrySet()) { + try { + Expression srcExp = CalciteSqlParser.compileToExpression(entry.getKey()); + Expression destExp = CalciteSqlParser.compileToExpression(entry.getValue()); + expressionOverrideMap.put(srcExp, destExp); + } catch (Exception e) { + LOGGER.warn("Caught exception while compiling expression override: {} -> {} for table: {}, skipping it", + entry.getKey(), entry.getValue(), physicalOrLogicalTableName); + } + } + int mapSize = expressionOverrideMap.size(); + if (mapSize == 1) { + Map.Entry<Expression, Expression> entry = expressionOverrideMap.entrySet().iterator().next(); + return Collections.singletonMap(entry.getKey(), entry.getValue()); + } else if (mapSize > 1) { + return expressionOverrideMap; + } + } + return null; + } + private static class TableConfigInfo { final TableConfig _tableConfig; final Map<Expression, Expression> _expressionOverrideMap; @@ -652,31 +680,7 @@ public class TableCache implements PinotConfigProvider { private TableConfigInfo(TableConfig tableConfig) { _tableConfig = tableConfig; - QueryConfig queryConfig = tableConfig.getQueryConfig(); - if (queryConfig != null && MapUtils.isNotEmpty(queryConfig.getExpressionOverrideMap())) { - Map<Expression, Expression> expressionOverrideMap = new TreeMap<>(); - for (Map.Entry<String, String> entry : queryConfig.getExpressionOverrideMap().entrySet()) { - try { - Expression srcExp = CalciteSqlParser.compileToExpression(entry.getKey()); - Expression destExp = CalciteSqlParser.compileToExpression(entry.getValue()); - expressionOverrideMap.put(srcExp, destExp); - } catch (Exception e) { - LOGGER.warn("Caught exception while compiling expression override: {} -> {} for table: {}, skipping it", - entry.getKey(), entry.getValue(), tableConfig.getTableName()); - } - } - int mapSize = expressionOverrideMap.size(); - if (mapSize == 0) { - _expressionOverrideMap = null; - } else if (mapSize == 1) { - Map.Entry<Expression, Expression> entry = expressionOverrideMap.entrySet().iterator().next(); - _expressionOverrideMap = Collections.singletonMap(entry.getKey(), entry.getValue()); - } else { - _expressionOverrideMap = expressionOverrideMap; - } - } else { - _expressionOverrideMap = null; - } + _expressionOverrideMap = createExpressionOverrideMap(tableConfig.getTableName(), tableConfig.getQueryConfig()); _timestampIndexColumns = TimestampIndexUtils.extractColumnsWithGranularity(tableConfig); } } @@ -693,10 +697,12 @@ public class TableCache implements PinotConfigProvider { private static class LogicalTableConfigInfo { final LogicalTableConfig _logicalTableConfig; - // TODO : Add expression override map for logical table, issue #15607 + final Map<Expression, Expression> _expressionOverrideMap; private LogicalTableConfigInfo(LogicalTableConfig logicalTableConfig) { _logicalTableConfig = logicalTableConfig; + _expressionOverrideMap = createExpressionOverrideMap(logicalTableConfig.getTableName(), + logicalTableConfig.getQueryConfig()); } } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index e1f21ceea4..c465f11b13 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -40,7 +40,7 @@ import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.common.utils.LogicalTableUtils; +import org.apache.pinot.common.utils.LogicalTableConfigUtils; import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils; import org.apache.pinot.common.utils.config.TableConfigUtils; @@ -837,7 +837,7 @@ public class ZKMetadataProvider { public static void setLogicalTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, LogicalTableConfig logicalTableConfig) { try { - ZNRecord znRecord = LogicalTableUtils.toZNRecord(logicalTableConfig); + ZNRecord znRecord = LogicalTableConfigUtils.toZNRecord(logicalTableConfig); String path = constructPropertyStorePathForLogical(logicalTableConfig.getTableName()); propertyStore.set(path, znRecord, AccessOption.PERSISTENT); } catch (JsonProcessingException e) { @@ -851,7 +851,7 @@ public class ZKMetadataProvider { if (znRecords != null) { return znRecords.stream().map(znRecord -> { try { - return LogicalTableUtils.fromZNRecord(znRecord); + return LogicalTableConfigUtils.fromZNRecord(znRecord); } catch (IOException e) { LOGGER.error("Caught exception while converting ZNRecord to LogicalTable: {}", znRecord.getId(), e); return null; @@ -870,7 +870,7 @@ public class ZKMetadataProvider { if (logicalTableZNRecord == null) { return null; } - return LogicalTableUtils.fromZNRecord(logicalTableZNRecord); + return LogicalTableConfigUtils.fromZNRecord(logicalTableZNRecord); } catch (Exception e) { LOGGER.error("Caught exception while getting logical table: {}", tableName, e); return null; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java similarity index 58% rename from pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java rename to pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java index be92ffdeb0..d9b5e69b1e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableConfigUtils.java @@ -21,12 +21,16 @@ package org.apache.pinot.common.utils; import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.function.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.spi.config.table.QueryConfig; +import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.PhysicalTableConfig; import org.apache.pinot.spi.utils.JsonUtils; @@ -34,9 +38,9 @@ import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -public class LogicalTableUtils { +public class LogicalTableConfigUtils { - private LogicalTableUtils() { + private LogicalTableConfigUtils() { // Utility class } @@ -46,6 +50,21 @@ public class LogicalTableUtils { .setTableName(record.getSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY)) .setBrokerTenant(record.getSimpleField(LogicalTableConfig.BROKER_TENANT_KEY)); + if (record.getSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY) != null) { + builder.setQueryConfig(JsonUtils.stringToObject(record.getSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY), + QueryConfig.class)); + } + if (record.getSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY) != null) { + builder.setQuotaConfig(JsonUtils.stringToObject(record.getSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY), + QuotaConfig.class)); + } + if (record.getSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY) != null) { + builder.setRefOfflineTableName(record.getSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY)); + } + if (record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY) != null) { + builder.setRefRealtimeTableName(record.getSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY)); + } + Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>(); for (Map.Entry<String, String> entry : record.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY) .entrySet()) { @@ -71,10 +90,25 @@ public class LogicalTableUtils { record.setSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY, logicalTableConfig.getTableName()); record.setSimpleField(LogicalTableConfig.BROKER_TENANT_KEY, logicalTableConfig.getBrokerTenant()); record.setMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY, physicalTableConfigMap); + + if (logicalTableConfig.getQueryConfig() != null) { + record.setSimpleField(LogicalTableConfig.QUERY_CONFIG_KEY, logicalTableConfig.getQueryConfig().toJsonString()); + } + if (logicalTableConfig.getQuotaConfig() != null) { + record.setSimpleField(LogicalTableConfig.QUOTA_CONFIG_KEY, logicalTableConfig.getQuotaConfig().toJsonString()); + } + if (logicalTableConfig.getRefOfflineTableName() != null) { + record.setSimpleField(LogicalTableConfig.REF_OFFLINE_TABLE_NAME_KEY, + logicalTableConfig.getRefOfflineTableName()); + } + if (logicalTableConfig.getRefRealtimeTableName() != null) { + record.setSimpleField(LogicalTableConfig.REF_REALTIME_TABLE_NAME_KEY, + logicalTableConfig.getRefRealtimeTableName()); + } return record; } - public static void validateLogicalTableName( + public static void validateLogicalTableConfig( LogicalTableConfig logicalTableConfig, Predicate<String> physicalTableExistsPredicate, Predicate<String> brokerTenantExistsPredicate, @@ -95,6 +129,9 @@ public class LogicalTableUtils { "Invalid logical table. Reason: 'physicalTableConfigMap' should not be null or empty"); } + Set<String> offlineTableNames = new HashSet<>(); + Set<String> realtimeTableNames = new HashSet<>(); + for (Map.Entry<String, PhysicalTableConfig> entry : logicalTableConfig.getPhysicalTableConfigMap().entrySet()) { String physicalTableName = entry.getKey(); PhysicalTableConfig physicalTableConfig = entry.getValue(); @@ -110,6 +147,44 @@ public class LogicalTableUtils { "Invalid logical table. Reason: 'physicalTableConfig' should not be null for physical table: " + physicalTableName); } + + if (TableNameBuilder.isOfflineTableResource(physicalTableName)) { + offlineTableNames.add(physicalTableName); + } else if (TableNameBuilder.isRealtimeTableResource(physicalTableName)) { + realtimeTableNames.add(physicalTableName); + } + } + + // validate ref offline table name is not null or empty when offline tables exists + if (!offlineTableNames.isEmpty() && StringUtils.isEmpty(logicalTableConfig.getRefOfflineTableName())) { + throw new IllegalArgumentException( + "Invalid logical table. Reason: 'refOfflineTableName' should not be null or empty when offline table exists"); + } + + // validate ref realtime table name is not null or empty when realtime tables exists + if (!realtimeTableNames.isEmpty() && StringUtils.isEmpty(logicalTableConfig.getRefRealtimeTableName())) { + throw new IllegalArgumentException( + "Invalid logical table. Reason: 'refRealtimeTableName' should not be null or empty when realtime table " + + "exists"); + } + + // validate ref offline table name is present in the offline tables + if (!offlineTableNames.isEmpty() && !offlineTableNames.contains(logicalTableConfig.getRefOfflineTableName())) { + throw new IllegalArgumentException( + "Invalid logical table. Reason: 'refOfflineTableName' should be one of the provided offline tables"); + } + + // validate ref realtime table name is present in the realtime tables + if (!realtimeTableNames.isEmpty() && !realtimeTableNames.contains(logicalTableConfig.getRefRealtimeTableName())) { + throw new IllegalArgumentException( + "Invalid logical table. Reason: 'refRealtimeTableName' should be one of the provided realtime tables"); + } + + // validate quota.storage is not set + QuotaConfig quotaConfig = logicalTableConfig.getQuotaConfig(); + if (quotaConfig != null && !StringUtils.isEmpty(quotaConfig.getStorage())) { + throw new IllegalArgumentException( + "Invalid logical table. Reason: 'quota.storage' should not be set for logical table"); } // validate broker tenant exists diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ad62a01227..de7b6bba80 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -127,7 +127,7 @@ import org.apache.pinot.common.utils.BcryptUtils; import org.apache.pinot.common.utils.DatabaseUtils; import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.common.utils.LogicalTableUtils; +import org.apache.pinot.common.utils.LogicalTableConfigUtils; import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils; import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.common.utils.config.TableConfigUtils; @@ -541,19 +541,22 @@ public class PinotHelixResourceManager { } @Nullable - private String getBrokerTenantName(String tableName) { - TableConfig offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, tableName); + private String getBrokerTenantName(String physicalOrLogicalTableName) { + TableConfig offlineTableConfig = + ZKMetadataProvider.getOfflineTableConfig(_propertyStore, physicalOrLogicalTableName); if (offlineTableConfig != null) { return offlineTableConfig.getTenantConfig().getBroker(); } - TableConfig realtimeTableConfig = ZKMetadataProvider.getRealtimeTableConfig(_propertyStore, tableName); + TableConfig realtimeTableConfig = + ZKMetadataProvider.getRealtimeTableConfig(_propertyStore, physicalOrLogicalTableName); if (realtimeTableConfig != null) { return realtimeTableConfig.getTenantConfig().getBroker(); } // If the table is not found, check if it is a logical table - LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName); + LogicalTableConfig logicalTableConfig = + ZKMetadataProvider.getLogicalTableConfig(_propertyStore, physicalOrLogicalTableName); if (logicalTableConfig != null) { return logicalTableConfig.getBrokerTenant(); } @@ -1752,6 +1755,12 @@ public class PinotHelixResourceManager { + " already exists. If this is unexpected, try deleting the table to remove all metadata associated" + " with it."); } + + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, rawTableName)) { + throw new TableAlreadyExistsException("Logical table '" + rawTableName + + "' already exists. Please use a different name for the physical table."); + } if (_helixAdmin.getResourceExternalView(_helixClusterName, tableNameWithType) != null) { throw new TableAlreadyExistsException("External view for " + tableNameWithType + " still exists. If the table is just deleted, please wait for the clean up to finish before recreating it. " @@ -1772,7 +1781,7 @@ public class PinotHelixResourceManager { _enableBatchMessageMode); TableType tableType = tableConfig.getTableType(); // Ensure that table is not created if schema is not present - if (ZKMetadataProvider.getSchema(_propertyStore, TableNameBuilder.extractRawTableName(tableNameWithType)) == null) { + if (ZKMetadataProvider.getSchema(_propertyStore, rawTableName) == null) { throw new InvalidTableConfigException("No schema defined for table: " + tableNameWithType); } Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME, @@ -1836,9 +1845,11 @@ public class PinotHelixResourceManager { logicalTableConfig.setBrokerTenant("DefaultTenant"); } - LogicalTableUtils.validateLogicalTableName( + PinotHelixPropertyStoreZnRecordProvider pinotHelixPropertyStoreZnRecordProvider = + PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore); + LogicalTableConfigUtils.validateLogicalTableConfig( logicalTableConfig, - PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore)::exist, + pinotHelixPropertyStoreZnRecordProvider::exist, getAllBrokerTenantNames()::contains, _propertyStore ); @@ -1849,10 +1860,10 @@ public class PinotHelixResourceManager { } // Check if the table name is already used by a physical table - getAllTables().stream().map(TableNameBuilder::extractRawTableName).distinct().filter(tableName::equals) - .findFirst().ifPresent(tableNameWithType -> { - throw new TableAlreadyExistsException("Table name: " + tableName + " already exists"); - }); + if (pinotHelixPropertyStoreZnRecordProvider.exist(TableNameBuilder.OFFLINE.tableNameWithType(tableName)) + || pinotHelixPropertyStoreZnRecordProvider.exist(TableNameBuilder.REALTIME.tableNameWithType(tableName))) { + throw new TableAlreadyExistsException("Table name: " + tableName + " already exists"); + } LOGGER.info("Adding logical table {}: Creating logical table config in the property store", tableName); ZKMetadataProvider.setLogicalTableConfig(_propertyStore, logicalTableConfig); @@ -2124,22 +2135,25 @@ public class PinotHelixResourceManager { logicalTableConfig.setBrokerTenant("DefaultTenant"); } - LogicalTableUtils.validateLogicalTableName( + LogicalTableConfigUtils.validateLogicalTableConfig( logicalTableConfig, PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore)::exist, getAllBrokerTenantNames()::contains, _propertyStore ); - if (!ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) { + LogicalTableConfig oldLogicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName); + if (oldLogicalTableConfig == null) { throw new TableNotFoundException("Logical table: " + tableName + " does not exist"); } LOGGER.info("Updating logical table {}: Updating logical table config in the property store", tableName); ZKMetadataProvider.setLogicalTableConfig(_propertyStore, logicalTableConfig); - LOGGER.info("Updating logical table {}: Updating BrokerResource for table", tableName); - updateBrokerResourceForLogicalTable(logicalTableConfig, tableName); + if (!oldLogicalTableConfig.getBrokerTenant().equals(logicalTableConfig.getBrokerTenant())) { + LOGGER.info("Updating logical table {}: Updating BrokerResource for table", tableName); + updateBrokerResourceForLogicalTable(logicalTableConfig, tableName); + } LOGGER.info("Updated logical table {}: Successfully updated table", tableName); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java index 5777bae1bf..fc21084328 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java @@ -46,6 +46,7 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.utils.JsonUtils; @@ -865,6 +866,35 @@ public class PinotTableRestletResourceTest extends ControllerTest { } } + @Test + public void testTableWithSameNameAsLogicalTableIsNotAllowed() + throws IOException { + // Create physical table + String tableName = "testTable"; + DEFAULT_INSTANCE.addDummySchema(tableName); + TableConfig offlineTableConfig = _offlineBuilder.setTableName(tableName).build(); + String creationResponse = sendPostRequest(_createTableUrl, offlineTableConfig.toJsonString()); + assertEquals(creationResponse, + "{\"unrecognizedProperties\":{},\"status\":\"Table testTable_OFFLINE successfully added\"}"); + + // create logical table with above physical table + String logicalTableName = "testTable_LOGICAL"; + DEFAULT_INSTANCE.addDummySchema(logicalTableName); + LogicalTableConfig logicalTableConfig = ControllerTest.getDummyLogicalTableConfig( + logicalTableName, List.of(offlineTableConfig.getTableName()), "DefaultTenant"); + String addLogicalTableUrl = DEFAULT_INSTANCE.getControllerRequestURLBuilder().forLogicalTableCreate(); + String logicalTableResponse = sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString()); + assertEquals(logicalTableResponse, + "{\"unrecognizedProperties\":{},\"status\":\"testTable_LOGICAL logical table successfully added.\"}"); + + // create table with same as logical table and should fail + TableConfig offlineTableConfig2 = _offlineBuilder.setTableName(logicalTableName).build(); + IOException aThrows = expectThrows(IOException.class, + () -> sendPostRequest(_createTableUrl, offlineTableConfig2.toJsonString())); + assertTrue(aThrows.getMessage().contains("Logical table '" + logicalTableName + "' already exists"), + aThrows.getMessage()); + } + /** * Updating existing REALTIME table with invalid replication factor should throw exception. */ diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java index d6df95e5cb..ced4a3f6c9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java @@ -22,7 +22,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.helix.model.IdealState; +import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.config.table.QuotaConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; import org.testng.annotations.AfterClass; @@ -32,7 +39,9 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; import static org.testng.Assert.fail; @@ -40,20 +49,26 @@ public class PinotLogicalTableResourceTest extends ControllerTest { private static final String LOGICAL_TABLE_NAME = "test_logical_table"; public static final String BROKER_TENANT = "DefaultTenant"; + public static final String NEW_BROKER_TENANT = "NewBrokerTenant"; protected ControllerRequestURLBuilder _controllerRequestURLBuilder; + private String _addLogicalTableUrl; @BeforeClass public void setUpClass() throws Exception { startZk(); startController(); - addFakeBrokerInstancesToAutoJoinHelixCluster(1, true); + addFakeBrokerInstancesToAutoJoinHelixCluster(2, false); addFakeServerInstancesToAutoJoinHelixCluster(1, true); _controllerRequestURLBuilder = getControllerRequestURLBuilder(); + _addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); + createBrokerTenant(BROKER_TENANT, 1); + createBrokerTenant(NEW_BROKER_TENANT, 1); } @AfterClass public void tearDownClass() { + stopFakeInstances(); stopController(); stopZk(); } @@ -86,7 +101,6 @@ public class PinotLogicalTableResourceTest extends ControllerTest { throws IOException { addDummySchema(logicalTableName); // verify logical table does not exist - String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); String getLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableGet(logicalTableName); String updateLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableName); String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName); @@ -101,7 +115,7 @@ public class PinotLogicalTableResourceTest extends ControllerTest { // create logical table String resp = - ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); assertEquals(resp, "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName + " logical table successfully added.\"}"); @@ -129,122 +143,211 @@ public class PinotLogicalTableResourceTest extends ControllerTest { verifyLogicalTableDoesNotExists(getLogicalTableUrl); } - @Test - public void testLogicalTableValidationTests() + @Test(expectedExceptions = IOException.class, + expectedExceptionsMessageRegExp = ".*Reason: 'quota.storage' should not be set for logical table.*") + public void testLogicalTableQuotaConfigValidation() throws IOException { - String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); + List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_1")); + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); + logicalTableConfig.setQuotaConfig(new QuotaConfig("10G", "999")); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + } - // create physical tables - List<String> physicalTableNames = List.of("test_table_1", "test_table_2"); - List<String> physicalTableNamesWithType = createHybridTables(physicalTableNames); + @Test + public void testLogicalTableReferenceTableValidation() + throws IOException { + final List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_7")); + + // Test ref offline table name is null validation + IOException aThrows = expectThrows( + IOException.class, () -> { + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); + logicalTableConfig.setRefOfflineTableName(null); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), + getHeaders()); + } + ); + assertTrue(aThrows.getMessage() + .contains("Reason: 'refOfflineTableName' should not be null or empty when offline table exists"), + aThrows.getMessage()); + + // Test ref realtime table name is null validation + aThrows = expectThrows( + IOException.class, () -> { + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); + logicalTableConfig.setRefRealtimeTableName(null); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), + getHeaders()); + } + ); + assertTrue(aThrows.getMessage() + .contains("Reason: 'refRealtimeTableName' should not be null or empty when realtime table exists"), + aThrows.getMessage()); + + // Test ref offline table is present in the offline tables validation + aThrows = expectThrows( + IOException.class, () -> { + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); + logicalTableConfig.setRefOfflineTableName("random_table_OFFLINE"); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), + getHeaders()); + } + ); + assertTrue(aThrows.getMessage() + .contains("Reason: 'refOfflineTableName' should be one of the provided offline tables"), + aThrows.getMessage()); + + // Test ref realtime table is present in the realtime tables validation + aThrows = expectThrows( + IOException.class, () -> { + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); + logicalTableConfig.setRefRealtimeTableName("random_table_REALTIME"); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), + getHeaders()); + } + ); + assertTrue(aThrows.getMessage() + .contains("Reason: 'refRealtimeTableName' should be one of the provided realtime tables"), + aThrows.getMessage()); + } - // Test logical table name with _OFFLINE and _REALTIME is not allowed + @Test(expectedExceptions = IOException.class, + expectedExceptionsMessageRegExp = ".*Reason: 'InvalidTenant' should be one of the existing broker tenants.*") + public void testLogicalTableBrokerTenantValidation() + throws IOException { + List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_3")); LogicalTableConfig logicalTableConfig = - getDummyLogicalTableConfig("testLogicalTable_OFFLINE", physicalTableNamesWithType, BROKER_TENANT); - try { - ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); - fail("Logical Table POST request should have failed"); - } catch (IOException e) { - assertTrue(e.getMessage().contains("Reason: 'tableName' should not end with _OFFLINE or _REALTIME"), - e.getMessage()); - } + getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, "InvalidTenant"); - logicalTableConfig = - getDummyLogicalTableConfig("testLogicalTable_REALTIME", physicalTableNamesWithType, BROKER_TENANT); - try { - ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); - fail("Logical Table POST request should have failed"); - } catch (IOException e) { - assertTrue(e.getMessage().contains("Reason: 'tableName' should not end with _OFFLINE or _REALTIME"), - e.getMessage()); - } - - // Test logical table name can not be same as existing physical table name - logicalTableConfig = - getDummyLogicalTableConfig("test_table_1", physicalTableNamesWithType, BROKER_TENANT); - try { - ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); - fail("Logical Table POST request should have failed"); - } catch (IOException e) { - assertTrue(e.getMessage().contains("Table name: test_table_1 already exists"), e.getMessage()); - } + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + } + @Test + public void testLogicalTablePhysicalTableConfigValidation() { // Test empty physical table names is not allowed - logicalTableConfig = - getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(), BROKER_TENANT); - try { - ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); - fail("Logical Table POST request should have failed"); - } catch (IOException e) { - assertTrue(e.getMessage().contains("'physicalTableConfigMap' should not be null or empty"), e.getMessage()); - } + Throwable throwable = expectThrows(IOException.class, () -> { + LogicalTableConfig tableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(), BROKER_TENANT); + ControllerTest.sendPostRequest(_addLogicalTableUrl, tableConfig.toSingleLineJsonString(), getHeaders()); + }); + assertTrue(throwable.getMessage().contains("Reason: 'physicalTableConfigMap' should not be null or empty"), + throwable.getMessage()); // Test all table names are physical table names and none is hybrid table name - logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNames, BROKER_TENANT); - try { - ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); - fail("Logical Table POST request should have failed"); - } catch (IOException e) { - assertTrue(e.getMessage().contains("Reason: 'test_table_1' should be one of the existing tables"), - e.getMessage()); - } + throwable = expectThrows(IOException.class, () -> { + List<String> physicalTableNames = List.of("test_table_1"); + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNames, BROKER_TENANT); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + }); + assertTrue(throwable.getMessage().contains("Reason: 'test_table_1' should be one of the existing tables"), + throwable.getMessage()); + } - // Test valid broker tenant is provided - logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, "InvalidTenant"); - try { - ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); - fail("Logical Table POST request should have failed"); - } catch (IOException e) { - assertTrue(e.getMessage().contains("Reason: 'InvalidTenant' should be one of the existing broker tenants"), - e.getMessage()); - } + @Test(expectedExceptions = IOException.class, + expectedExceptionsMessageRegExp = ".*Table name: test_table already exists.*") + public void testLogicalTableNameCannotSameAsPhysicalTableNameValidation() + throws IOException { + String tableName = "test_table"; + List<String> physicalTableNamesWithType = createHybridTables(List.of(tableName)); + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(tableName, physicalTableNamesWithType, BROKER_TENANT); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + } + + @Test + public void testLogicalTableNameSuffixValidation() + throws IOException { + List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_4")); + + // Test logical table name with _OFFLINE suffix validation + Throwable throwable = expectThrows(IOException.class, () -> { + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig("test_logical_table_OFFLINE", physicalTableNamesWithType, BROKER_TENANT); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + }); + assertTrue(throwable.getMessage().contains("Reason: 'tableName' should not end with _OFFLINE or _REALTIME"), + throwable.getMessage()); + + // Test logical table name with _REALTIME suffix validation + throwable = expectThrows(IOException.class, () -> { + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig("test_logical_table_REALTIME", physicalTableNamesWithType, BROKER_TENANT); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + }); + assertTrue(throwable.getMessage().contains("Reason: 'tableName' should not end with _OFFLINE or _REALTIME"), + throwable.getMessage()); + } + + @DataProvider + public Object[][] tableTypeProvider() { + return new Object[][]{ + {TableType.OFFLINE}, + {TableType.REALTIME} + }; + } + + @Test(dataProvider = "tableTypeProvider") + public void testCreateLogicalTable(TableType tableType) + throws IOException { + // Test logical table with only realtime table + String tableName = "test_table"; + addDummySchema(tableName); + TableConfig tableConfig = createDummyTableConfig(tableName, tableType); + addTableConfig(tableConfig); + addDummySchema(LOGICAL_TABLE_NAME); + String getLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME); + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(tableConfig.getTableName()), BROKER_TENANT); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig); } @Test public void testLogicalTableSchemaValidation() throws IOException { - String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); - List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_3")); + final List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_6")); // Test logical table schema does not exist - LogicalTableConfig logicalTableConfig = - getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); - try { - ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); - fail("Logical Table POST request should have failed"); - } catch (IOException e) { - assertTrue(e.getMessage().contains("Reason: Schema with same name as logical table '" + LOGICAL_TABLE_NAME - + "' does not exist"), e.getMessage()); - } + Throwable throwable = expectThrows(IOException.class, () -> { + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + }); + assertTrue(throwable.getMessage().contains("Reason: Schema with same name as logical table '" + LOGICAL_TABLE_NAME + + "' does not exist"), throwable.getMessage()); // Test logical table with db prefix but schema without db prefix - addDummySchema(LOGICAL_TABLE_NAME); - logicalTableConfig = getDummyLogicalTableConfig("db." + LOGICAL_TABLE_NAME, physicalTableNamesWithType, - BROKER_TENANT); - try { - ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); - fail("Logical Table POST request should have failed"); - } catch (IOException e) { - assertTrue(e.getMessage().contains("Reason: Schema with same name as logical table 'db." + LOGICAL_TABLE_NAME - + "' does not exist"), e.getMessage()); - } + throwable = expectThrows(IOException.class, () -> { + addDummySchema(LOGICAL_TABLE_NAME); + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig("db." + LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + }); + assertTrue(throwable.getMessage() + .contains("Reason: Schema with same name as logical table 'db." + LOGICAL_TABLE_NAME + "' does not exist"), + throwable.getMessage()); } @Test public void testLogicalTableWithSameNameNotAllowed() throws IOException { - String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); String getLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME); - List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_2")); + addDummySchema(LOGICAL_TABLE_NAME); + List<String> physicalTableNamesWithType = createHybridTables(List.of("test_table_5")); LogicalTableConfig logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, physicalTableNamesWithType, BROKER_TENANT); - addDummySchema(LOGICAL_TABLE_NAME); - ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig); try { // create the same logical table again - ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); fail("Logical Table POST request should have failed"); } catch (IOException e) { assertTrue(e.getMessage().contains("Logical table: test_logical_table already exists"), e.getMessage()); @@ -265,8 +368,6 @@ public class PinotLogicalTableResourceTest extends ControllerTest { public void testPhysicalTableShouldExist(String logicalTableName, List<String> physicalTableNames, String unknownTableName) throws IOException { - String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate(); - // setup physical tables List<String> physicalTableNamesWithType = createHybridTables(physicalTableNames); physicalTableNamesWithType.add(unknownTableName); @@ -275,7 +376,7 @@ public class PinotLogicalTableResourceTest extends ControllerTest { LogicalTableConfig logicalTableConfig = getDummyLogicalTableConfig(logicalTableName, physicalTableNamesWithType, BROKER_TENANT); try { - ControllerTest.sendPostRequest(addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); fail("Logical Table POST request should have failed"); } catch (IOException e) { assertTrue(e.getMessage().contains("'" + unknownTableName + "' should be one of the existing tables"), @@ -301,8 +402,7 @@ public class PinotLogicalTableResourceTest extends ControllerTest { LogicalTableConfig logicalTableConfig = getDummyLogicalTableConfig(logicalTableNames.get(i), List.of( physicalTableNamesWithType.get(2 * i), physicalTableNamesWithType.get(2 * i + 1)), BROKER_TENANT); - ControllerTest.sendPostRequest(_controllerRequestURLBuilder.forLogicalTableCreate(), - logicalTableConfig.toSingleLineJsonString(), getHeaders()); + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); } // verify logical table names @@ -310,6 +410,49 @@ public class PinotLogicalTableResourceTest extends ControllerTest { assertEquals(getLogicalTableNamesResponse, objectMapper.writeValueAsString(logicalTableNames)); } + @Test + public void testLogicalTableUpdateBrokerTenantUpdate() + throws Exception { + PinotHelixResourceManager helixResourceManager = getHelixResourceManager(); + String logicalTableName = "test_logical_table"; + String getLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableGet(logicalTableName); + String updateLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableName); + + // Create a logical table + addDummySchema(logicalTableName); + List<String> physicalTables = createHybridTables(List.of("physical_table")); + LogicalTableConfig logicalTableConfig = + getDummyLogicalTableConfig(logicalTableName, physicalTables, BROKER_TENANT); + String addLogicalTableResponse = + ControllerTest.sendPostRequest(_addLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + assertEquals(addLogicalTableResponse, + "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName + " logical table successfully added.\"}"); + verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig); + + // verify table broker node and broker tenant node is same + IdealState brokerIdealStates = HelixHelper.getBrokerIdealStates(helixResourceManager.getHelixAdmin(), + helixResourceManager.getHelixClusterName()); + Map<String, String> instanceStateMap = brokerIdealStates.getInstanceStateMap(logicalTableName); + Set<String> brokerForTenant = helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT); + assertEquals(brokerForTenant, instanceStateMap.keySet()); + + //verify broker tenant node sets are different + Set<String> allInstancesForBrokerTenant = helixResourceManager.getAllInstancesForBrokerTenant(NEW_BROKER_TENANT); + assertNotEquals(brokerForTenant, allInstancesForBrokerTenant); + + // update logical table with new broker tenant + logicalTableConfig.setBrokerTenant(NEW_BROKER_TENANT); + sendPutRequest(updateLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(), getHeaders()); + verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig); + + // verify the broker node set is updated in IS + brokerIdealStates = HelixHelper.getBrokerIdealStates(helixResourceManager.getHelixAdmin(), + helixResourceManager.getHelixClusterName()); + instanceStateMap = brokerIdealStates.getInstanceStateMap(logicalTableName); + Set<String> brokerForNewTenant = helixResourceManager.getAllInstancesForBrokerTenant(NEW_BROKER_TENANT); + assertEquals(brokerForNewTenant, instanceStateMap.keySet()); + } + private void verifyLogicalTableExists(String getLogicalTableUrl, LogicalTableConfig logicalTableConfig) throws IOException { LogicalTableConfig remoteLogicalTableConfig = diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java index 62632f1a6a..7219a75402 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java @@ -88,6 +88,7 @@ public class PinotUserWithAccessLogicalTableResourceTest extends ControllerTest } catch (Exception e) { // ignore } + stopFakeInstances(); stopController(); stopZk(); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 6f7c06260a..9f775d3201 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -70,6 +70,8 @@ import org.apache.pinot.controller.api.resources.PauseStatusDetails; import org.apache.pinot.controller.api.resources.TableViews; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; +import org.apache.pinot.spi.config.table.QueryConfig; +import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.DateTimeFieldSpec; @@ -395,9 +397,17 @@ public class ControllerTest { for (String physicalTableName : physicalTableNames) { physicalTableConfigMap.put(physicalTableName, new PhysicalTableConfig()); } + String offlineTableName = + physicalTableNames.stream().filter(TableNameBuilder::isOfflineTableResource).findFirst().orElse(null); + String realtimeTableName = + physicalTableNames.stream().filter(TableNameBuilder::isRealtimeTableResource).findFirst().orElse(null); LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder() .setTableName(tableName) .setBrokerTenant(brokerTenant) + .setRefOfflineTableName(offlineTableName) + .setRefRealtimeTableName(realtimeTableName) + .setQuotaConfig(new QuotaConfig(null, "999")) + .setQueryConfig(new QueryConfig(1L, true, false, null, 1L, 1L)) .setPhysicalTableConfigMap(physicalTableConfigMap); return builder.build(); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java index a10b2f0aea..0f21afc305 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java @@ -28,15 +28,14 @@ import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException; import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener; import org.apache.pinot.spi.config.provider.SchemaChangeListener; import org.apache.pinot.spi.config.provider.TableConfigChangeListener; +import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.LogicalTableConfig; -import org.apache.pinot.spi.data.PhysicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn; -import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; @@ -45,7 +44,11 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; public class TableCacheTest { @@ -106,8 +109,9 @@ public class TableCacheTest { assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), expectedColumnMap); // Add logical table + LogicalTableConfig logicalTableConfig = + ControllerTest.getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME), "DefaultTenant"); addSchema(LOGICAL_TABLE_NAME, tableCache); - LogicalTableConfig logicalTableConfig = getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME)); TEST_INSTANCE.getHelixResourceManager().addLogicalTableConfig(logicalTableConfig); // Wait for at most 10 seconds for the callback to add the logical table to the cache TestUtils.waitForCondition(aVoid -> tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME) != null, 10_000L, @@ -115,12 +119,12 @@ public class TableCacheTest { // Logical table can be accessed by the logical table name if (isCaseInsensitive) { assertEquals(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME), LOGICAL_TABLE_NAME); - assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), logicalTableConfig); } else { assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME)); } assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), logicalTableConfig); assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), getExpectedSchema(LOGICAL_TABLE_NAME)); + assertNull(tableCache.getExpressionOverrideMap(LOGICAL_TABLE_NAME)); // Register the change listeners TestTableConfigChangeListener tableConfigChangeListener = new TestTableConfigChangeListener(); @@ -210,21 +214,25 @@ public class TableCacheTest { aVoid -> anotherTableConfig.equals(tableCache.getTableConfig(ANOTHER_TABLE_OFFLINE)), 10_000L, "Failed to add the table config to the cache"); // update the logical table - logicalTableConfig = getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME, ANOTHER_TABLE_OFFLINE)); + logicalTableConfig = ControllerTest.getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, + List.of(OFFLINE_TABLE_NAME, ANOTHER_TABLE_OFFLINE), "DefaultTenant"); + logicalTableConfig.setQueryConfig(new QueryConfig( + 1L, false, false, Map.of("DaysSinceEpoch * 24", "NewAddedDerivedHoursSinceEpoch"), 1L, 1L + )); TEST_INSTANCE.getHelixResourceManager().updateLogicalTableConfig(logicalTableConfig); - // Wait for at most 10 seconds for the callback to update the logical table in the cache TestUtils.waitForCondition( aVoid -> Objects.requireNonNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME)) .getPhysicalTableConfigMap().size() == 2, 10_000L, - "Failed to update the logical table in the cache"); - + "Failed to update the logical table in the cache" + ); if (isCaseInsensitive) { - assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), logicalTableConfig); + assertEquals(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME), LOGICAL_TABLE_NAME); } else { assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME)); } assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), logicalTableConfig); assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), getExpectedSchema(LOGICAL_TABLE_NAME)); + assertNotNull(tableCache.getExpressionOverrideMap(LOGICAL_TABLE_NAME)); // Remove the table config TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME); @@ -241,6 +249,15 @@ public class TableCacheTest { assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema); assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME), expectedColumnMap); + // Remove logical table + TEST_INSTANCE.getHelixResourceManager().deleteLogicalTableConfig(LOGICAL_TABLE_NAME); + // Wait for at most 10 seconds for the callback to remove the logical table from the cache + // NOTE: + // - Verify if the callback is fully done by checking the logical table change lister because it is the last step of + // the callback handling + TestUtils.waitForCondition(aVoid -> logicalTableConfigChangeListener._logicalTableConfigList.isEmpty(), 10_000L, + "Failed to remove the logical table from the cache"); + // Remove the schema TEST_INSTANCE.getHelixResourceManager().deleteSchema(RAW_TABLE_NAME); TEST_INSTANCE.getHelixResourceManager().deleteSchema(ANOTHER_TABLE); @@ -252,15 +269,6 @@ public class TableCacheTest { TestUtils.waitForCondition(aVoid -> schemaChangeListener._schemaList.isEmpty(), 10_000L, "Failed to remove the schema from the cache"); - // Remove logical table - TEST_INSTANCE.getHelixResourceManager().deleteLogicalTableConfig(LOGICAL_TABLE_NAME); - // Wait for at most 10 seconds for the callback to remove the logical table from the cache - // NOTE: - // - Verify if the callback is fully done by checking the logical table change lister because it is the last step of - // the callback handling - TestUtils.waitForCondition(aVoid -> logicalTableConfigChangeListener._logicalTableConfigList.isEmpty(), 10_000L, - "Failed to remove the logical table from the cache"); - assertNull(tableCache.getSchema(RAW_TABLE_NAME)); assertNull(tableCache.getSchema(ANOTHER_TABLE)); assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME)); @@ -304,18 +312,6 @@ public class TableCacheTest { .addSingleValueDimension(BuiltInVirtualColumn.SEGMENTNAME, DataType.STRING).build(); } - private static LogicalTableConfig getLogicalTableConfig(String tableName, List<String> physicalTableNames) { - Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>(); - for (String physicalTableName : physicalTableNames) { - physicalTableConfigMap.put(physicalTableName, new PhysicalTableConfig()); - } - LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder() - .setTableName(tableName) - .setBrokerTenant("DefaultTenant") - .setPhysicalTableConfigMap(physicalTableConfigMap); - return builder.build(); - } - @DataProvider(name = "testTableCacheDataProvider") public Object[][] provideCaseInsensitiveSetting() { return new Object[][]{new Object[]{true}, new Object[]{false}}; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java index 4d477691d1..2a427ce9dc 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java @@ -18,18 +18,19 @@ */ package org.apache.pinot.spi.data; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Map; -import java.util.Objects; +import javax.annotation.Nullable; import org.apache.pinot.spi.config.BaseJsonConfig; +import org.apache.pinot.spi.config.table.QueryConfig; +import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.utils.JsonUtils; -@JsonIgnoreProperties(ignoreUnknown = true) public class LogicalTableConfig extends BaseJsonConfig { private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper(); @@ -37,10 +38,20 @@ public class LogicalTableConfig extends BaseJsonConfig { public static final String LOGICAL_TABLE_NAME_KEY = "tableName"; public static final String PHYSICAL_TABLE_CONFIG_KEY = "physicalTableConfigMap"; public static final String BROKER_TENANT_KEY = "brokerTenant"; + public static final String QUERY_CONFIG_KEY = "query"; + public static final String QUOTA_CONFIG_KEY = "quota"; + public static final String REF_OFFLINE_TABLE_NAME_KEY = "refOfflineTableName"; + public static final String REF_REALTIME_TABLE_NAME_KEY = "refRealtimeTableName"; private String _tableName; private String _brokerTenant; private Map<String, PhysicalTableConfig> _physicalTableConfigMap; + @JsonProperty(QUERY_CONFIG_KEY) + private QueryConfig _queryConfig; + @JsonProperty(QUOTA_CONFIG_KEY) + private QuotaConfig _quotaConfig; + private String _refOfflineTableName; + private String _refRealtimeTableName; public static LogicalTableConfig fromString(String logicalTableString) throws IOException { @@ -72,6 +83,42 @@ public class LogicalTableConfig extends BaseJsonConfig { _brokerTenant = brokerTenant; } + @JsonProperty(QUERY_CONFIG_KEY) + @Nullable + public QueryConfig getQueryConfig() { + return _queryConfig; + } + + public void setQueryConfig(QueryConfig queryConfig) { + _queryConfig = queryConfig; + } + + @JsonProperty(QUOTA_CONFIG_KEY) + @Nullable + public QuotaConfig getQuotaConfig() { + return _quotaConfig; + } + + public void setQuotaConfig(QuotaConfig quotaConfig) { + _quotaConfig = quotaConfig; + } + + public String getRefOfflineTableName() { + return _refOfflineTableName; + } + + public void setRefOfflineTableName(String refOfflineTableName) { + _refOfflineTableName = refOfflineTableName; + } + + public String getRefRealtimeTableName() { + return _refRealtimeTableName; + } + + public void setRefRealtimeTableName(String refRealtimeTableName) { + _refRealtimeTableName = refRealtimeTableName; + } + private JsonNode toJsonObject() { return DEFAULT_MAPPER.valueToTree(this); } @@ -94,23 +141,6 @@ public class LogicalTableConfig extends BaseJsonConfig { } } - @Override - public boolean equals(Object object) { - if (this == object) { - return true; - } - if (object == null || getClass() != object.getClass()) { - return false; - } - LogicalTableConfig that = (LogicalTableConfig) object; - return Objects.equals(getTableName(), that.getTableName()); - } - - @Override - public int hashCode() { - return Objects.hash(getTableName()); - } - @Override public String toString() { return toSingleLineJsonString(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java index eff47c5af6..54ef55d1f0 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java @@ -19,6 +19,8 @@ package org.apache.pinot.spi.utils.builder; import java.util.Map; +import org.apache.pinot.spi.config.table.QueryConfig; +import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.PhysicalTableConfig; @@ -27,6 +29,11 @@ public class LogicalTableConfigBuilder { private String _tableName; private Map<String, PhysicalTableConfig> _physicalTableConfigMap; private String _brokerTenant; + private QueryConfig _queryConfig; + private QuotaConfig _quotaConfig; + private String _refOfflineTableName; + private String _refRealtimeTableName; + public LogicalTableConfigBuilder setTableName(String tableName) { _tableName = tableName; @@ -43,11 +50,35 @@ public class LogicalTableConfigBuilder { return this; } + public LogicalTableConfigBuilder setQueryConfig(QueryConfig queryConfig) { + _queryConfig = queryConfig; + return this; + } + + public LogicalTableConfigBuilder setQuotaConfig(QuotaConfig quotaConfig) { + _quotaConfig = quotaConfig; + return this; + } + + public LogicalTableConfigBuilder setRefOfflineTableName(String refOfflineTableName) { + _refOfflineTableName = refOfflineTableName; + return this; + } + + public LogicalTableConfigBuilder setRefRealtimeTableName(String refRealtimeTableName) { + _refRealtimeTableName = refRealtimeTableName; + return this; + } + public LogicalTableConfig build() { LogicalTableConfig logicalTableConfig = new LogicalTableConfig(); logicalTableConfig.setTableName(_tableName); logicalTableConfig.setPhysicalTableConfigMap(_physicalTableConfigMap); logicalTableConfig.setBrokerTenant(_brokerTenant); + logicalTableConfig.setQueryConfig(_queryConfig); + logicalTableConfig.setQuotaConfig(_quotaConfig); + logicalTableConfig.setRefOfflineTableName(_refOfflineTableName); + logicalTableConfig.setRefRealtimeTableName(_refRealtimeTableName); return logicalTableConfig; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org