This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b77f8ba0d Broker selection for logical tables. (#15726)
4b77f8ba0d is described below

commit 4b77f8ba0db2ec82267ee73e3a71061561772880
Author: Abhishek Bafna <aba...@startree.ai>
AuthorDate: Thu May 8 13:58:45 2025 +0530

    Broker selection for logical tables. (#15726)
---
 .../pinot/broker/routing/BrokerRoutingManager.java |   6 +
 .../pinot/common/metadata/ZKMetadataProvider.java  |   4 +
 .../api/resources/PinotLogicalTableResource.java   |   6 +-
 .../helix/core/PinotHelixResourceManager.java      | 170 ++++++++++++++-------
 .../pinot/controller/helix/ControllerTest.java     |   2 +-
 .../pinot/controller/helix/TableCacheTest.java     |  30 ++--
 6 files changed, 146 insertions(+), 72 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 49c9739d63..360d05e5e7 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -417,6 +417,12 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
    * Builds/rebuilds the routing for the given table.
    */
   public synchronized void buildRouting(String tableNameWithType) {
+    // skip route building for logical tables
+    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableNameWithType)) {
+      LOGGER.info("Skipping route building for logical table: {}", tableNameWithType);
+      return;
+    }
+
     LOGGER.info("Building routing for table: {}", tableNameWithType);
 
     TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 2df97d55a8..04a31f4bf4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -865,4 +865,8 @@ public class ZKMetadataProvider {
       return null;
     }
   }
