Jackie-Jiang commented on code in PR #16791:
URL: https://github.com/apache/pinot/pull/16791#discussion_r2356217704


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -143,8 +163,17 @@ public void init(HelixManager helixManager) {
     _propertyStore = helixManager.getHelixPropertyStore();
   }
 
+  private Object getRoutingTableBuildLock(String tableNameWithType) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    return _routingTableBuildLocks.computeIfAbsent(rawTableName, k -> new Object());
+  }
+
+  private long getLastRoutingTableBuildStartTimeMs(String tableNameWithType) {
+    return _routingTableBuildStartTimeMs.computeIfAbsent(tableNameWithType, k -> Long.MIN_VALUE);
+  }
+
   @Override
-  public synchronized void processClusterChange(ChangeType changeType) {
+  public void processClusterChange(ChangeType changeType) {

Review Comment:
   (minor) We probably want to keep it `synchronized` to avoid confusion. It is synchronized anyway on the caller side (`ClusterChangeMediator`).
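
The following minimal sketch shows how a method-level `synchronized` can coexist with the per-raw-table locks introduced in this diff. It is a simplified model: `RoutingManagerSketch`, `getNumChangesProcessed()`, and the suffix-stripping helper are hypothetical stand-ins, not Pinot code, and `processClusterChange()` takes a table name instead of a `ChangeType` to keep it small. The modifier documents that callbacks run one at a time. The per-table lock still coordinates with other threads working on the same table.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the locking pattern in this diff; not Pinot's BrokerRoutingManager.
class RoutingManagerSketch {
  // One lock object per raw table name, shared by the OFFLINE and REALTIME variants of a table.
  private final Map<String, Object> _routingTableBuildLocks = new ConcurrentHashMap<>();
  private int _numChangesProcessed = 0;

  // Hypothetical stand-in for TableNameBuilder.extractRawTableName(): strips a trailing type suffix.
  static String extractRawTableName(String tableNameWithType) {
    int idx = tableNameWithType.lastIndexOf('_');
    if (idx < 0) {
      return tableNameWithType;
    }
    String suffix = tableNameWithType.substring(idx + 1);
    return suffix.equals("OFFLINE") || suffix.equals("REALTIME") ? tableNameWithType.substring(0, idx)
        : tableNameWithType;
  }

  Object getRoutingTableBuildLock(String tableNameWithType) {
    // computeIfAbsent is atomic on ConcurrentHashMap, so every caller gets the same lock object per key.
    return _routingTableBuildLocks.computeIfAbsent(extractRawTableName(tableNameWithType), k -> new Object());
  }

  // Kept synchronized: if the caller already serializes callbacks this costs little,
  // and the modifier makes the "one change at a time" contract visible at the definition.
  synchronized void processClusterChange(String tableNameWithType) {
    // Lock order: manager monitor first, then the per-table lock.
    synchronized (getRoutingTableBuildLock(tableNameWithType)) {
      _numChangesProcessed++;
    }
  }

  synchronized int getNumChangesProcessed() {
    return _numChangesProcessed;
  }
}
```

If other code paths take the per-table lock first and then need the manager monitor, they would invert this order and risk deadlock. All paths would need to agree on a single lock order.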



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -156,66 +185,85 @@ public synchronized void processClusterChange(ChangeType changeType) {
   }
 
   private void processSegmentAssignmentChange() {

Review Comment:
   I believe the slowness mainly comes from this call, and we need to parallelize the handling.
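
One way to parallelize this is sketched below under stated assumptions. Once `tablesToUpdate` is known, each table's ideal state/external view read and routing rebuild is submitted to a bounded thread pool, and the callback waits for all tasks before returning. `ParallelSegmentAssignmentSketch` and `updateRoutingEntry()` are hypothetical placeholders for the real per-table work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: fan out per-table routing updates to a bounded pool and wait for all of them.
class ParallelSegmentAssignmentSketch {
  private final ExecutorService _executor;
  // Records refreshed tables; stands in for the real routing-entry update.
  final Map<String, Boolean> _updatedTables = new ConcurrentHashMap<>();

  ParallelSegmentAssignmentSketch(int numThreads) {
    _executor = Executors.newFixedThreadPool(numThreads);
  }

  // Placeholder for reading the IdealState/ExternalView and rebuilding one table's routing entry.
  private void updateRoutingEntry(String tableNameWithType) {
    _updatedTables.put(tableNameWithType, Boolean.TRUE);
  }

  // Returns the tables whose update failed or was not confirmed; one failure does not block the others.
  List<String> processTablesInParallel(List<String> tablesToUpdate) {
    List<Future<?>> futures = new ArrayList<>(tablesToUpdate.size());
    for (String tableNameWithType : tablesToUpdate) {
      futures.add(_executor.submit(() -> updateRoutingEntry(tableNameWithType)));
    }
    List<String> failedTables = new ArrayList<>();
    for (int i = 0; i < futures.size(); i++) {
      try {
        futures.get(i).get();
      } catch (ExecutionException e) {
        failedTables.add(tablesToUpdate.get(i));
      } catch (InterruptedException e) {
        // Preserve the interrupt status and report the remaining tables as unconfirmed.
        Thread.currentThread().interrupt();
        failedTables.addAll(tablesToUpdate.subList(i, tablesToUpdate.size()));
        break;
      }
    }
    return failedTables;
  }

  void shutdown() {
    _executor.shutdown();
  }
}
```

The pool size bounds the number of concurrent ZK reads. In the real manager, each task would presumably still take the per-table build lock. That way, a rebuild cannot interleave with `buildRouting()` for the same table.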



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -156,66 +185,85 @@ public synchronized void processClusterChange(ChangeType changeType) {
   }
 
   private void processSegmentAssignmentChange() {
-    LOGGER.info("Processing segment assignment change");
-    long startTimeMs = System.currentTimeMillis();
-
-    int numTables = _routingEntryMap.size();
-    if (numTables == 0) {
-      LOGGER.info("No table exists in the routing, skipping processing segment assignment change");
-      return;
-    }
-
-    List<RoutingEntry> routingEntries = new ArrayList<>(numTables);
-    List<String> idealStatePaths = new ArrayList<>(numTables);
-    List<String> externalViewPaths = new ArrayList<>(numTables);
-    for (Map.Entry<String, RoutingEntry> entry : _routingEntryMap.entrySet()) {
-      routingEntries.add(entry.getValue());
-      idealStatePaths.add(entry.getValue()._idealStatePath);
-      externalViewPaths.add(entry.getValue()._externalViewPath);
-    }
-    Stat[] idealStateStats = _zkDataAccessor.getStats(idealStatePaths, AccessOption.PERSISTENT);
-    Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths, AccessOption.PERSISTENT);
-    long fetchStatsEndTimeMs = System.currentTimeMillis();
-
-    List<String> tablesToUpdate = new ArrayList<>();
-    for (int i = 0; i < numTables; i++) {
-      Stat idealStateStat = idealStateStats[i];
-      Stat externalViewStat = externalViewStats[i];
-      if (idealStateStat != null && externalViewStat != null) {
-        RoutingEntry routingEntry = routingEntries.get(i);
-        if (idealStateStat.getVersion() != routingEntry.getLastUpdateIdealStateVersion()
-            || externalViewStat.getVersion() != routingEntry.getLastUpdateExternalViewVersion()) {
-          String tableNameWithType = routingEntry.getTableNameWithType();
-          tablesToUpdate.add(tableNameWithType);
-          try {
-            IdealState idealState = getIdealState(routingEntry._idealStatePath);
-            if (idealState == null) {
-              LOGGER.warn("Failed to find ideal state for table: {}, skipping updating routing entry",
-                  tableNameWithType);
-              continue;
-            }
-            ExternalView externalView = getExternalView(routingEntry._externalViewPath);
-            if (externalView == null) {
-              LOGGER.warn("Failed to find external view for table: {}, skipping updating routing entry",
-                  tableNameWithType);
-              continue;
+    _globalLock.readLock().lock();
+    try {
+      LOGGER.info("Processing segment assignment change");
+      long startTimeMs = System.currentTimeMillis();
+
+      Map<String, RoutingEntry> routingEntrySnapshot = new HashMap<>(_routingEntryMap);
+
+      int numTables = routingEntrySnapshot.size();
+      if (numTables == 0) {
+        LOGGER.info("No table exists in the routing, skipping processing segment assignment change");
+        return;
+      }
+
+      List<RoutingEntry> routingEntries = new ArrayList<>(numTables);
+      List<String> idealStatePaths = new ArrayList<>(numTables);
+      List<String> externalViewPaths = new ArrayList<>(numTables);
+      for (Map.Entry<String, RoutingEntry> entry : routingEntrySnapshot.entrySet()) {
+        routingEntries.add(entry.getValue());
+        idealStatePaths.add(entry.getValue()._idealStatePath);
+        externalViewPaths.add(entry.getValue()._externalViewPath);
+      }
+      Stat[] idealStateStats = _zkDataAccessor.getStats(idealStatePaths, AccessOption.PERSISTENT);
+      Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths, AccessOption.PERSISTENT);
+      long fetchStatsEndTimeMs = System.currentTimeMillis();
+
+      List<String> tablesToUpdate = new ArrayList<>();
+      for (int i = 0; i < numTables; i++) {
+        Stat idealStateStat = idealStateStats[i];
+        Stat externalViewStat = externalViewStats[i];
+        if (idealStateStat != null && externalViewStat != null) {
+          RoutingEntry cachedRoutingEntry = routingEntries.get(i);
+          // The routingEntry may have been removed from the _routingEntryMap by the time we get here in case
+          // one of the other functions such as 'removeRouting' was called since taking the snapshot. Check for
+          // existence before proceeding. Also note that if new entries were added since the snapshot was taken, we

Review Comment:
   (MAJOR) Missing the update for a newly added entry is a major problem. Let's think more about this.
   
   I believe there is a race condition where an ideal state (IS) update could be missed:
   T1: A new routing entry starts being built
   T2: An IS change happens, which triggers this callback
   T3: The new routing entry gets added
   
   The change between T1 and T2 is missed.
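
One possible way to close that window is sketched below with hypothetical names. It records the ideal state version each entry is built from. After publishing the new entry (T3), it re-reads the current version and rebuilds if the version moved. A change at T2 then cannot be lost, even though the concurrent callback's snapshot did not contain the entry. ZK is simulated by an in-memory version map, and `hookBeforePublish` exists only to force the T2 interleaving deterministically. The real code would presumably apply the same check to the external view version.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: re-check the ideal state version after publishing a new routing entry so an
// ideal state change that lands between T1 (build start) and T3 (publish) is not lost.
class RoutingBuildRaceSketch {
  // Simulated ZK ideal state version per table.
  final Map<String, Integer> _zkIdealStateVersions = new ConcurrentHashMap<>();
  // Routing "entries": the ideal state version each entry was built from.
  final Map<String, Integer> _routingEntryMap = new ConcurrentHashMap<>();
  private final Map<String, Object> _buildLocks = new ConcurrentHashMap<>();

  private Object lockFor(String tableNameWithType) {
    return _buildLocks.computeIfAbsent(tableNameWithType, k -> new Object());
  }

  // Stand-in for reading the ideal state and building an entry from it.
  private int buildEntry(String tableNameWithType) {
    return _zkIdealStateVersions.getOrDefault(tableNameWithType, 0);
  }

  // hookBeforePublish exists only to force the T2 interleaving in a test.
  void buildRouting(String tableNameWithType, Runnable hookBeforePublish) {
    synchronized (lockFor(tableNameWithType)) {
      int builtFromVersion = buildEntry(tableNameWithType); // T1
      hookBeforePublish.run(); // T2: a change callback may run here and not see the entry yet
      _routingEntryMap.put(tableNameWithType, builtFromVersion); // T3
      // Re-check after publishing: a change between T1 and T3 shows up as a version mismatch.
      while (_zkIdealStateVersions.getOrDefault(tableNameWithType, 0) != builtFromVersion) {
        builtFromVersion = buildEntry(tableNameWithType);
        _routingEntryMap.put(tableNameWithType, builtFromVersion);
      }
    }
  }

  // Mirrors the snapshot-based callback: only tables present in the snapshot are checked.
  void processSegmentAssignmentChange() {
    Map<String, Integer> snapshot = new HashMap<>(_routingEntryMap);
    for (String tableNameWithType : snapshot.keySet()) {
      synchronized (lockFor(tableNameWithType)) {
        Integer cachedVersion = _routingEntryMap.get(tableNameWithType);
        // The entry may have been removed since the snapshot was taken.
        if (cachedVersion != null && cachedVersion != buildEntry(tableNameWithType)) {
          _routingEntryMap.put(tableNameWithType, buildEntry(tableNameWithType));
        }
      }
    }
  }
}
```

This trades one extra version read per build for correctness. An alternative is to have the callback record changes for tables whose build is in progress and have the builder re-check only those.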



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -109,12 +116,25 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
   private final BrokerMetrics _brokerMetrics;
   private final Map<String, RoutingEntry> _routingEntryMap = new ConcurrentHashMap<>();
   private final Map<String, ServerInstance> _enabledServerInstanceMap = new ConcurrentHashMap<>();
-  // NOTE: _excludedServers doesn't need to be concurrent because it is only accessed within the synchronized block
-  private final Set<String> _excludedServers = new HashSet<>();
+  // Thread-safe set because it can be read/modified concurrently from instance/server change paths
+  private final Set<String> _excludedServers = ConcurrentHashMap.newKeySet();

Review Comment:
   Is this always protected by the global read-write lock? If so, it doesn't need to be concurrent. `_routingEntryMap` and `_enabledServerInstanceMap` must be concurrent because `getRoutingTable()` is not protected by the global lock.
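
To make the distinction concrete: the read lock of a read-write lock is shared, so a plain `HashSet` is safe only if every mutation happens under the write lock. If any path modifies `_excludedServers` while holding only the read lock, the concurrent set is still required, because two read-lock holders can run at the same time. The diff shows `processSegmentAssignmentChange()` running under `_globalLock.readLock()`. The following minimal sketch shows the write-lock-only variant. The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: a plain HashSet guarded by a read-write lock, where every mutation takes the
// write lock. Read-lock holders can run concurrently, so they must only read the set.
class ExcludedServersSketch {
  private final ReadWriteLock _globalLock = new ReentrantReadWriteLock();
  private final Set<String> _excludedServers = new HashSet<>();

  void excludeServer(String instanceId) {
    _globalLock.writeLock().lock();
    try {
      _excludedServers.add(instanceId);
    } finally {
      _globalLock.writeLock().unlock();
    }
  }

  void includeServer(String instanceId) {
    _globalLock.writeLock().lock();
    try {
      _excludedServers.remove(instanceId);
    } finally {
      _globalLock.writeLock().unlock();
    }
  }

  boolean isExcluded(String instanceId) {
    // Safe under the shared read lock because no thread mutates the set without the write lock.
    _globalLock.readLock().lock();
    try {
      return _excludedServers.contains(instanceId);
    } finally {
      _globalLock.readLock().unlock();
    }
  }
}
```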



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
