This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4b77f8ba0d Broker selection for logical tables. (#15726) 4b77f8ba0d is described below commit 4b77f8ba0db2ec82267ee73e3a71061561772880 Author: Abhishek Bafna <aba...@startree.ai> AuthorDate: Thu May 8 13:58:45 2025 +0530 Broker selection for logical tables. (#15726) --- .../pinot/broker/routing/BrokerRoutingManager.java | 6 + .../pinot/common/metadata/ZKMetadataProvider.java | 4 + .../api/resources/PinotLogicalTableResource.java | 6 +- .../helix/core/PinotHelixResourceManager.java | 170 ++++++++++++++------- .../pinot/controller/helix/ControllerTest.java | 2 +- .../pinot/controller/helix/TableCacheTest.java | 30 ++-- 6 files changed, 146 insertions(+), 72 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index 49c9739d63..360d05e5e7 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -417,6 +417,12 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle * Builds/rebuilds the routing for the given table. 
*/ public synchronized void buildRouting(String tableNameWithType) { + // skip route building for logical tables + if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableNameWithType)) { + LOGGER.info("Skipping route building for logical table: {}", tableNameWithType); + return; + } + LOGGER.info("Building routing for table: {}", tableNameWithType); TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 2df97d55a8..04a31f4bf4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -865,4 +865,8 @@ public class ZKMetadataProvider { return null; } } + + public static boolean isLogicalTableExists(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) { + return propertyStore.exists(constructPropertyStorePathForLogical(tableName), AccessOption.PERSISTENT); + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java index 2e65ab9378..844d9aea24 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java @@ -190,7 +190,7 @@ public class PinotLogicalTableResource { @ApiParam(value = "Logical table name", required = true) @PathParam("tableName") String tableName, @Context HttpHeaders headers) { tableName = DatabaseUtils.translateTableName(tableName, headers); - if (_pinotHelixResourceManager.deleteLogicalTable(tableName)) { + if 
(_pinotHelixResourceManager.deleteLogicalTableConfig(tableName)) { return new SuccessResponse(tableName + " logical table successfully deleted."); } else { throw new ControllerApplicationException(LOGGER, "Failed to delete logical table", @@ -223,7 +223,7 @@ public class PinotLogicalTableResource { _pinotHelixResourceManager.getAllTables(), _pinotHelixResourceManager.getAllBrokerTenantNames() ); - _pinotHelixResourceManager.addLogicalTable(logicalTableConfig); + _pinotHelixResourceManager.addLogicalTableConfig(logicalTableConfig); return new SuccessResponse(tableName + " logical table successfully added."); } catch (TableAlreadyExistsException e) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.CONFLICT, e); @@ -247,7 +247,7 @@ public class PinotLogicalTableResource { _pinotHelixResourceManager.getAllTables(), _pinotHelixResourceManager.getAllBrokerTenantNames() ); - _pinotHelixResourceManager.updateLogicalTable(logicalTableConfig); + _pinotHelixResourceManager.updateLogicalTableConfig(logicalTableConfig); return new SuccessResponse(logicalTableConfig.getTableName() + " logical table successfully updated."); } catch (TableNotFoundException e) { throw new ControllerApplicationException(LOGGER, "Failed to find logical table " + tableName, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 9bdfb8a254..8ff276c20b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -534,18 +534,30 @@ public class PinotHelixResourceManager { } public List<InstanceConfig> getBrokerInstancesConfigsFor(String tableName) { - String brokerTenantName = null; + String brokerTenantName = getBrokerTenantName(tableName); + 
return HelixHelper.getInstancesConfigsWithTag(HelixHelper.getInstanceConfigs(_helixZkManager), + TagNameUtils.getBrokerTagForTenant(brokerTenantName)); + } + + @Nullable + private String getBrokerTenantName(String tableName) { TableConfig offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, tableName); if (offlineTableConfig != null) { - brokerTenantName = offlineTableConfig.getTenantConfig().getBroker(); - } else { - TableConfig realtimeTableConfig = ZKMetadataProvider.getRealtimeTableConfig(_propertyStore, tableName); - if (realtimeTableConfig != null) { - brokerTenantName = realtimeTableConfig.getTenantConfig().getBroker(); - } + return offlineTableConfig.getTenantConfig().getBroker(); } - return HelixHelper.getInstancesConfigsWithTag(HelixHelper.getInstanceConfigs(_helixZkManager), - TagNameUtils.getBrokerTagForTenant(brokerTenantName)); + + TableConfig realtimeTableConfig = ZKMetadataProvider.getRealtimeTableConfig(_propertyStore, tableName); + if (realtimeTableConfig != null) { + return realtimeTableConfig.getTenantConfig().getBroker(); + } + + // If the table is not found, check if it is a logical table + LogicalTableConfig logicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName); + if (logicalTableConfig != null) { + return logicalTableConfig.getBrokerTenant(); + } + + return null; } public List<String> getAllBrokerInstances() { @@ -1803,6 +1815,36 @@ public class PinotHelixResourceManager { LOGGER.info("Adding table {}: Successfully added table", tableNameWithType); } + /** + * Adds a logical table. 
+ * @param logicalTableConfig The logical table config to be added + * @throws TableAlreadyExistsException If the logical table already exists + */ + public void addLogicalTableConfig(LogicalTableConfig logicalTableConfig) + throws TableAlreadyExistsException { + String tableName = logicalTableConfig.getTableName(); + LOGGER.info("Adding logical table {}: Start", tableName); + + // Check if the logical table name is already used + if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) { + throw new TableAlreadyExistsException("Logical table: " + tableName + " already exists"); + } + + // Check if the table name is already used by a physical table + getAllTables().stream().map(TableNameBuilder::extractRawTableName).distinct().filter(tableName::equals) + .findFirst().ifPresent(tableNameWithType -> { + throw new TableAlreadyExistsException("Table name: " + tableName + " already exists"); + }); + + LOGGER.info("Adding logical table {}: Creating logical table config in the property store", tableName); + ZKMetadataProvider.setLogicalTableConfig(_propertyStore, logicalTableConfig); + + LOGGER.info("Adding logical table {}: Updating BrokerResource for table", tableName); + updateBrokerResourceForLogicalTable(logicalTableConfig, tableName); + + LOGGER.info("Added logical table {}: Successfully added table", tableName); + } + /** * Validates the tenant config for the table. In case of a single tenant cluster, * if the server and broker tenants are not specified in the config, they're @@ -2050,6 +2092,40 @@ public class PinotHelixResourceManager { setExistingTableConfig(tableConfig, -1); } + /** + * Update the logical table config. 
+ * @param logicalTableConfig The logical table config to be updated + * @throws TableNotFoundException If the logical table does not exist + */ + public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig) + throws TableNotFoundException { + String tableName = logicalTableConfig.getTableName(); + LOGGER.info("Updating logical table {}: Start", tableName); + + if (!ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) { + throw new TableNotFoundException("Logical table: " + tableName + " does not exist"); + } + + LOGGER.info("Updating logical table {}: Updating logical table config in the property store", tableName); + ZKMetadataProvider.setLogicalTableConfig(_propertyStore, logicalTableConfig); + + LOGGER.info("Updating logical table {}: Updating BrokerResource for table", tableName); + updateBrokerResourceForLogicalTable(logicalTableConfig, tableName); + + LOGGER.info("Updated logical table {}: Successfully updated table", tableName); + } + + private void updateBrokerResourceForLogicalTable(LogicalTableConfig logicalTableConfig, String tableName) { + List<String> brokers = HelixHelper.getInstancesWithTag( + _helixZkManager, TagNameUtils.getBrokerTagForTenant(logicalTableConfig.getBrokerTenant())); + HelixHelper.updateIdealState(_helixZkManager, Helix.BROKER_RESOURCE_INSTANCE, is -> { + assert is != null; + is.getRecord().getMapFields() + .put(tableName, SegmentAssignmentUtils.getInstanceStateMap(brokers, BrokerResourceStateModel.ONLINE)); + return is; + }); + } + /** * Sets the given table config into zookeeper with the expected version, which is the previous tableConfig znRecord * version. If the expected version is -1, the version check is ignored. @@ -2170,6 +2246,33 @@ public class PinotHelixResourceManager { LOGGER.info("Deleting table {}: Finish", tableNameWithType); } + /** + * Deletes the logical table. 
+ * @param tableName The logical table name + * @return True if the logical table was deleted, false otherwise + */ + public boolean deleteLogicalTableConfig(String tableName) { + LOGGER.info("Deleting logical table {}: Start", tableName); + if (!ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) { + throw new ControllerApplicationException(LOGGER, + "Logical table: " + tableName + " does not exists.", Response.Status.NOT_FOUND); + } + + LOGGER.info("Deleting logical table {}: Removing BrokerResource for logical table", tableName); + HelixHelper.updateIdealState(_helixZkManager, Helix.BROKER_RESOURCE_INSTANCE, is -> { + assert is != null; + is.getRecord().getMapFields().remove(tableName); + return is; + }); + + LOGGER.info("Deleting logical table {}: Removing logical table config from the property store", tableName); + String propertyStorePath = ZKMetadataProvider.constructPropertyStorePathForLogical(tableName); + boolean result = _propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT); + + LOGGER.info("Deleted logical table {}: Successfully deleted table", tableName); + return result; + } + /** * Toggles the state (ONLINE|OFFLINE|DROP) of the given table. 
*/ @@ -2206,55 +2309,6 @@ public class PinotHelixResourceManager { } } - public void addLogicalTable(LogicalTableConfig logicalTableConfig) - throws TableAlreadyExistsException { - String tableName = logicalTableConfig.getTableName(); - LOGGER.info("Adding logical table: {}", tableName); - - // Check if the logical table name is already used - LogicalTableConfig existingLogicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName); - if (existingLogicalTableConfig != null) { - throw new TableAlreadyExistsException("Logical table: " + tableName + " already exists"); - } - - // Check if the table name is already used by a physical table - getAllTables().stream().map(TableNameBuilder::extractRawTableName).distinct().filter(tableName::equals) - .findFirst().ifPresent(tableNameWithType -> { - throw new TableAlreadyExistsException("Table name: " + tableName + " already exists"); - }); - - ZKMetadataProvider.setLogicalTableConfig(_propertyStore, logicalTableConfig); - LOGGER.info("Added logical table: {}", tableName); - } - - public void updateLogicalTable(LogicalTableConfig logicalTableConfig) - throws TableNotFoundException { - String tableName = logicalTableConfig.getTableName(); - LOGGER.info("Updating logical table: {}", tableName); - - LogicalTableConfig oldLogicalTableConfig = ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName); - if (oldLogicalTableConfig == null) { - throw new TableNotFoundException("Logical table: " + tableName + " does not exist"); - } - - ZKMetadataProvider.setLogicalTableConfig(_propertyStore, logicalTableConfig); - LOGGER.info("Updated logical table: {}", tableName); - } - - public boolean deleteLogicalTable(String tableName) { - LOGGER.info("Deleting logical table: {}", tableName); - boolean result = false; - String propertyStorePath = ZKMetadataProvider.constructPropertyStorePathForLogical(tableName); - if (_propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) { - result = 
_propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT); - } else { - throw new ControllerApplicationException(LOGGER, - "Logical table: " + tableName + " does not exists.", Response.Status.NOT_FOUND); - } - LOGGER.info("Deleted logical table: {}", tableName); - return result; - } - public LogicalTableConfig getLogicalTableConfig(String tableName) { return ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index effcf823b7..119c774fde 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -1233,7 +1233,7 @@ public class ControllerTest { // Delete logical tables List<String> logicalTables = _helixResourceManager.getAllLogicalTableNames(); for (String logicalTableName : logicalTables) { - _helixResourceManager.deleteLogicalTable(logicalTableName); + _helixResourceManager.deleteLogicalTableConfig(logicalTableName); } // Delete all tables diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java index dd5e217417..9690fd6df2 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java @@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener; import org.apache.pinot.spi.config.provider.SchemaChangeListener; @@ -84,11 +85,7 @@ public class 
TableCacheTest { TestUtils.waitForCondition(aVoid -> tableCache.getSchema(RAW_TABLE_NAME) != null, 10_000L, "Failed to add the schema to the cache"); // Schema can be accessed by the schema name, but not by the table name because table config is not added yet - Schema expectedSchema = - new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("testColumn", DataType.INT) - .addSingleValueDimension(BuiltInVirtualColumn.DOCID, DataType.INT) - .addSingleValueDimension(BuiltInVirtualColumn.HOSTNAME, DataType.STRING) - .addSingleValueDimension(BuiltInVirtualColumn.SEGMENTNAME, DataType.STRING).build(); + Schema expectedSchema = getExpectedSchema(RAW_TABLE_NAME); Map<String, String> expectedColumnMap = new HashMap<>(); expectedColumnMap.put(isCaseInsensitive ? "testcolumn" : "testColumn", "testColumn"); expectedColumnMap.put(isCaseInsensitive ? "$docid" : "$docId", "$docId"); @@ -118,7 +115,7 @@ public class TableCacheTest { // Add logical table LogicalTableConfig logicalTableConfig = getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME)); - TEST_INSTANCE.getHelixResourceManager().addLogicalTable(logicalTableConfig); + TEST_INSTANCE.getHelixResourceManager().addLogicalTableConfig(logicalTableConfig); // Wait for at most 10 seconds for the callback to add the logical table to the cache TestUtils.waitForCondition(aVoid -> tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME) != null, 10_000L, "Failed to add the logical table to the cache"); @@ -223,15 +220,21 @@ public class TableCacheTest { "Failed to add the table config to the cache"); // update the logical table logicalTableConfig = getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME, ANOTHER_TABLE_OFFLINE)); - TEST_INSTANCE.getHelixResourceManager().updateLogicalTable(logicalTableConfig); + TEST_INSTANCE.getHelixResourceManager().updateLogicalTableConfig(logicalTableConfig); + // Wait for at most 10 seconds for the callback to update the logical table in the cache 
+ TestUtils.waitForCondition( + aVoid -> Objects.requireNonNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME)) + .getPhysicalTableConfigMap().size() == 2, 10_000L, + "Failed to update the logical table in the cache"); + if (isCaseInsensitive) { assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), logicalTableConfig); - assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME), expectedSchema); + assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME), getExpectedSchema(ANOTHER_TABLE)); } else { assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME)); } assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), logicalTableConfig); - assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), expectedSchema); + assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), getExpectedSchema(ANOTHER_TABLE)); // Remove the table config TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME); @@ -259,7 +262,7 @@ public class TableCacheTest { "Failed to remove the schema from the cache"); // Remove logical table - TEST_INSTANCE.getHelixResourceManager().deleteLogicalTable(LOGICAL_TABLE_NAME); + TEST_INSTANCE.getHelixResourceManager().deleteLogicalTableConfig(LOGICAL_TABLE_NAME); // Wait for at most 10 seconds for the callback to remove the logical table from the cache // NOTE: // - Verify if the callback is fully done by checking the logical table change lister because it is the last step of @@ -282,6 +285,13 @@ public class TableCacheTest { TEST_INSTANCE.waitForEVToDisappear(ANOTHER_TABLE_OFFLINE); } + private static Schema getExpectedSchema(String tableName) { + return new Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension("testColumn", DataType.INT) + .addSingleValueDimension(BuiltInVirtualColumn.DOCID, DataType.INT) + .addSingleValueDimension(BuiltInVirtualColumn.HOSTNAME, DataType.STRING) + .addSingleValueDimension(BuiltInVirtualColumn.SEGMENTNAME, 
DataType.STRING).build(); + } + private static LogicalTableConfig getLogicalTableConfig(String tableName, List<String> physicalTableNames) { Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>(); for (String physicalTableName : physicalTableNames) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org