This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f788b2995 Fix the deadlock in ClusterChangeMediator (#8572)
4f788b2995 is described below

commit 4f788b29950f72ab1504096d717f768461425274
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Apr 21 10:19:05 2022 -0700

    Fix the deadlock in ClusterChangeMediator (#8572)
    
    Fix the potential deadlock in `ClusterChangeMediator` between Helix 
callback handling thread and ClusterChangeMediator change handling thread when 
the ClusterChangeMediator is stopped.
    
    The deadlock chain is as follows:
    - ClusterChangeMediator.stop() is called and waits for the change 
handling thread to stop
    - The Helix callback handling thread acquires a lock, then sends a cluster 
change to the mediator, which is blocked by ClusterChangeMediator.stop() (both 
stop() and enqueueChange() are synchronized)
    - The change handling thread waits on the lock held by the Helix callback 
handling thread
---
 .../broker/broker/helix/ClusterChangeMediator.java | 112 ++++++++++-----------
 .../broker/helix/ClusterChangeMediatorTest.java    |  89 ++++++++++++++++
 2 files changed, 143 insertions(+), 58 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
index b2bcaa547d..83beeebcd0 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.broker.helix;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.NotificationContext;
@@ -61,12 +62,12 @@ public class ClusterChangeMediator
   private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L;
 
   private final Map<ChangeType, List<ClusterChangeHandler>> _changeHandlersMap;
-  private final Map<ChangeType, Long> _lastChangeTimeMap = new HashMap<>();
+  private final Map<ChangeType, Long> _lastChangeTimeMap = new 
ConcurrentHashMap<>();
   private final Map<ChangeType, Long> _lastProcessTimeMap = new HashMap<>();
 
   private final Thread _clusterChangeHandlingThread;
 
-  private volatile boolean _stopped = false;
+  private volatile boolean _running;
 
   public ClusterChangeMediator(Map<ChangeType, List<ClusterChangeHandler>> 
changeHandlersMap,
       BrokerMetrics brokerMetrics) {
@@ -78,56 +79,53 @@ public class ClusterChangeMediator
       _lastProcessTimeMap.put(changeType, initTime);
     }
 
-    _clusterChangeHandlingThread = new Thread("ClusterChangeHandlingThread") {
-      @Override
-      public void run() {
-        while (true) {
-          try {
-            for (Map.Entry<ChangeType, List<ClusterChangeHandler>> entry : 
_changeHandlersMap.entrySet()) {
-              if (_stopped) {
-                return;
-              }
-              ChangeType changeType = entry.getKey();
-              List<ClusterChangeHandler> changeHandlers = entry.getValue();
-              long currentTime = System.currentTimeMillis();
-              Long lastChangeTime;
-              synchronized (_lastChangeTimeMap) {
-                lastChangeTime = _lastChangeTimeMap.remove(changeType);
-              }
-              if (lastChangeTime != null) {
-                
brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime 
- lastChangeTime,
-                    TimeUnit.MILLISECONDS);
+    _clusterChangeHandlingThread = new Thread(() -> {
+      while (_running) {
+        try {
+          for (Map.Entry<ChangeType, List<ClusterChangeHandler>> entry : 
_changeHandlersMap.entrySet()) {
+            if (!_running) {
+              return;
+            }
+            ChangeType changeType = entry.getKey();
+            List<ClusterChangeHandler> changeHandlers = entry.getValue();
+            long currentTime = System.currentTimeMillis();
+            Long lastChangeTime = _lastChangeTimeMap.remove(changeType);
+            if (lastChangeTime != null) {
+              
brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime 
- lastChangeTime,
+                  TimeUnit.MILLISECONDS);
+              processClusterChange(changeType, changeHandlers);
+            } else {
+              long lastProcessTime = _lastProcessTimeMap.get(changeType);
+              if (currentTime - lastProcessTime > 
PROACTIVE_CHANGE_CHECK_INTERVAL_MS) {
+                LOGGER.info("Proactive check {} change", changeType);
+                
brokerMetrics.addMeteredGlobalValue(BrokerMeter.PROACTIVE_CLUSTER_CHANGE_CHECK, 
1L);
                 processClusterChange(changeType, changeHandlers);
-              } else {
-                long lastProcessTime = _lastProcessTimeMap.get(changeType);
-                if (currentTime - lastProcessTime > 
PROACTIVE_CHANGE_CHECK_INTERVAL_MS) {
-                  LOGGER.info("Proactive check {} change", changeType);
-                  
brokerMetrics.addMeteredGlobalValue(BrokerMeter.PROACTIVE_CLUSTER_CHANGE_CHECK, 
1L);
-                  processClusterChange(changeType, changeHandlers);
-                }
               }
             }
-            synchronized (_lastChangeTimeMap) {
-              if (_stopped) {
-                return;
-              }
-              // Wait for at most 1/10 of proactive change check interval if 
no new event received. This can guarantee
-              // that the proactive change check will not be delayed for more 
than 1/10 of the interval. In case of
-              // spurious wakeup, execute the while loop again for the 
proactive change check.
-              if (_lastChangeTimeMap.isEmpty()) {
-                _lastChangeTimeMap.wait(PROACTIVE_CHANGE_CHECK_INTERVAL_MS / 
10);
-              }
+          }
+          synchronized (_lastChangeTimeMap) {
+            if (!_running) {
+              return;
+            }
+            // Wait for at most 1/10 of proactive change check interval if no 
new event received. This can guarantee
+            // that the proactive change check will not be delayed for more 
than 1/10 of the interval. In case of
+            // spurious wakeup, execute the while loop again for the proactive 
change check.
+            if (_lastChangeTimeMap.isEmpty()) {
+              _lastChangeTimeMap.wait(PROACTIVE_CHANGE_CHECK_INTERVAL_MS / 10);
             }
-          } catch (Exception e) {
+          }
+        } catch (Exception e) {
+          if (_running) {
             // Ignore all exceptions. The thread keeps running until 
ClusterChangeMediator.stop() is invoked.
             LOGGER.error("Caught exception within cluster change handling 
thread", e);
           }
         }
       }
-    };
+    }, "ClusterChangeHandlingThread");
+    _clusterChangeHandlingThread.setDaemon(true);
   }
 
-  private void processClusterChange(ChangeType changeType, 
List<ClusterChangeHandler> changeHandlers) {
+  private synchronized void processClusterChange(ChangeType changeType, 
List<ClusterChangeHandler> changeHandlers) {
     long startTime = System.currentTimeMillis();
     LOGGER.info("Start processing {} change", changeType);
     for (ClusterChangeHandler changeHandler : changeHandlers) {
@@ -149,25 +147,23 @@ public class ClusterChangeMediator
   /**
    * Starts the cluster change mediator.
    */
-  public synchronized void start() {
-    LOGGER.info("Starting the cluster change handling thread");
+  public void start() {
+    LOGGER.info("Starting ClusterChangeMediator");
+    _running = true;
     _clusterChangeHandlingThread.start();
   }
 
   /**
    * Stops the cluster change mediator.
    */
-  public synchronized void stop() {
-    LOGGER.info("Stopping the cluster change handling thread");
-    _stopped = true;
-    synchronized (_lastChangeTimeMap) {
-      _lastChangeTimeMap.notify();
-    }
+  public void stop() {
+    LOGGER.info("Stopping ClusterChangeMediator");
+    _running = false;
     try {
+      _clusterChangeHandlingThread.interrupt();
       _clusterChangeHandlingThread.join();
     } catch (InterruptedException e) {
-      LOGGER.error("Caught InterruptedException while waiting for cluster 
change handling thread to die");
-      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while waiting for cluster change 
handling thread to finish", e);
     }
   }
 
@@ -210,21 +206,21 @@ public class ClusterChangeMediator
    *
    * @param changeType Type of the change
    */
-  private synchronized void enqueueChange(ChangeType changeType) {
+  private void enqueueChange(ChangeType changeType) {
     // Do not enqueue or process changes if already stopped
-    if (_stopped) {
+    if (!_running) {
+      LOGGER.warn("ClusterChangeMediator already stopped, skipping enqueuing 
the {} change", changeType);
       return;
     }
     if (_clusterChangeHandlingThread.isAlive()) {
-      LOGGER.info("Enqueue {} change", changeType);
-      synchronized (_lastChangeTimeMap) {
-        if (!_lastChangeTimeMap.containsKey(changeType)) {
-          _lastChangeTimeMap.put(changeType, System.currentTimeMillis());
+      LOGGER.info("Enqueueing {} change", changeType);
+      if (_lastChangeTimeMap.putIfAbsent(changeType, 
System.currentTimeMillis()) == null) {
+        synchronized (_lastChangeTimeMap) {
           _lastChangeTimeMap.notify();
         }
       }
     } else {
-      LOGGER.error("Cluster change handling thread is not alive, directly 
process the {} change", changeType);
+      LOGGER.warn("Cluster change handling thread is not alive, directly 
process the {} change", changeType);
       processClusterChange(changeType, _changeHandlersMap.get(changeType));
     }
   }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediatorTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediatorTest.java
new file mode 100644
index 0000000000..52ab23d7d2
--- /dev/null
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediatorTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker.helix;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+
+
+@SuppressWarnings("UnstableApiUsage")
+public class ClusterChangeMediatorTest {
+  private final Lock _lock = new ReentrantLock();
+
+  /**
+   * Tests the potential deadlock between Helix callback handling thread and 
ClusterChangeMediator change handling
+   * thread when the ClusterChangeMediator is stopped.
+   * The deadlock chain is as following:
+   * - ClusterChangeMediator.stop() is called and waiting for the change 
handling thread to stop
+   * - Helix callback handling thread acquires a lock, then send a cluster 
change to the mediator which is blocked
+   *   ClusterChangeMediator.stop() (both stop() and enqueueChange() are 
synchronized)
+   * - The change handling thread waits on the lock held by the Helix callback 
handling thread
+   */
+  @Test
+  public void testDeadLock() {
+    ClusterChangeMediator mediator = new ClusterChangeMediator(
+        Collections.singletonMap(ChangeType.IDEAL_STATE, 
Collections.singletonList(new Handler())),
+        mock(BrokerMetrics.class));
+    mediator.start();
+
+    new Thread(() -> {
+      sendClusterChange(mediator);
+      Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+      sendClusterChange(mediator);
+    }).start();
+
+    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+    mediator.stop();
+  }
+
+  private void sendClusterChange(ClusterChangeMediator mediator) {
+    _lock.lock();
+    try {
+      mediator.onIdealStateChange(Collections.emptyList(), 
mock(NotificationContext.class));
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  private class Handler implements ClusterChangeHandler {
+
+    @Override
+    public void init(HelixManager helixManager) {
+    }
+
+    @Override
+    public void processClusterChange(ChangeType changeType) {
+      Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+      _lock.lock();
+      _lock.unlock();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to