+
+  public static boolean isLogicalTableExists(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableName) {
+    return propertyStore.exists(constructPropertyStorePathForLogical(tableName), AccessOption.PERSISTENT);
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
index 2e65ab9378..844d9aea24 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
@@ -190,7 +190,7 @@ public class PinotLogicalTableResource {
       @ApiParam(value = "Logical table name", required = true) 
@PathParam("tableName") String tableName,
       @Context HttpHeaders headers) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
-    if (_pinotHelixResourceManager.deleteLogicalTable(tableName)) {
+    if (_pinotHelixResourceManager.deleteLogicalTableConfig(tableName)) {
       return new SuccessResponse(tableName + " logical table successfully 
deleted.");
     } else {
       throw new ControllerApplicationException(LOGGER, "Failed to delete 
logical table",
@@ -223,7 +223,7 @@ public class PinotLogicalTableResource {
           _pinotHelixResourceManager.getAllTables(),
           _pinotHelixResourceManager.getAllBrokerTenantNames()
       );
-      _pinotHelixResourceManager.addLogicalTable(logicalTableConfig);
+      _pinotHelixResourceManager.addLogicalTableConfig(logicalTableConfig);
       return new SuccessResponse(tableName + " logical table successfully 
added.");
     } catch (TableAlreadyExistsException e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.CONFLICT, e);
@@ -247,7 +247,7 @@ public class PinotLogicalTableResource {
           _pinotHelixResourceManager.getAllTables(),
           _pinotHelixResourceManager.getAllBrokerTenantNames()
       );
-      _pinotHelixResourceManager.updateLogicalTable(logicalTableConfig);
+      _pinotHelixResourceManager.updateLogicalTableConfig(logicalTableConfig);
       return new SuccessResponse(logicalTableConfig.getTableName() + " logical 
table successfully updated.");
     } catch (TableNotFoundException e) {
       throw new ControllerApplicationException(LOGGER, "Failed to find logical 
table " + tableName,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 9bdfb8a254..8ff276c20b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -534,18 +534,30 @@ public class PinotHelixResourceManager {
   }
 
   public List<InstanceConfig> getBrokerInstancesConfigsFor(String tableName) {
-    String brokerTenantName = null;
+    String brokerTenantName = getBrokerTenantName(tableName);
+    return 
HelixHelper.getInstancesConfigsWithTag(HelixHelper.getInstanceConfigs(_helixZkManager),
+        TagNameUtils.getBrokerTagForTenant(brokerTenantName));
+  }
+
+  @Nullable
+  private String getBrokerTenantName(String tableName) {
     TableConfig offlineTableConfig = 
ZKMetadataProvider.getOfflineTableConfig(_propertyStore, tableName);
     if (offlineTableConfig != null) {
-      brokerTenantName = offlineTableConfig.getTenantConfig().getBroker();
-    } else {
-      TableConfig realtimeTableConfig = 
ZKMetadataProvider.getRealtimeTableConfig(_propertyStore, tableName);
-      if (realtimeTableConfig != null) {
-        brokerTenantName = realtimeTableConfig.getTenantConfig().getBroker();
-      }
+      return offlineTableConfig.getTenantConfig().getBroker();
     }
-    return 
HelixHelper.getInstancesConfigsWithTag(HelixHelper.getInstanceConfigs(_helixZkManager),
-        TagNameUtils.getBrokerTagForTenant(brokerTenantName));
+
+    TableConfig realtimeTableConfig = 
ZKMetadataProvider.getRealtimeTableConfig(_propertyStore, tableName);
+    if (realtimeTableConfig != null) {
+      return realtimeTableConfig.getTenantConfig().getBroker();
+    }
+
+    // If the table is not found, check if it is a logical table
+    LogicalTableConfig logicalTableConfig = 
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
+    if (logicalTableConfig != null) {
+      return logicalTableConfig.getBrokerTenant();
+    }
+
+    return null;
   }
 
   public List<String> getAllBrokerInstances() {
@@ -1803,6 +1815,36 @@ public class PinotHelixResourceManager {
     LOGGER.info("Adding table {}: Successfully added table", 
tableNameWithType);
   }
 
+  /**
+   * Adds a logical table.
+   * @param logicalTableConfig The logical table config to be added
+   * @throws TableAlreadyExistsException If the logical table already exists
+   */
+  public void addLogicalTableConfig(LogicalTableConfig logicalTableConfig)
+      throws TableAlreadyExistsException {
+    String tableName = logicalTableConfig.getTableName();
+    LOGGER.info("Adding logical table {}: Start", tableName);
+
+    // Check if the logical table name is already used
+    if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) {
+      throw new TableAlreadyExistsException("Logical table: " + tableName + " 
already exists");
+    }
+
+    // Check if the table name is already used by a physical table
+    
getAllTables().stream().map(TableNameBuilder::extractRawTableName).distinct().filter(tableName::equals)
+        .findFirst().ifPresent(tableNameWithType -> {
+          throw new TableAlreadyExistsException("Table name: " + tableName + " 
already exists");
+        });
+
+    LOGGER.info("Adding logical table {}: Creating logical table config in the 
property store", tableName);
+    ZKMetadataProvider.setLogicalTableConfig(_propertyStore, 
logicalTableConfig);
+
+    LOGGER.info("Adding logical table {}: Updating BrokerResource for table", 
tableName);
+    updateBrokerResourceForLogicalTable(logicalTableConfig, tableName);
+
+    LOGGER.info("Added logical table {}: Successfully added table", tableName);
+  }
+
   /**
    * Validates the tenant config for the table. In case of a single tenant 
cluster,
    * if the server and broker tenants are not specified in the config, they're
@@ -2050,6 +2092,40 @@ public class PinotHelixResourceManager {
     setExistingTableConfig(tableConfig, -1);
   }
 
+  /**
+   * Update the logical table config.
+   * @param logicalTableConfig The logical table config to be updated
+   * @throws TableNotFoundException If the logical table does not exist
+   */
+  public void updateLogicalTableConfig(LogicalTableConfig logicalTableConfig)
+      throws TableNotFoundException {
+    String tableName = logicalTableConfig.getTableName();
+    LOGGER.info("Updating logical table {}: Start", tableName);
+
+    if (!ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) {
+      throw new TableNotFoundException("Logical table: " + tableName + " does 
not exist");
+    }
+
+    LOGGER.info("Updating logical table {}: Updating logical table config in 
the property store", tableName);
+    ZKMetadataProvider.setLogicalTableConfig(_propertyStore, 
logicalTableConfig);
+
+    LOGGER.info("Updating logical table {}: Updating BrokerResource for 
table", tableName);
+    updateBrokerResourceForLogicalTable(logicalTableConfig, tableName);
+
+    LOGGER.info("Updated logical table {}: Successfully updated table", 
tableName);
+  }
+
+  private void updateBrokerResourceForLogicalTable(LogicalTableConfig 
logicalTableConfig, String tableName) {
+    List<String> brokers = HelixHelper.getInstancesWithTag(
+        _helixZkManager, 
TagNameUtils.getBrokerTagForTenant(logicalTableConfig.getBrokerTenant()));
+    HelixHelper.updateIdealState(_helixZkManager, 
Helix.BROKER_RESOURCE_INSTANCE, is -> {
+      assert is != null;
+      is.getRecord().getMapFields()
+          .put(tableName, SegmentAssignmentUtils.getInstanceStateMap(brokers, 
BrokerResourceStateModel.ONLINE));
+      return is;
+    });
+  }
+
   /**
    * Sets the given table config into zookeeper with the expected version, 
which is the previous tableConfig znRecord
    * version. If the expected version is -1, the version check is ignored.
@@ -2170,6 +2246,33 @@ public class PinotHelixResourceManager {
     LOGGER.info("Deleting table {}: Finish", tableNameWithType);
   }
 
+  /**
+   * Deletes the logical table.
+   * @param tableName The logical table name
+   * @return True if the logical table was deleted, false otherwise
+   */
+  public boolean deleteLogicalTableConfig(String tableName) {
+    LOGGER.info("Deleting logical table {}: Start", tableName);
+    if (!ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) {
+      throw new ControllerApplicationException(LOGGER,
+          "Logical table: " + tableName + " does not exists.", 
Response.Status.NOT_FOUND);
+    }
+
+    LOGGER.info("Deleting logical table {}: Removing BrokerResource for 
logical table", tableName);
+    HelixHelper.updateIdealState(_helixZkManager, 
Helix.BROKER_RESOURCE_INSTANCE, is -> {
+      assert is != null;
+      is.getRecord().getMapFields().remove(tableName);
+      return is;
+    });
+
+    LOGGER.info("Deleting logical table {}: Removing logical table config from 
the property store", tableName);
+    String propertyStorePath = 
ZKMetadataProvider.constructPropertyStorePathForLogical(tableName);
+    boolean result = _propertyStore.remove(propertyStorePath, 
AccessOption.PERSISTENT);
+
+    LOGGER.info("Deleted logical table {}: Successfully deleted table", 
tableName);
+    return result;
+  }
+
   /**
    * Toggles the state (ONLINE|OFFLINE|DROP) of the given table.
    */
@@ -2206,55 +2309,6 @@ public class PinotHelixResourceManager {
     }
   }
 
-  public void addLogicalTable(LogicalTableConfig logicalTableConfig)
-      throws TableAlreadyExistsException {
-    String tableName = logicalTableConfig.getTableName();
-    LOGGER.info("Adding logical table: {}", tableName);
-
-    // Check if the logical table name is already used
-    LogicalTableConfig existingLogicalTableConfig = 
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
-    if (existingLogicalTableConfig != null) {
-      throw new TableAlreadyExistsException("Logical table: " + tableName + " 
already exists");
-    }
-
-    // Check if the table name is already used by a physical table
-    
getAllTables().stream().map(TableNameBuilder::extractRawTableName).distinct().filter(tableName::equals)
-        .findFirst().ifPresent(tableNameWithType -> {
-          throw new TableAlreadyExistsException("Table name: " + tableName + " 
already exists");
-        });
-
-    ZKMetadataProvider.setLogicalTableConfig(_propertyStore, 
logicalTableConfig);
-    LOGGER.info("Added logical table: {}", tableName);
-  }
-
-  public void updateLogicalTable(LogicalTableConfig logicalTableConfig)
-      throws TableNotFoundException {
-    String tableName = logicalTableConfig.getTableName();
-    LOGGER.info("Updating logical table: {}", tableName);
-
-    LogicalTableConfig oldLogicalTableConfig = 
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
-    if (oldLogicalTableConfig == null) {
-      throw new TableNotFoundException("Logical table: " + tableName + " does 
not exist");
-    }
-
-    ZKMetadataProvider.setLogicalTableConfig(_propertyStore, 
logicalTableConfig);
-    LOGGER.info("Updated logical table: {}", tableName);
-  }
-
-  public boolean deleteLogicalTable(String tableName) {
-    LOGGER.info("Deleting logical table: {}", tableName);
-    boolean result = false;
-    String propertyStorePath = 
ZKMetadataProvider.constructPropertyStorePathForLogical(tableName);
-    if (_propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
-      result = _propertyStore.remove(propertyStorePath, 
AccessOption.PERSISTENT);
-    } else {
-      throw new ControllerApplicationException(LOGGER,
-          "Logical table: " + tableName + " does not exists.", 
Response.Status.NOT_FOUND);
-    }
-    LOGGER.info("Deleted logical table: {}", tableName);
-    return result;
-  }
-
   public LogicalTableConfig getLogicalTableConfig(String tableName) {
     return ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index effcf823b7..119c774fde 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -1233,7 +1233,7 @@ public class ControllerTest {
     // Delete logical tables
     List<String> logicalTables = 
_helixResourceManager.getAllLogicalTableNames();
     for (String logicalTableName : logicalTables) {
-      _helixResourceManager.deleteLogicalTable(logicalTableName);
+      _helixResourceManager.deleteLogicalTableConfig(logicalTableName);
     }
 
     // Delete all tables
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index dd5e217417..9690fd6df2 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
 import org.apache.pinot.spi.config.provider.SchemaChangeListener;
@@ -84,11 +85,7 @@ public class TableCacheTest {
     TestUtils.waitForCondition(aVoid -> tableCache.getSchema(RAW_TABLE_NAME) 
!= null, 10_000L,
         "Failed to add the schema to the cache");
     // Schema can be accessed by the schema name, but not by the table name 
because table config is not added yet
-    Schema expectedSchema =
-        new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("testColumn",
 DataType.INT)
-            .addSingleValueDimension(BuiltInVirtualColumn.DOCID, DataType.INT)
-            .addSingleValueDimension(BuiltInVirtualColumn.HOSTNAME, 
DataType.STRING)
-            .addSingleValueDimension(BuiltInVirtualColumn.SEGMENTNAME, 
DataType.STRING).build();
+    Schema expectedSchema = getExpectedSchema(RAW_TABLE_NAME);
     Map<String, String> expectedColumnMap = new HashMap<>();
     expectedColumnMap.put(isCaseInsensitive ? "testcolumn" : "testColumn", 
"testColumn");
     expectedColumnMap.put(isCaseInsensitive ? "$docid" : "$docId", "$docId");
@@ -118,7 +115,7 @@ public class TableCacheTest {
 
     // Add logical table
     LogicalTableConfig logicalTableConfig = 
getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME));
-    
TEST_INSTANCE.getHelixResourceManager().addLogicalTable(logicalTableConfig);
+    
TEST_INSTANCE.getHelixResourceManager().addLogicalTableConfig(logicalTableConfig);
     // Wait for at most 10 seconds for the callback to add the logical table 
to the cache
     TestUtils.waitForCondition(aVoid -> 
tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME) != null, 10_000L,
         "Failed to add the logical table to the cache");
@@ -223,15 +220,21 @@ public class TableCacheTest {
         "Failed to add the table config to the cache");
     // update the logical table
     logicalTableConfig = getLogicalTableConfig(LOGICAL_TABLE_NAME, 
List.of(OFFLINE_TABLE_NAME, ANOTHER_TABLE_OFFLINE));
-    
TEST_INSTANCE.getHelixResourceManager().updateLogicalTable(logicalTableConfig);
+    
TEST_INSTANCE.getHelixResourceManager().updateLogicalTableConfig(logicalTableConfig);
+    // Wait for at most 10 seconds for the callback to update the logical 
table in the cache
+    TestUtils.waitForCondition(
+        aVoid -> 
Objects.requireNonNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME))
+            .getPhysicalTableConfigMap().size() == 2, 10_000L,
+        "Failed to update the logical table in the cache");
+
     if (isCaseInsensitive) {
       
assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME), 
logicalTableConfig);
-      assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME), 
expectedSchema);
+      assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME), 
getExpectedSchema(ANOTHER_TABLE));
     } else {
       
assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME));
     }
     assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME), 
logicalTableConfig);
-    assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), expectedSchema);
+    assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), 
getExpectedSchema(ANOTHER_TABLE));
 
     // Remove the table config
     TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME);
