somandal commented on code in PR #16791:
URL: https://github.com/apache/pinot/pull/16791#discussion_r2356301863


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -156,66 +185,85 @@ public synchronized void processClusterChange(ChangeType 
changeType) {
   }
 
   private void processSegmentAssignmentChange() {
-    LOGGER.info("Processing segment assignment change");
-    long startTimeMs = System.currentTimeMillis();
-
-    int numTables = _routingEntryMap.size();
-    if (numTables == 0) {
-      LOGGER.info("No table exists in the routing, skipping processing segment 
assignment change");
-      return;
-    }
-
-    List<RoutingEntry> routingEntries = new ArrayList<>(numTables);
-    List<String> idealStatePaths = new ArrayList<>(numTables);
-    List<String> externalViewPaths = new ArrayList<>(numTables);
-    for (Map.Entry<String, RoutingEntry> entry : _routingEntryMap.entrySet()) {
-      routingEntries.add(entry.getValue());
-      idealStatePaths.add(entry.getValue()._idealStatePath);
-      externalViewPaths.add(entry.getValue()._externalViewPath);
-    }
-    Stat[] idealStateStats = _zkDataAccessor.getStats(idealStatePaths, 
AccessOption.PERSISTENT);
-    Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths, 
AccessOption.PERSISTENT);
-    long fetchStatsEndTimeMs = System.currentTimeMillis();
-
-    List<String> tablesToUpdate = new ArrayList<>();
-    for (int i = 0; i < numTables; i++) {
-      Stat idealStateStat = idealStateStats[i];
-      Stat externalViewStat = externalViewStats[i];
-      if (idealStateStat != null && externalViewStat != null) {
-        RoutingEntry routingEntry = routingEntries.get(i);
-        if (idealStateStat.getVersion() != 
routingEntry.getLastUpdateIdealStateVersion()
-            || externalViewStat.getVersion() != 
routingEntry.getLastUpdateExternalViewVersion()) {
-          String tableNameWithType = routingEntry.getTableNameWithType();
-          tablesToUpdate.add(tableNameWithType);
-          try {
-            IdealState idealState = 
getIdealState(routingEntry._idealStatePath);
-            if (idealState == null) {
-              LOGGER.warn("Failed to find ideal state for table: {}, skipping 
updating routing entry",
-                  tableNameWithType);
-              continue;
-            }
-            ExternalView externalView = 
getExternalView(routingEntry._externalViewPath);
-            if (externalView == null) {
-              LOGGER.warn("Failed to find external view for table: {}, 
skipping updating routing entry",
-                  tableNameWithType);
-              continue;
+    _globalLock.readLock().lock();
+    try {
+      LOGGER.info("Processing segment assignment change");
+      long startTimeMs = System.currentTimeMillis();
+
+      Map<String, RoutingEntry> routingEntrySnapshot = new 
HashMap<>(_routingEntryMap);
+
+      int numTables = routingEntrySnapshot.size();
+      if (numTables == 0) {
+        LOGGER.info("No table exists in the routing, skipping processing 
segment assignment change");
+        return;
+      }
+
+      List<RoutingEntry> routingEntries = new ArrayList<>(numTables);
+      List<String> idealStatePaths = new ArrayList<>(numTables);
+      List<String> externalViewPaths = new ArrayList<>(numTables);
+      for (Map.Entry<String, RoutingEntry> entry : 
routingEntrySnapshot.entrySet()) {
+        routingEntries.add(entry.getValue());
+        idealStatePaths.add(entry.getValue()._idealStatePath);
+        externalViewPaths.add(entry.getValue()._externalViewPath);
+      }
+      Stat[] idealStateStats = _zkDataAccessor.getStats(idealStatePaths, 
AccessOption.PERSISTENT);
+      Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths, 
AccessOption.PERSISTENT);
+      long fetchStatsEndTimeMs = System.currentTimeMillis();
+
+      List<String> tablesToUpdate = new ArrayList<>();
+      for (int i = 0; i < numTables; i++) {
+        Stat idealStateStat = idealStateStats[i];
+        Stat externalViewStat = externalViewStats[i];
+        if (idealStateStat != null && externalViewStat != null) {
+          RoutingEntry cachedRoutingEntry = routingEntries.get(i);
+          // The routingEntry may have been removed from the _routingEntryMap 
by the time we get here in case
+          // one of the other functions such as 'removeRouting' was called 
since taking the snapshot. Check for
+          // existence before proceeding. Also note that if new entries were 
added since the snapshot was taken, we

Review Comment:
   Got it, so the above is a real concern: we can get an ideal state (IS) change even for entries that were added after the snapshot was taken, and we need to act on those. Even doing two passes would result in the same issue.
   
   In that case, we'll probably need to take the write lock, which means we can't parallelize this with buildRouting after all, right? If so, do any of these synchronization changes still make sense? It looks like there isn't a good way to parallelize this at all. cc @xiangfu0



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to