somandal commented on code in PR #16791:
URL: https://github.com/apache/pinot/pull/16791#discussion_r2370331031


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -489,132 +650,201 @@ public synchronized void buildRoutingForLogicalTable(String logicalTableName) {
    * Builds the routing for a table.
    * @param tableNameWithType the name of the table
    */
-  public synchronized void buildRouting(String tableNameWithType) {
-    LOGGER.info("Building routing for table: {}", tableNameWithType);
-
-    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-    Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", tableNameWithType);
-
-    String idealStatePath = getIdealStatePath(tableNameWithType);
-    IdealState idealState = getIdealState(idealStatePath);
-    Preconditions.checkState(idealState != null, "Failed to find ideal state 
for table: %s", tableNameWithType);
-    int idealStateVersion = idealState.getRecord().getVersion();
-
-    String externalViewPath = getExternalViewPath(tableNameWithType);
-    ExternalView externalView = getExternalView(externalViewPath);
-    int externalViewVersion;
-    // NOTE: External view might be null for new created tables. In such case, 
create an empty one and set the version
-    // to -1 to ensure the version does not match the next external view
-    if (externalView == null) {
-      externalView = new ExternalView(tableNameWithType);
-      externalViewVersion = -1;
-    } else {
-      externalViewVersion = externalView.getRecord().getVersion();
+  public void buildRouting(String tableNameWithType) {
+    _globalLock.readLock().lock();
+    try {
+      buildRoutingInternal(tableNameWithType);
+    } finally {
+      _globalLock.readLock().unlock();
     }
+  }
 
-    Set<String> onlineSegments = getOnlineSegments(idealState);
+  private void buildRoutingInternal(String tableNameWithType) {
+    long buildStartTimeMs = System.currentTimeMillis();
+    Object tableLock = getRoutingTableBuildLock(tableNameWithType);
+    synchronized (tableLock) {
+      long lastBuildStartTimeMs = 
getLastRoutingTableBuildStartTimeMs(tableNameWithType);
+      if (buildStartTimeMs <= lastBuildStartTimeMs) {
+        LOGGER.info("Skipping routing build for table: {} because the build 
routing request timestamp {} "
+                + "is earlier than the last build start time: {}",
+            tableNameWithType, buildStartTimeMs, lastBuildStartTimeMs);
+        return;
+      }
 
-    SegmentPreSelector segmentPreSelector =
-        SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig, 
_propertyStore);
-    Set<String> preSelectedOnlineSegments = 
segmentPreSelector.preSelect(onlineSegments);
-    SegmentSelector segmentSelector = 
SegmentSelectorFactory.getSegmentSelector(tableConfig);
-    segmentSelector.init(idealState, externalView, preSelectedOnlineSegments);
+      // Record build start time to gate older requests and to use to compare 
with the timestamp for when
+      // the global processSegmentAssignmentChange() was last called
+      _routingTableBuildStartTimeMs.put(tableNameWithType, 
System.currentTimeMillis());
 
-    // Register segment pruners and initialize segment zk metadata fetcher.
-    List<SegmentPruner> segmentPruners = 
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+      LOGGER.info("Building routing for table: {}", tableNameWithType);
 
-    AdaptiveServerSelector adaptiveServerSelector =
-        
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
 _pinotConfig);
-    InstanceSelector instanceSelector =
-        InstanceSelectorFactory.getInstanceSelector(tableConfig, 
_propertyStore, _brokerMetrics,
-            adaptiveServerSelector, _pinotConfig);
-    instanceSelector.init(_routableServers, _enabledServerInstanceMap, 
idealState, externalView,
-        preSelectedOnlineSegments);
+      TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+      Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: %s", tableNameWithType);
 
-    // Add time boundary manager if both offline and real-time part exist for 
a hybrid table
-    TimeBoundaryManager timeBoundaryManager = null;
-    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-    if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
-      // Current table is offline
-      String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
-      if (_routingEntryMap.containsKey(realtimeTableName)) {
-        LOGGER.info("Adding time boundary manager for table: {}", 
tableNameWithType);
-        timeBoundaryManager = new TimeBoundaryManager(tableConfig, 
_propertyStore, _brokerMetrics);
-        timeBoundaryManager.init(idealState, externalView, 
preSelectedOnlineSegments);
+      String idealStatePath = getIdealStatePath(tableNameWithType);
+      IdealState idealState = getIdealState(idealStatePath);
+      Preconditions.checkState(idealState != null, "Failed to find ideal state 
for table: %s", tableNameWithType);
+      int idealStateVersion = idealState.getRecord().getVersion();
+
+      String externalViewPath = getExternalViewPath(tableNameWithType);
+      ExternalView externalView = getExternalView(externalViewPath);
+      int externalViewVersion;
+      // NOTE: External view might be null for new created tables. In such 
case, create an empty one and set the
+      // version to -1 to ensure the version does not match the next external 
view
+      if (externalView == null) {
+        externalView = new ExternalView(tableNameWithType);
+        externalViewVersion = -1;
+      } else {
+        externalViewVersion = externalView.getRecord().getVersion();
       }
-    } else {
-      // Current table is real-time
-      String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
-      RoutingEntry offlineTableRoutingEntry = 
_routingEntryMap.get(offlineTableName);
-      if (offlineTableRoutingEntry != null && 
offlineTableRoutingEntry.getTimeBoundaryManager() == null) {
-        LOGGER.info("Adding time boundary manager for table: {}", 
offlineTableName);
-
-        // NOTE: Add time boundary manager to the offline part before adding 
the routing for the real-time part to
-        // ensure no overlapping data getting queried
-        TableConfig offlineTableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
-        Preconditions.checkState(offlineTableConfig != null, "Failed to find 
table config for table: %s",
-            offlineTableName);
-        IdealState offlineTableIdealState = 
getIdealState(getIdealStatePath(offlineTableName));
-        Preconditions.checkState(offlineTableIdealState != null, "Failed to 
find ideal state for table: %s",
-            offlineTableName);
-        // NOTE: External view might be null for new created tables. In such 
case, create an empty one.
-        ExternalView offlineTableExternalView = 
getExternalView(getExternalViewPath(offlineTableName));
-        if (offlineTableExternalView == null) {
-          offlineTableExternalView = new ExternalView(offlineTableName);
+
+      Set<String> onlineSegments = getOnlineSegments(idealState);
+
+      SegmentPreSelector segmentPreSelector =
+          SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig, 
_propertyStore);
+      Set<String> preSelectedOnlineSegments = 
segmentPreSelector.preSelect(onlineSegments);
+      SegmentSelector segmentSelector = 
SegmentSelectorFactory.getSegmentSelector(tableConfig);
+      segmentSelector.init(idealState, externalView, 
preSelectedOnlineSegments);
+
+      // Register segment pruners and initialize segment zk metadata fetcher.
+      List<SegmentPruner> segmentPruners = 
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+
+      AdaptiveServerSelector adaptiveServerSelector =
+          
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
 _pinotConfig);
+      InstanceSelector instanceSelector =
+          InstanceSelectorFactory.getInstanceSelector(tableConfig, 
_propertyStore, _brokerMetrics,
+              adaptiveServerSelector, _pinotConfig);
+      instanceSelector.init(_routableServers, _enabledServerInstanceMap, 
idealState, externalView,
+          preSelectedOnlineSegments);
+
+      // Add time boundary manager if both offline and real-time part exist 
for a hybrid table
+      TimeBoundaryManager timeBoundaryManager = null;
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+        // Current table is offline
+        String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+        if (_routingEntryMap.containsKey(realtimeTableName)) {
+          LOGGER.info("Adding time boundary manager for table: {}", 
tableNameWithType);
+          timeBoundaryManager = new TimeBoundaryManager(tableConfig, 
_propertyStore, _brokerMetrics);
+          timeBoundaryManager.init(idealState, externalView, 
preSelectedOnlineSegments);
+        }
+      } else {
+        // Current table is real-time
+        String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+        RoutingEntry offlineTableRoutingEntry = 
_routingEntryMap.get(offlineTableName);
+        if (offlineTableRoutingEntry != null && 
offlineTableRoutingEntry.getTimeBoundaryManager() == null) {
+          LOGGER.info("Adding time boundary manager for table: {}", 
offlineTableName);
+
+          // NOTE: Add time boundary manager to the offline part before adding 
the routing for the real-time part to
+          // ensure no overlapping data getting queried
+          TableConfig offlineTableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
+          Preconditions.checkState(offlineTableConfig != null, "Failed to find 
table config for table: %s",
+              offlineTableName);
+          IdealState offlineTableIdealState = 
getIdealState(getIdealStatePath(offlineTableName));
+          Preconditions.checkState(offlineTableIdealState != null, "Failed to 
find ideal state for table: %s",
+              offlineTableName);
+          // NOTE: External view might be null for new created tables. In such 
case, create an empty one.
+          ExternalView offlineTableExternalView = 
getExternalView(getExternalViewPath(offlineTableName));
+          if (offlineTableExternalView == null) {
+            offlineTableExternalView = new ExternalView(offlineTableName);
+          }
+          Set<String> offlineTableOnlineSegments = 
getOnlineSegments(offlineTableIdealState);
+          SegmentPreSelector offlineTableSegmentPreSelector =
+              
SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig, 
_propertyStore);
+          Set<String> offlineTablePreSelectedOnlineSegments =
+              
offlineTableSegmentPreSelector.preSelect(offlineTableOnlineSegments);
+          TimeBoundaryManager offlineTableTimeBoundaryManager =
+              new TimeBoundaryManager(offlineTableConfig, _propertyStore, 
_brokerMetrics);
+          offlineTableTimeBoundaryManager.init(offlineTableIdealState, 
offlineTableExternalView,
+              offlineTablePreSelectedOnlineSegments);
+          
offlineTableRoutingEntry.setTimeBoundaryManager(offlineTableTimeBoundaryManager);
         }
-        Set<String> offlineTableOnlineSegments = 
getOnlineSegments(offlineTableIdealState);
-        SegmentPreSelector offlineTableSegmentPreSelector =
-            
SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig, 
_propertyStore);
-        Set<String> offlineTablePreSelectedOnlineSegments =
-            
offlineTableSegmentPreSelector.preSelect(offlineTableOnlineSegments);
-        TimeBoundaryManager offlineTableTimeBoundaryManager =
-            new TimeBoundaryManager(offlineTableConfig, _propertyStore, 
_brokerMetrics);
-        offlineTableTimeBoundaryManager.init(offlineTableIdealState, 
offlineTableExternalView,
-            offlineTablePreSelectedOnlineSegments);
-        
offlineTableRoutingEntry.setTimeBoundaryManager(offlineTableTimeBoundaryManager);
       }
-    }
 
-    SegmentPartitionMetadataManager partitionMetadataManager = null;
-    // TODO: Support multiple partition columns
-    // TODO: Make partition pruner on top of the partition metadata manager to 
avoid keeping 2 copies of the metadata
-    if (_enablePartitionMetadataManager) {
-      SegmentPartitionConfig segmentPartitionConfig = 
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
-      if (segmentPartitionConfig != null) {
-        Map<String, ColumnPartitionConfig> columnPartitionMap = 
segmentPartitionConfig.getColumnPartitionMap();
-        if (columnPartitionMap.size() == 1) {
-          Map.Entry<String, ColumnPartitionConfig> partitionConfig = 
columnPartitionMap.entrySet().iterator().next();
-          LOGGER.info("Enabling SegmentPartitionMetadataManager for table: {} 
on partition column: {}",
-              tableNameWithType, partitionConfig.getKey());
-          partitionMetadataManager = new 
SegmentPartitionMetadataManager(tableNameWithType, partitionConfig.getKey(),
-              partitionConfig.getValue().getFunctionName(), 
partitionConfig.getValue().getNumPartitions());
-        } else {
-          LOGGER.warn("Cannot enable SegmentPartitionMetadataManager for 
table: {} with multiple partition columns: {}",
-              tableNameWithType, columnPartitionMap.keySet());
+      SegmentPartitionMetadataManager partitionMetadataManager = null;
+      // TODO: Support multiple partition columns
+      // TODO: Make partition pruner on top of the partition metadata manager 
to avoid keeping 2 copies of the
+      //       metadata
+      if (_enablePartitionMetadataManager) {
+        SegmentPartitionConfig segmentPartitionConfig = 
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+        if (segmentPartitionConfig != null) {
+          Map<String, ColumnPartitionConfig> columnPartitionMap = 
segmentPartitionConfig.getColumnPartitionMap();
+          if (columnPartitionMap.size() == 1) {
+            Map.Entry<String, ColumnPartitionConfig> partitionConfig =
+                columnPartitionMap.entrySet().iterator().next();
+            LOGGER.info("Enabling SegmentPartitionMetadataManager for table: 
{} on partition column: {}",
+                tableNameWithType, partitionConfig.getKey());
+            partitionMetadataManager =
+                new SegmentPartitionMetadataManager(tableNameWithType, 
partitionConfig.getKey(),
+                    partitionConfig.getValue().getFunctionName(), 
partitionConfig.getValue().getNumPartitions());
+          } else {
+            LOGGER.warn(
+                "Cannot enable SegmentPartitionMetadataManager for table: {} 
with multiple partition columns: {}",
+                tableNameWithType, columnPartitionMap.keySet());
+          }
         }
       }
-    }
 
-    QueryConfig queryConfig = tableConfig.getQueryConfig();
-    Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() : 
null;
+      QueryConfig queryConfig = tableConfig.getQueryConfig();
+      Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() : 
null;
 
-    SegmentZkMetadataFetcher segmentZkMetadataFetcher = new 
SegmentZkMetadataFetcher(tableNameWithType, _propertyStore);
-    for (SegmentZkMetadataFetchListener listener : segmentPruners) {
-      segmentZkMetadataFetcher.register(listener);
-    }
-    if (partitionMetadataManager != null) {
-      segmentZkMetadataFetcher.register(partitionMetadataManager);
-    }
-    segmentZkMetadataFetcher.init(idealState, externalView, 
preSelectedOnlineSegments);
+      SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+          new SegmentZkMetadataFetcher(tableNameWithType, _propertyStore);
+      for (SegmentZkMetadataFetchListener listener : segmentPruners) {
+        segmentZkMetadataFetcher.register(listener);
+      }
+      if (partitionMetadataManager != null) {
+        segmentZkMetadataFetcher.register(partitionMetadataManager);
+      }
+      segmentZkMetadataFetcher.init(idealState, externalView, 
preSelectedOnlineSegments);
+
+      RoutingEntry routingEntry =
+          new RoutingEntry(tableNameWithType, idealStatePath, 
externalViewPath, segmentPreSelector, segmentSelector,
+              segmentPruners, instanceSelector, idealStateVersion, 
externalViewVersion, segmentZkMetadataFetcher,
+              timeBoundaryManager, partitionMetadataManager, queryTimeoutMs, 
!idealState.isEnabled());
+      if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
+        LOGGER.info("Built routing for table: {}", tableNameWithType);
+      } else {
+        LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
+      }
 
-    RoutingEntry routingEntry =
-        new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, 
segmentPreSelector, segmentSelector,
-            segmentPruners, instanceSelector, idealStateVersion, 
externalViewVersion, segmentZkMetadataFetcher,
-            timeBoundaryManager, partitionMetadataManager, queryTimeoutMs, 
!idealState.isEnabled());
-    if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
-      LOGGER.info("Built routing for table: {}", tableNameWithType);
-    } else {
-      LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
+      // Check for updates to the IS / EV after adding the routing entry, as 
it is possible that the
+      // processSegmentAssignmentChange() may have run and missed updating 
this newly added entry. Only update
+      // the entry if:
+      // - The calculated build time for this table is older than the 
processSegmentAssignmentChange() timestamp, and
+      // - The IS or EV version has changed since the entry was added
+      if (_routingTableBuildStartTimeMs.get(tableNameWithType) < 
_processAssignmentChangeSnapshotTimestampMs) {
+        LOGGER.info("processSegmentAssignmentChange started after build 
routing for table was started, check if "
+            + "routing entry needs to be updated for table: {} to prevent 
missed updates", tableNameWithType);
+        idealStatePath = getIdealStatePath(tableNameWithType);
+        idealState = getIdealState(idealStatePath);
+        Preconditions.checkState(idealState != null, "Failed to find ideal 
state for table: %s", tableNameWithType);
+        idealStateVersion = idealState.getRecord().getVersion();
+
+        externalViewPath = getExternalViewPath(tableNameWithType);
+        externalView = getExternalView(externalViewPath);
+        // NOTE: External view might be null for new created tables. In such 
case, create an empty one and set the
+        // version to -1 to ensure the version does not match the next 
external view
+        if (externalView == null) {
+          externalViewVersion = -1;
+        } else {
+          externalViewVersion = externalView.getRecord().getVersion();
+        }
+
+        RoutingEntry existingRoutingEntry = 
_routingEntryMap.get(tableNameWithType);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to