@@ -259,7 +262,7 @@ public class TableCacheTest {
         "Failed to remove the schema from the cache");
 
     // Remove logical table
-    
TEST_INSTANCE.getHelixResourceManager().deleteLogicalTable(LOGICAL_TABLE_NAME);
+    
TEST_INSTANCE.getHelixResourceManager().deleteLogicalTableConfig(LOGICAL_TABLE_NAME);
     // Wait for at most 10 seconds for the callback to remove the logical 
table from the cache
     // NOTE:
     // - Verify if the callback is fully done by checking the logical table 
change lister because it is the last step of
@@ -282,6 +285,13 @@ public class TableCacheTest {
     TEST_INSTANCE.waitForEVToDisappear(ANOTHER_TABLE_OFFLINE);
   }
 
+  private static Schema getExpectedSchema(String tableName) {
+    return new 
Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension("testColumn",
 DataType.INT)
+        .addSingleValueDimension(BuiltInVirtualColumn.DOCID, DataType.INT)
+        .addSingleValueDimension(BuiltInVirtualColumn.HOSTNAME, 
DataType.STRING)
+        .addSingleValueDimension(BuiltInVirtualColumn.SEGMENTNAME, 
DataType.STRING).build();
+  }
+
   private static LogicalTableConfig getLogicalTableConfig(String tableName, 
List<String> physicalTableNames) {
     Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
     for (String physicalTableName : physicalTableNames) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to