This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d9e731782af broker: per-table routing build lock and request gating
     (#16585)

       - Add per-table locks for buildRouting to allow concurrent builds
         across tables
       - Track per-table routing build start time; ignore requests older
         than build start (gate removed per latest change)
       - Skip redundant builds if an earlier concurrent build was already
         started
       - Clean up locks/timestamps on removeRouting
       - Add unit test BuildRoutingDelayTest for delay/skip logic
d9e731782af is described below

commit d9e731782afc62149b487c27e80de4d809673853
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Aug 14 12:24:38 2025 -0700

    broker: per-table routing build lock and request gating (#16585)

    - Add per-table locks for buildRouting to allow concurrent builds
      across tables
    - Track per-table routing build start time; ignore requests older
      than build start (gate removed per latest change)
    - Skip redundant builds if an earlier concurrent build was already
      started
    - Clean up locks/timestamps on removeRouting
    - Add unit test BuildRoutingDelayTest for delay/skip logic
---
 .../pinot/broker/routing/BrokerRoutingManager.java | 360 ++++++++++++---------
 .../broker/routing/BuildRoutingDelayTest.java      |  59 ++++
 2 files changed, 270 insertions(+), 149 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 6cb983c0e5a..c347391c503 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.IntStream;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
@@ -109,8 +111,8 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
   private final BrokerMetrics _brokerMetrics;
   private final Map<String, RoutingEntry> _routingEntryMap = new 
ConcurrentHashMap<>();
   private final Map<String, ServerInstance> _enabledServerInstanceMap = new 
ConcurrentHashMap<>();
-  // NOTE: _excludedServers doesn't need to be concurrent because it is only 
accessed within the synchronized block
-  private final Set<String> _excludedServers = new HashSet<>();
+  // Thread-safe set because it can be read/modified concurrently from 
instance/server change paths
+  private final Set<String> _excludedServers = ConcurrentHashMap.newKeySet();
   private final ServerRoutingStatsManager _serverRoutingStatsManager;
   private final PinotConfiguration _pinotConfig;
   private final boolean _enablePartitionMetadataManager;
@@ -122,6 +124,14 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
 
   private Set<String> _routableServers;
+  // Per-table locks to allow concurrent routing builds across different 
tables while serializing per-table operations
+  private final Map<String, Object> _routingTableBuildLocks = new 
ConcurrentHashMap<>();
+  // Per-table build start time in millis. Any request before this time should 
be ignored.
+  private final Map<String, Long> _routingTableBuildStartTimeMs = new 
ConcurrentHashMap<>();
+
+  private Object getTableLock(String tableNameWithType) {
+    return _routingTableBuildLocks.computeIfAbsent(tableNameWithType, k -> new 
Object());
+  }
 
   public BrokerRoutingManager(BrokerMetrics brokerMetrics, 
ServerRoutingStatsManager serverRoutingStatsManager,
       PinotConfiguration pinotConfig) {
@@ -144,7 +154,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
   }
 
   @Override
-  public synchronized void processClusterChange(ChangeType changeType) {
+  public void processClusterChange(ChangeType changeType) {
     if (changeType == ChangeType.IDEAL_STATE || changeType == 
ChangeType.EXTERNAL_VIEW) {
       processSegmentAssignmentChange();
     } else if (changeType == ChangeType.INSTANCE_CONFIG) {
@@ -177,8 +187,8 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths, 
AccessOption.PERSISTENT);
     long fetchStatsEndTimeMs = System.currentTimeMillis();
 
-    List<String> tablesToUpdate = new ArrayList<>();
-    for (int i = 0; i < numTables; i++) {
+    List<String> tablesToUpdate = new CopyOnWriteArrayList<>();
+    IntStream.range(0, numTables).parallel().forEach(i -> {
       Stat idealStateStat = idealStateStats[i];
       Stat externalViewStat = externalViewStats[i];
       if (idealStateStat != null && externalViewStat != null) {
@@ -192,15 +202,18 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
             if (idealState == null) {
               LOGGER.warn("Failed to find ideal state for table: {}, skipping 
updating routing entry",
                   tableNameWithType);
-              continue;
+              return;
             }
             ExternalView externalView = 
getExternalView(routingEntry._externalViewPath);
             if (externalView == null) {
               LOGGER.warn("Failed to find external view for table: {}, 
skipping updating routing entry",
                   tableNameWithType);
-              continue;
+              return;
+            }
+            Object tableLock = getTableLock(tableNameWithType);
+            synchronized (tableLock) {
+              routingEntry.onAssignmentChange(idealState, externalView);
             }
-            routingEntry.onAssignmentChange(idealState, externalView);
           } catch (Exception e) {
             LOGGER.error(
                 "Caught unexpected exception while updating routing entry on 
segment assignment change for table: {}",
@@ -208,7 +221,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
           }
         }
       }
-    }
+    });
     long updateRoutingEntriesEndTimeMs = System.currentTimeMillis();
 
     LOGGER.info(
@@ -314,14 +327,18 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     }
 
     // Update routing entry for all tables
-    for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+    Set<String> routableServersSnapshot = _routableServers;
+    _routingEntryMap.values().parallelStream().forEach(routingEntry -> {
       try {
-        routingEntry.onInstancesChange(_routableServers, changedServers);
+        Object tableLock = getTableLock(routingEntry.getTableNameWithType());
+        synchronized (tableLock) {
+          routingEntry.onInstancesChange(routableServersSnapshot, 
changedServers);
+        }
       } catch (Exception e) {
         LOGGER.error("Caught unexpected exception while updating routing entry 
on instances change for table: {}",
             routingEntry.getTableNameWithType(), e);
       }
-    }
+    });
     long updateRoutingEntriesEndTimeMs = System.currentTimeMillis();
 
     // Remove new disabled servers from _enabledServerInstanceMap after 
updating all routing entries to ensure it
@@ -361,7 +378,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
   /**
    * Excludes a server from the routing.
    */
-  public synchronized void excludeServerFromRouting(String instanceId) {
+  public void excludeServerFromRouting(String instanceId) {
     LOGGER.warn("Excluding server: {} from routing", instanceId);
     if (!_excludedServers.add(instanceId)) {
       LOGGER.info("Server: {} is already excluded from routing, skipping 
updating the routing", instanceId);
@@ -378,14 +395,19 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     routableServers.remove(instanceId);
     _routableServers = routableServers;
     List<String> changedServers = Collections.singletonList(instanceId);
-    for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+    Set<String> routableServersSnapshot = _routableServers;
+    _routingEntryMap.values().parallelStream().forEach(routingEntry -> {
       try {
-        routingEntry.onInstancesChange(_routableServers, changedServers);
+        Object tableLock = getTableLock(routingEntry.getTableNameWithType());
+        synchronized (tableLock) {
+          routingEntry.onInstancesChange(routableServersSnapshot, 
changedServers);
+        }
       } catch (Exception e) {
-        LOGGER.error("Caught unexpected exception while updating routing entry 
when excluding server: {} for table: {}",
+        LOGGER.error(
+            "Caught unexpected exception while updating routing entry when 
excluding server: {} for table: {}",
             instanceId, routingEntry.getTableNameWithType(), e);
       }
-    }
+    });
     LOGGER.info("Excluded server: {} from routing in {}ms (updated {} routing 
entries)", instanceId,
         System.currentTimeMillis() - startTimeMs, _routingEntryMap.size());
   }
@@ -393,7 +415,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
   /**
    * Includes a previous excluded server to the routing.
    */
-  public synchronized void includeServerToRouting(String instanceId) {
+  public void includeServerToRouting(String instanceId) {
     LOGGER.info("Including server: {} to routing", instanceId);
     if (!_excludedServers.remove(instanceId)) {
       LOGGER.info("Server: {} is not previously excluded, skipping updating 
the routing", instanceId);
@@ -410,14 +432,19 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
     routableServers.add(instanceId);
     _routableServers = routableServers;
     List<String> changedServers = Collections.singletonList(instanceId);
-    for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+    Set<String> routableServersSnapshot = _routableServers;
+    _routingEntryMap.values().parallelStream().forEach(routingEntry -> {
       try {
-        routingEntry.onInstancesChange(_routableServers, changedServers);
+        Object tableLock = getTableLock(routingEntry.getTableNameWithType());
+        synchronized (tableLock) {
+          routingEntry.onInstancesChange(routableServersSnapshot, 
changedServers);
+        }
       } catch (Exception e) {
-        LOGGER.error("Caught unexpected exception while updating routing entry 
when including server: {} for table: {}",
+        LOGGER.error(
+            "Caught unexpected exception while updating routing entry when 
including server: {} for table: {}",
             instanceId, routingEntry.getTableNameWithType(), e);
       }
-    }
+    });
     LOGGER.info("Included server: {} to routing in {}ms (updated {} routing 
entries)", instanceId,
         System.currentTimeMillis() - startTimeMs, _routingEntryMap.size());
   }
@@ -477,7 +504,10 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
         TimeBoundaryManager timeBoundaryManager = new 
TimeBoundaryManager(tableConfig, _propertyStore, _brokerMetrics);
         timeBoundaryManager.init(idealState, externalView, 
preSelectedOnlineSegments);
 
-        
_routingEntryMap.get(tableNameWithType).setTimeBoundaryManager(timeBoundaryManager);
+        Object tableLock = getTableLock(tableNameWithType);
+        synchronized (tableLock) {
+          
_routingEntryMap.get(tableNameWithType).setTimeBoundaryManager(timeBoundaryManager);
+        }
       } catch (Exception e) {
         LOGGER.error("Caught unexpected exception while setting time boundary 
manager for table: {}", tableNameWithType,
             e);
@@ -489,132 +519,152 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
    * Builds the routing for a table.
    * @param tableNameWithType the name of the table
    */
-  public synchronized void buildRouting(String tableNameWithType) {
-    LOGGER.info("Building routing for table: {}", tableNameWithType);
-
-    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-    Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", tableNameWithType);
-
-    String idealStatePath = getIdealStatePath(tableNameWithType);
-    IdealState idealState = getIdealState(idealStatePath);
-    Preconditions.checkState(idealState != null, "Failed to find ideal state 
for table: %s", tableNameWithType);
-    int idealStateVersion = idealState.getRecord().getVersion();
-
-    String externalViewPath = getExternalViewPath(tableNameWithType);
-    ExternalView externalView = getExternalView(externalViewPath);
-    int externalViewVersion;
-    // NOTE: External view might be null for new created tables. In such case, 
create an empty one and set the version
-    // to -1 to ensure the version does not match the next external view
-    if (externalView == null) {
-      externalView = new ExternalView(tableNameWithType);
-      externalViewVersion = -1;
-    } else {
-      externalViewVersion = externalView.getRecord().getVersion();
-    }
-
-    Set<String> onlineSegments = getOnlineSegments(idealState);
-
-    SegmentPreSelector segmentPreSelector =
-        SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig, 
_propertyStore);
-    Set<String> preSelectedOnlineSegments = 
segmentPreSelector.preSelect(onlineSegments);
-    SegmentSelector segmentSelector = 
SegmentSelectorFactory.getSegmentSelector(tableConfig);
-    segmentSelector.init(idealState, externalView, preSelectedOnlineSegments);
+  public void buildRouting(String tableNameWithType) {
+    long buildStartTimeMs = System.currentTimeMillis();
+    Object tableLock = 
_routingTableBuildLocks.computeIfAbsent(tableNameWithType, k -> new Object());
+    synchronized (tableLock) {
+      Long lastStartObj = _routingTableBuildStartTimeMs.get(tableNameWithType);
+      long lastRoutingBuildStartTimeMs = lastStartObj != null ? lastStartObj : 
Long.MIN_VALUE;
+      if (buildStartTimeMs <= lastRoutingBuildStartTimeMs) {
+        LOGGER.info("Skipping routing build for table: {} because the build 
routing request timestamp {} "
+                + "is earlier than the last build start time: {}",
+            tableNameWithType, buildStartTimeMs, lastRoutingBuildStartTimeMs);
+        return;
+      }
 
-    // Register segment pruners and initialize segment zk metadata fetcher.
-    List<SegmentPruner> segmentPruners = 
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+      // Record build start time to gate older requests
+      _routingTableBuildStartTimeMs.put(tableNameWithType, 
System.currentTimeMillis());
+      LOGGER.info("Building routing for table: {}", tableNameWithType);
+
+      TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+      Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: %s", tableNameWithType);
+
+      String idealStatePath = getIdealStatePath(tableNameWithType);
+      IdealState idealState = getIdealState(idealStatePath);
+      Preconditions.checkState(idealState != null, "Failed to find ideal state 
for table: %s", tableNameWithType);
+      int idealStateVersion = idealState.getRecord().getVersion();
+
+      String externalViewPath = getExternalViewPath(tableNameWithType);
+      ExternalView externalView = getExternalView(externalViewPath);
+      int externalViewVersion;
+      // NOTE: External view might be null for new created tables. In such 
case, create an empty one and set the
+      // version to -1 to ensure the version does not match the next external 
view
+      if (externalView == null) {
+        externalView = new ExternalView(tableNameWithType);
+        externalViewVersion = -1;
+      } else {
+        externalViewVersion = externalView.getRecord().getVersion();
+      }
 
-    AdaptiveServerSelector adaptiveServerSelector =
-        
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
 _pinotConfig);
-    InstanceSelector instanceSelector =
-        InstanceSelectorFactory.getInstanceSelector(tableConfig, 
_propertyStore, _brokerMetrics,
-            adaptiveServerSelector, _pinotConfig);
-    instanceSelector.init(_routableServers, _enabledServerInstanceMap, 
idealState, externalView,
-        preSelectedOnlineSegments);
+      Set<String> onlineSegments = getOnlineSegments(idealState);
 
-    // Add time boundary manager if both offline and real-time part exist for 
a hybrid table
-    TimeBoundaryManager timeBoundaryManager = null;
-    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-    if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
-      // Current table is offline
-      String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
-      if (_routingEntryMap.containsKey(realtimeTableName)) {
-        LOGGER.info("Adding time boundary manager for table: {}", 
tableNameWithType);
-        timeBoundaryManager = new TimeBoundaryManager(tableConfig, 
_propertyStore, _brokerMetrics);
-        timeBoundaryManager.init(idealState, externalView, 
preSelectedOnlineSegments);
-      }
-    } else {
-      // Current table is real-time
-      String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
-      RoutingEntry offlineTableRoutingEntry = 
_routingEntryMap.get(offlineTableName);
-      if (offlineTableRoutingEntry != null && 
offlineTableRoutingEntry.getTimeBoundaryManager() == null) {
-        LOGGER.info("Adding time boundary manager for table: {}", 
offlineTableName);
-
-        // NOTE: Add time boundary manager to the offline part before adding 
the routing for the real-time part to
-        // ensure no overlapping data getting queried
-        TableConfig offlineTableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
-        Preconditions.checkState(offlineTableConfig != null, "Failed to find 
table config for table: %s",
-            offlineTableName);
-        IdealState offlineTableIdealState = 
getIdealState(getIdealStatePath(offlineTableName));
-        Preconditions.checkState(offlineTableIdealState != null, "Failed to 
find ideal state for table: %s",
-            offlineTableName);
-        // NOTE: External view might be null for new created tables. In such 
case, create an empty one.
-        ExternalView offlineTableExternalView = 
getExternalView(getExternalViewPath(offlineTableName));
-        if (offlineTableExternalView == null) {
-          offlineTableExternalView = new ExternalView(offlineTableName);
+      SegmentPreSelector segmentPreSelector =
+          SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig, 
_propertyStore);
+      Set<String> preSelectedOnlineSegments = 
segmentPreSelector.preSelect(onlineSegments);
+      SegmentSelector segmentSelector = 
SegmentSelectorFactory.getSegmentSelector(tableConfig);
+      segmentSelector.init(idealState, externalView, 
preSelectedOnlineSegments);
+
+      // Register segment pruners and initialize segment zk metadata fetcher.
+      List<SegmentPruner> segmentPruners = 
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+
+      AdaptiveServerSelector adaptiveServerSelector =
+          
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
 _pinotConfig);
+      InstanceSelector instanceSelector =
+          InstanceSelectorFactory.getInstanceSelector(tableConfig, 
_propertyStore, _brokerMetrics,
+              adaptiveServerSelector, _pinotConfig);
+      instanceSelector.init(_routableServers, _enabledServerInstanceMap, 
idealState, externalView,
+          preSelectedOnlineSegments);
+
+      // Add time boundary manager if both offline and real-time part exist 
for a hybrid table
+      TimeBoundaryManager timeBoundaryManager = null;
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+        // Current table is offline
+        String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+        if (_routingEntryMap.containsKey(realtimeTableName)) {
+          LOGGER.info("Adding time boundary manager for table: {}", 
tableNameWithType);
+          timeBoundaryManager = new TimeBoundaryManager(tableConfig, 
_propertyStore, _brokerMetrics);
+          timeBoundaryManager.init(idealState, externalView, 
preSelectedOnlineSegments);
+        }
+      } else {
+        // Current table is real-time
+        String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+        RoutingEntry offlineTableRoutingEntry = 
_routingEntryMap.get(offlineTableName);
+        if (offlineTableRoutingEntry != null && 
offlineTableRoutingEntry.getTimeBoundaryManager() == null) {
+          LOGGER.info("Adding time boundary manager for table: {}", 
offlineTableName);
+
+          // NOTE: Add time boundary manager to the offline part before adding 
the routing for the real-time part to
+          // ensure no overlapping data getting queried
+          TableConfig offlineTableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
+          Preconditions.checkState(offlineTableConfig != null, "Failed to find 
table config for table: %s",
+              offlineTableName);
+          IdealState offlineTableIdealState = 
getIdealState(getIdealStatePath(offlineTableName));
+          Preconditions.checkState(offlineTableIdealState != null, "Failed to 
find ideal state for table: %s",
+              offlineTableName);
+          // NOTE: External view might be null for new created tables. In such 
case, create an empty one.
+          ExternalView offlineTableExternalView = 
getExternalView(getExternalViewPath(offlineTableName));
+          if (offlineTableExternalView == null) {
+            offlineTableExternalView = new ExternalView(offlineTableName);
+          }
+          Set<String> offlineTableOnlineSegments = 
getOnlineSegments(offlineTableIdealState);
+          SegmentPreSelector offlineTableSegmentPreSelector =
+              
SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig, 
_propertyStore);
+          Set<String> offlineTablePreSelectedOnlineSegments =
+              
offlineTableSegmentPreSelector.preSelect(offlineTableOnlineSegments);
+          TimeBoundaryManager offlineTableTimeBoundaryManager =
+              new TimeBoundaryManager(offlineTableConfig, _propertyStore, 
_brokerMetrics);
+          offlineTableTimeBoundaryManager.init(offlineTableIdealState, 
offlineTableExternalView,
+              offlineTablePreSelectedOnlineSegments);
+          Object offlineTableLock = getTableLock(offlineTableName);
+          synchronized (offlineTableLock) {
+            
offlineTableRoutingEntry.setTimeBoundaryManager(offlineTableTimeBoundaryManager);
+          }
         }
-        Set<String> offlineTableOnlineSegments = 
getOnlineSegments(offlineTableIdealState);
-        SegmentPreSelector offlineTableSegmentPreSelector =
-            
SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig, 
_propertyStore);
-        Set<String> offlineTablePreSelectedOnlineSegments =
-            
offlineTableSegmentPreSelector.preSelect(offlineTableOnlineSegments);
-        TimeBoundaryManager offlineTableTimeBoundaryManager =
-            new TimeBoundaryManager(offlineTableConfig, _propertyStore, 
_brokerMetrics);
-        offlineTableTimeBoundaryManager.init(offlineTableIdealState, 
offlineTableExternalView,
-            offlineTablePreSelectedOnlineSegments);
-        
offlineTableRoutingEntry.setTimeBoundaryManager(offlineTableTimeBoundaryManager);
       }
-    }
 
-    SegmentPartitionMetadataManager partitionMetadataManager = null;
-    // TODO: Support multiple partition columns
-    // TODO: Make partition pruner on top of the partition metadata manager to 
avoid keeping 2 copies of the metadata
-    if (_enablePartitionMetadataManager) {
-      SegmentPartitionConfig segmentPartitionConfig = 
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
-      if (segmentPartitionConfig != null) {
-        Map<String, ColumnPartitionConfig> columnPartitionMap = 
segmentPartitionConfig.getColumnPartitionMap();
-        if (columnPartitionMap.size() == 1) {
-          Map.Entry<String, ColumnPartitionConfig> partitionConfig = 
columnPartitionMap.entrySet().iterator().next();
-          LOGGER.info("Enabling SegmentPartitionMetadataManager for table: {} 
on partition column: {}",
-              tableNameWithType, partitionConfig.getKey());
-          partitionMetadataManager = new 
SegmentPartitionMetadataManager(tableNameWithType, partitionConfig.getKey(),
-              partitionConfig.getValue().getFunctionName(), 
partitionConfig.getValue().getNumPartitions());
-        } else {
-          LOGGER.warn("Cannot enable SegmentPartitionMetadataManager for 
table: {} with multiple partition columns: {}",
-              tableNameWithType, columnPartitionMap.keySet());
+      SegmentPartitionMetadataManager partitionMetadataManager = null;
+      // TODO: Support multiple partition columns
+      // TODO: Make partition pruner on top of the partition metadata manager 
to avoid keeping 2 copies of the metadata
+      if (_enablePartitionMetadataManager) {
+        SegmentPartitionConfig segmentPartitionConfig = 
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+        if (segmentPartitionConfig != null) {
+          Map<String, ColumnPartitionConfig> columnPartitionMap = 
segmentPartitionConfig.getColumnPartitionMap();
+          if (columnPartitionMap.size() == 1) {
+            Map.Entry<String, ColumnPartitionConfig> partitionConfig = 
columnPartitionMap.entrySet().iterator().next();
+            LOGGER.info("Enabling SegmentPartitionMetadataManager for table: 
{} on partition column: {}",
+                tableNameWithType, partitionConfig.getKey());
+            partitionMetadataManager = new 
SegmentPartitionMetadataManager(tableNameWithType, partitionConfig.getKey(),
+                partitionConfig.getValue().getFunctionName(), 
partitionConfig.getValue().getNumPartitions());
+          } else {
+            LOGGER.warn(
+                "Cannot enable SegmentPartitionMetadataManager for table: {} 
with multiple partition columns: {}",
+                tableNameWithType, columnPartitionMap.keySet());
+          }
         }
       }
-    }
 
-    QueryConfig queryConfig = tableConfig.getQueryConfig();
-    Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() : 
null;
+      QueryConfig queryConfig = tableConfig.getQueryConfig();
+      Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() : 
null;
 
-    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(tableNameWithType, _propertyStore);
-    for (SegmentZkMetadataFetchListener listener : segmentPruners) {
-      segmentZkMetadataFetcher.register(listener);
-    }
-    if (partitionMetadataManager != null) {
-      segmentZkMetadataFetcher.register(partitionMetadataManager);
-    }
-    segmentZkMetadataFetcher.init(idealState, externalView, 
preSelectedOnlineSegments);
-
-    RoutingEntry routingEntry =
-        new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, 
segmentPreSelector, segmentSelector,
-            segmentPruners, instanceSelector, idealStateVersion, 
externalViewVersion, segmentZkMetadataFetcher,
-            timeBoundaryManager, partitionMetadataManager, queryTimeoutMs, 
!idealState.isEnabled());
-    if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
-      LOGGER.info("Built routing for table: {}", tableNameWithType);
-    } else {
-      LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
+      SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+          new SegmentZkMetadataFetcher(tableNameWithType, _propertyStore);
+      for (SegmentZkMetadataFetchListener listener : segmentPruners) {
+        segmentZkMetadataFetcher.register(listener);
+      }
+      if (partitionMetadataManager != null) {
+        segmentZkMetadataFetcher.register(partitionMetadataManager);
+      }
+      segmentZkMetadataFetcher.init(idealState, externalView, 
preSelectedOnlineSegments);
+
+      RoutingEntry routingEntry =
+          new RoutingEntry(tableNameWithType, idealStatePath, 
externalViewPath, segmentPreSelector, segmentSelector,
+              segmentPruners, instanceSelector, idealStateVersion, 
externalViewVersion, segmentZkMetadataFetcher,
+              timeBoundaryManager, partitionMetadataManager, queryTimeoutMs, 
!idealState.isEnabled());
+      if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
+        LOGGER.info("Built routing for table: {}", tableNameWithType);
+      } else {
+        LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
+      }
     }
   }
 
@@ -637,7 +687,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
   /**
    * Removes the routing for the given table.
    */
-  public synchronized void removeRouting(String tableNameWithType) {
+  public void removeRouting(String tableNameWithType) {
     LOGGER.info("Removing routing for table: {}", tableNameWithType);
     if (_routingEntryMap.remove(tableNameWithType) != null) {
       LOGGER.info("Removed routing for table: {}", tableNameWithType);
@@ -649,10 +699,16 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
             
TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(tableNameWithType));
         RoutingEntry routingEntry = _routingEntryMap.get(offlineTableName);
         if (routingEntry != null) {
-          routingEntry.setTimeBoundaryManager(null);
+          Object offlineTableLock = getTableLock(offlineTableName);
+          synchronized (offlineTableLock) {
+            routingEntry.setTimeBoundaryManager(null);
+          }
           LOGGER.info("Removed time boundary manager for table: {}", 
offlineTableName);
         }
       }
+      // Clean up any per-table build lock as the routing for this table has 
been removed
+      _routingTableBuildLocks.remove(tableNameWithType);
+      _routingTableBuildStartTimeMs.remove(tableNameWithType);
     } else {
       LOGGER.warn("Routing does not exist for table: {}, skipping removing 
routing", tableNameWithType);
     }
@@ -688,7 +744,10 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
 
       RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
       if (routingEntry != null) {
-        routingEntry.setTimeBoundaryManager(null);
+        Object tableLock = getTableLock(tableNameWithType);
+        synchronized (tableLock) {
+          routingEntry.setTimeBoundaryManager(null);
+        }
         LOGGER.info("Removed time boundary manager for table: {}", 
tableNameWithType);
       } else {
         LOGGER.warn("Routing does not exist for table: {}, skipping", 
tableNameWithType);
@@ -699,14 +758,17 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
   /**
    * Refreshes the metadata for the given segment (called when segment is getting refreshed).
    */
-  public synchronized void refreshSegment(String tableNameWithType, String 
segment) {
+  public void refreshSegment(String tableNameWithType, String segment) {
     LOGGER.info("Refreshing segment: {} for table: {}", segment, 
tableNameWithType);
-    RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
-    if (routingEntry != null) {
-      routingEntry.refreshSegment(segment);
-      LOGGER.info("Refreshed segment: {} for table: {}", segment, 
tableNameWithType);
-    } else {
-      LOGGER.warn("Routing does not exist for table: {}, skipping refreshing 
segment", tableNameWithType);
+    Object tableLock = getTableLock(tableNameWithType);
+    synchronized (tableLock) {
+      RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
+      if (routingEntry != null) {
+        routingEntry.refreshSegment(segment);
+        LOGGER.info("Refreshed segment: {} for table: {}", segment, 
tableNameWithType);
+      } else {
+        LOGGER.warn("Routing does not exist for table: {}, skipping refreshing 
segment", tableNameWithType);
+      }
     }
   }
 
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/BuildRoutingDelayTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/BuildRoutingDelayTest.java
new file mode 100644
index 00000000000..f0fb052aab0
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/BuildRoutingDelayTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class BuildRoutingDelayTest {
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testBuildRoutingSkipsWhenRequestIsOlderThanLastStart()
+      throws Exception {
+    // Construct with nulls as the build should return early before using these fields
+    BrokerRoutingManager manager = new BrokerRoutingManager(null, null, new 
PinotConfiguration());
+
+    String tableNameWithType = "testTable_OFFLINE";
+
+    // Set a future last build start time to force skipping the current build call
+    long futureStart = System.currentTimeMillis() + 10_000L;
+
+    Field startTimesField = 
BrokerRoutingManager.class.getDeclaredField("_routingTableBuildStartTimeMs");
+    startTimesField.setAccessible(true);
+    Map<String, Long> startTimes = (Map<String, Long>) 
startTimesField.get(manager);
+    if (startTimes == null) {
+      startTimes = new ConcurrentHashMap<>();
+      startTimesField.set(manager, startTimes);
+    }
+    startTimes.put(tableNameWithType, futureStart);
+
+    // Should return without throwing and without attempting to build routing
+    manager.buildRouting(tableNameWithType);
+
+    // Ensure routing was not created and the last start time was not overwritten
+    Assert.assertFalse(manager.routingExists(tableNameWithType));
+    Assert.assertEquals(startTimes.get(tableNameWithType).longValue(), 
futureStart);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to