This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4f788b2995 Fix the deadlock in ClusterChangeMediator (#8572) 4f788b2995 is described below commit 4f788b29950f72ab1504096d717f768461425274 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Apr 21 10:19:05 2022 -0700 Fix the deadlock in ClusterChangeMediator (#8572) Fix the potential deadlock in `ClusterChangeMediator` between the Helix callback handling thread and the ClusterChangeMediator change handling thread when the ClusterChangeMediator is stopped. The deadlock chain is as follows: - ClusterChangeMediator.stop() is called and waits for the change handling thread to stop - The Helix callback handling thread acquires a lock, then sends a cluster change to the mediator, which is blocked by ClusterChangeMediator.stop() (both stop() and enqueueChange() are synchronized) - The change handling thread waits on the lock held by the Helix callback handling thread --- .../broker/broker/helix/ClusterChangeMediator.java | 112 ++++++++++----------- .../broker/helix/ClusterChangeMediatorTest.java | 89 ++++++++++++++++ 2 files changed, 143 insertions(+), 58 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java index b2bcaa547d..83beeebcd0 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java @@ -21,6 +21,7 @@ package org.apache.pinot.broker.broker.helix; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.helix.HelixConstants.ChangeType; import org.apache.helix.NotificationContext; @@ -61,12 +62,12 @@ public class ClusterChangeMediator private static 
final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L; private final Map<ChangeType, List<ClusterChangeHandler>> _changeHandlersMap; - private final Map<ChangeType, Long> _lastChangeTimeMap = new HashMap<>(); + private final Map<ChangeType, Long> _lastChangeTimeMap = new ConcurrentHashMap<>(); private final Map<ChangeType, Long> _lastProcessTimeMap = new HashMap<>(); private final Thread _clusterChangeHandlingThread; - private volatile boolean _stopped = false; + private volatile boolean _running; public ClusterChangeMediator(Map<ChangeType, List<ClusterChangeHandler>> changeHandlersMap, BrokerMetrics brokerMetrics) { @@ -78,56 +79,53 @@ public class ClusterChangeMediator _lastProcessTimeMap.put(changeType, initTime); } - _clusterChangeHandlingThread = new Thread("ClusterChangeHandlingThread") { - @Override - public void run() { - while (true) { - try { - for (Map.Entry<ChangeType, List<ClusterChangeHandler>> entry : _changeHandlersMap.entrySet()) { - if (_stopped) { - return; - } - ChangeType changeType = entry.getKey(); - List<ClusterChangeHandler> changeHandlers = entry.getValue(); - long currentTime = System.currentTimeMillis(); - Long lastChangeTime; - synchronized (_lastChangeTimeMap) { - lastChangeTime = _lastChangeTimeMap.remove(changeType); - } - if (lastChangeTime != null) { - brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime - lastChangeTime, - TimeUnit.MILLISECONDS); + _clusterChangeHandlingThread = new Thread(() -> { + while (_running) { + try { + for (Map.Entry<ChangeType, List<ClusterChangeHandler>> entry : _changeHandlersMap.entrySet()) { + if (!_running) { + return; + } + ChangeType changeType = entry.getKey(); + List<ClusterChangeHandler> changeHandlers = entry.getValue(); + long currentTime = System.currentTimeMillis(); + Long lastChangeTime = _lastChangeTimeMap.remove(changeType); + if (lastChangeTime != null) { + brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime - lastChangeTime, 
+ TimeUnit.MILLISECONDS); + processClusterChange(changeType, changeHandlers); + } else { + long lastProcessTime = _lastProcessTimeMap.get(changeType); + if (currentTime - lastProcessTime > PROACTIVE_CHANGE_CHECK_INTERVAL_MS) { + LOGGER.info("Proactive check {} change", changeType); + brokerMetrics.addMeteredGlobalValue(BrokerMeter.PROACTIVE_CLUSTER_CHANGE_CHECK, 1L); processClusterChange(changeType, changeHandlers); - } else { - long lastProcessTime = _lastProcessTimeMap.get(changeType); - if (currentTime - lastProcessTime > PROACTIVE_CHANGE_CHECK_INTERVAL_MS) { - LOGGER.info("Proactive check {} change", changeType); - brokerMetrics.addMeteredGlobalValue(BrokerMeter.PROACTIVE_CLUSTER_CHANGE_CHECK, 1L); - processClusterChange(changeType, changeHandlers); - } } } - synchronized (_lastChangeTimeMap) { - if (_stopped) { - return; - } - // Wait for at most 1/10 of proactive change check interval if no new event received. This can guarantee - // that the proactive change check will not be delayed for more than 1/10 of the interval. In case of - // spurious wakeup, execute the while loop again for the proactive change check. - if (_lastChangeTimeMap.isEmpty()) { - _lastChangeTimeMap.wait(PROACTIVE_CHANGE_CHECK_INTERVAL_MS / 10); - } + } + synchronized (_lastChangeTimeMap) { + if (!_running) { + return; + } + // Wait for at most 1/10 of proactive change check interval if no new event received. This can guarantee + // that the proactive change check will not be delayed for more than 1/10 of the interval. In case of + // spurious wakeup, execute the while loop again for the proactive change check. + if (_lastChangeTimeMap.isEmpty()) { + _lastChangeTimeMap.wait(PROACTIVE_CHANGE_CHECK_INTERVAL_MS / 10); } - } catch (Exception e) { + } + } catch (Exception e) { + if (_running) { // Ignore all exceptions. The thread keeps running until ClusterChangeMediator.stop() is invoked. 
LOGGER.error("Caught exception within cluster change handling thread", e); } } } - }; + }, "ClusterChangeHandlingThread"); + _clusterChangeHandlingThread.setDaemon(true); } - private void processClusterChange(ChangeType changeType, List<ClusterChangeHandler> changeHandlers) { + private synchronized void processClusterChange(ChangeType changeType, List<ClusterChangeHandler> changeHandlers) { long startTime = System.currentTimeMillis(); LOGGER.info("Start processing {} change", changeType); for (ClusterChangeHandler changeHandler : changeHandlers) { @@ -149,25 +147,23 @@ public class ClusterChangeMediator /** * Starts the cluster change mediator. */ - public synchronized void start() { - LOGGER.info("Starting the cluster change handling thread"); + public void start() { + LOGGER.info("Starting ClusterChangeMediator"); + _running = true; _clusterChangeHandlingThread.start(); } /** * Stops the cluster change mediator. */ - public synchronized void stop() { - LOGGER.info("Stopping the cluster change handling thread"); - _stopped = true; - synchronized (_lastChangeTimeMap) { - _lastChangeTimeMap.notify(); - } + public void stop() { + LOGGER.info("Stopping ClusterChangeMediator"); + _running = false; try { + _clusterChangeHandlingThread.interrupt(); _clusterChangeHandlingThread.join(); } catch (InterruptedException e) { - LOGGER.error("Caught InterruptedException while waiting for cluster change handling thread to die"); - Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for cluster change handling thread to finish", e); } } @@ -210,21 +206,21 @@ public class ClusterChangeMediator * * @param changeType Type of the change */ - private synchronized void enqueueChange(ChangeType changeType) { + private void enqueueChange(ChangeType changeType) { // Do not enqueue or process changes if already stopped - if (_stopped) { + if (!_running) { + LOGGER.warn("ClusterChangeMediator already stopped, skipping enqueuing the {} change", 
changeType); return; } if (_clusterChangeHandlingThread.isAlive()) { - LOGGER.info("Enqueue {} change", changeType); - synchronized (_lastChangeTimeMap) { - if (!_lastChangeTimeMap.containsKey(changeType)) { - _lastChangeTimeMap.put(changeType, System.currentTimeMillis()); + LOGGER.info("Enqueueing {} change", changeType); + if (_lastChangeTimeMap.putIfAbsent(changeType, System.currentTimeMillis()) == null) { + synchronized (_lastChangeTimeMap) { _lastChangeTimeMap.notify(); } } } else { - LOGGER.error("Cluster change handling thread is not alive, directly process the {} change", changeType); + LOGGER.warn("Cluster change handling thread is not alive, directly process the {} change", changeType); processClusterChange(changeType, _changeHandlersMap.get(changeType)); } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediatorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediatorTest.java new file mode 100644 index 0000000000..52ab23d7d2 --- /dev/null +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediatorTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.broker.broker.helix; + +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.helix.HelixConstants.ChangeType; +import org.apache.helix.HelixManager; +import org.apache.helix.NotificationContext; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; + + +@SuppressWarnings("UnstableApiUsage") +public class ClusterChangeMediatorTest { + private final Lock _lock = new ReentrantLock(); + + /** + * Tests the potential deadlock between Helix callback handling thread and ClusterChangeMediator change handling + * thread when the ClusterChangeMediator is stopped. + * The deadlock chain is as following: + * - ClusterChangeMediator.stop() is called and waiting for the change handling thread to stop + * - Helix callback handling thread acquires a lock, then send a cluster change to the mediator which is blocked + * ClusterChangeMediator.stop() (both stop() and enqueueChange() are synchronized) + * - The change handling thread waits on the lock held by the Helix callback handling thread + */ + @Test + public void testDeadLock() { + ClusterChangeMediator mediator = new ClusterChangeMediator( + Collections.singletonMap(ChangeType.IDEAL_STATE, Collections.singletonList(new Handler())), + mock(BrokerMetrics.class)); + mediator.start(); + + new Thread(() -> { + sendClusterChange(mediator); + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + sendClusterChange(mediator); + }).start(); + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + mediator.stop(); + } + + private void sendClusterChange(ClusterChangeMediator mediator) { + _lock.lock(); + try { + mediator.onIdealStateChange(Collections.emptyList(), mock(NotificationContext.class)); + } catch 
(InterruptedException e) { + throw new RuntimeException(e); + } finally { + _lock.unlock(); + } + } + + private class Handler implements ClusterChangeHandler { + + @Override + public void init(HelixManager helixManager) { + } + + @Override + public void processClusterChange(ChangeType changeType) { + Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS); + _lock.lock(); + _lock.unlock(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org