Repository: spark Updated Branches: refs/heads/master 7bd46d987 -> 70a68b328
[SPARK-23020][CORE] Fix race in SparkAppHandle cleanup, again. Third time is the charm? There was still a race that was left in previous attempts. If the handle closes the connection, the close() implementation would clean up state that would prevent the thread from waiting on the connection thread to finish. That could cause the race causing the test flakiness reported in the bug. The fix is to move the "wait for connection thread" code to a separate close method that is used by the handle; that also simplifies the code a bit and makes it also easier to follow. I included an unrelated, but correct, change to a YARN test so that it triggers when the PR is built. Tested by inserting a sleep in the connection thread to mimic the race; test failed reliably with the sleep, passes now. (Sleep not included in the patch.) Also ran YARN tests to make sure. Author: Marcelo Vanzin <[email protected]> Closes #20388 from vanzin/SPARK-23020. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70a68b32 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70a68b32 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70a68b32 Branch: refs/heads/master Commit: 70a68b328b856c17eb22cc86fee0ebe8d64f8825 Parents: 7bd46d9 Author: Marcelo Vanzin <[email protected]> Authored: Fri Jan 26 11:58:20 2018 +0800 Committer: Wenchen Fan <[email protected]> Committed: Fri Jan 26 11:58:20 2018 +0800 ---------------------------------------------------------------------- .../spark/launcher/AbstractAppHandle.java | 42 +++++++++------ .../spark/launcher/ChildProcAppHandle.java | 11 +--- .../spark/launcher/InProcessAppHandle.java | 9 +--- .../apache/spark/launcher/LauncherServer.java | 55 +++++++++----------- .../spark/deploy/yarn/YarnClusterSuite.scala | 5 +- 5 files changed, 55 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/70a68b32/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java ---------------------------------------------------------------------- diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java index daf0972..84a25a5 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java @@ -20,6 +20,7 @@ package org.apache.spark.launcher; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -29,15 +30,15 @@ abstract class AbstractAppHandle implements SparkAppHandle { private final LauncherServer server; - private LauncherConnection connection; + private LauncherServer.ServerConnection connection; private List<Listener> listeners; - private State state; + private AtomicReference<State> state; private String appId; private volatile boolean disposed; protected AbstractAppHandle(LauncherServer server) { this.server = server; - this.state = State.UNKNOWN; + this.state = new AtomicReference<>(State.UNKNOWN); } @Override @@ -50,7 +51,7 @@ abstract class AbstractAppHandle implements SparkAppHandle { @Override public State getState() { - return state; + return state.get(); } @Override @@ -73,7 +74,7 @@ abstract class AbstractAppHandle implements SparkAppHandle { if (!isDisposed()) { if (connection != null) { try { - connection.close(); + connection.closeAndWait(); } catch (IOException ioe) { // no-op. } @@ -82,7 +83,7 @@ abstract class AbstractAppHandle implements SparkAppHandle { } } - void setConnection(LauncherConnection connection) { + void setConnection(LauncherServer.ServerConnection connection) { this.connection = connection; } @@ -99,12 +100,9 @@ abstract class AbstractAppHandle implements SparkAppHandle { */ synchronized void dispose() { if (!isDisposed()) { - // Unregister first to make sure that the connection with the app has been really - // terminated. server.unregister(this); - if (!getState().isFinal()) { - setState(State.LOST); - } + // Set state to LOST if not yet final. + setState(State.LOST, false); this.disposed = true; } } @@ -113,14 +111,24 @@ abstract class AbstractAppHandle implements SparkAppHandle { setState(s, false); } - synchronized void setState(State s, boolean force) { - if (force || !state.isFinal()) { - state = s; + void setState(State s, boolean force) { + if (force) { + state.set(s); fireEvent(false); - } else { - LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.", - new Object[] { state, s }); + return; } + + State current = state.get(); + while (!current.isFinal()) { + if (state.compareAndSet(current, s)) { + fireEvent(false); + return; + } + current = state.get(); + } + + LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.", + new Object[] { current, s }); } synchronized void setAppId(String appId) { http://git-wip-us.apache.org/repos/asf/spark/blob/70a68b32/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java ---------------------------------------------------------------------- diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index 2b99461..5e3c956 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -104,19 +104,12 @@ class ChildProcAppHandle extends AbstractAppHandle { ec = 1; } - State currState = getState(); - State newState = null; if (ec != 0) { + State currState = getState(); // Override state with failure if the current state is not final, or is success. if (!currState.isFinal() || currState == State.FINISHED) { - newState = State.FAILED; + setState(State.FAILED, true); } - } else if (!currState.isFinal()) { - newState = State.LOST; - } - - if (newState != null) { - setState(newState, true); } disconnect(); http://git-wip-us.apache.org/repos/asf/spark/blob/70a68b32/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java ---------------------------------------------------------------------- diff --git a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java index f04263c..b8030e0 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java @@ -66,14 +66,7 @@ class InProcessAppHandle extends AbstractAppHandle { setState(State.FAILED); } - synchronized (InProcessAppHandle.this) { - if (!isDisposed()) { - disconnect(); - if (!getState().isFinal()) { - setState(State.LOST, true); - } - } - } + disconnect(); }); app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName)); http://git-wip-us.apache.org/repos/asf/spark/blob/70a68b32/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java ---------------------------------------------------------------------- diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java index 8091885..f4ecd52 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java @@ -218,32 +218,6 @@ class LauncherServer implements Closeable { } } - // If there is a live connection for this handle, we need to wait for it to finish before - // returning, otherwise there might be a race between the connection thread processing - // buffered data and the handle cleaning up after itself, leading to potentially the wrong - // state being reported for the handle. - ServerConnection conn = null; - synchronized (clients) { - for (ServerConnection c : clients) { - if (c.handle == handle) { - conn = c; - break; - } - } - } - - if (conn != null) { - synchronized (conn) { - if (conn.isOpen()) { - try { - conn.wait(); - } catch (InterruptedException ie) { - // Ignore. - } - } - } - } - unref(); } @@ -312,9 +286,10 @@ class LauncherServer implements Closeable { } } - private class ServerConnection extends LauncherConnection { + class ServerConnection extends LauncherConnection { private TimerTask timeout; + private volatile Thread connectionThread; volatile AbstractAppHandle handle; ServerConnection(Socket socket, TimerTask timeout) throws IOException { @@ -323,6 +298,12 @@ class LauncherServer implements Closeable { } @Override + public void run() { + this.connectionThread = Thread.currentThread(); + super.run(); + } + + @Override protected void handle(Message msg) throws IOException { try { if (msg instanceof Hello) { @@ -376,9 +357,23 @@ class LauncherServer implements Closeable { clients.remove(this); } - synchronized (this) { - super.close(); - notifyAll(); + super.close(); + } + + /** + * Close the connection and wait for any buffered data to be processed before returning. + * This ensures any changes reported by the child application take effect. + */ + public void closeAndWait() throws IOException { + close(); + + Thread connThread = this.connectionThread; + if (Thread.currentThread() != connThread) { + try { + connThread.join(); + } catch (InterruptedException ie) { + // Ignore. + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/70a68b32/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index e9dcfaf..5003326 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -45,8 +45,7 @@ import org.apache.spark.util.Utils /** * Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN - * applications, and require the Spark assembly to be built before they can be successfully - * run. + * applications. */ @ExtendedYarnTest class YarnClusterSuite extends BaseYarnClusterSuite { @@ -152,7 +151,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { } test("run Python application in yarn-cluster mode using " + - " spark.yarn.appMasterEnv to override local envvar") { + "spark.yarn.appMasterEnv to override local envvar") { testPySpark( clientMode = false, extraConf = Map( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
