Repository: spark
Updated Branches:
  refs/heads/branch-2.1 b424dc947 -> e469d3bad


[SPARK-18423][STREAMING] ReceiverTracker should close checkpoint dir when stopped even if it was not started

## What changes were proposed in this pull request?

Several tests fail on Windows because the checkpoint dir cannot be removed between tests.

This is caused by a file in `ReceiverTracker` that is never closed: when the tracker has not been started, it does not close the file even if `stop()` is called.

```
Test org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery started
Test org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\1478983663710-0, took 3.828 sec
    at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1010)
    at org.apache.spark.util.Utils.deleteRecursively(Utils.scala)
    at org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery(JavaAPISuite.java:1809)
    ...
```

```
- mapWithState - basic operations with simple API (7 seconds, 640 milliseconds)
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.MapWithStateSuite *** ABORTED *** (12 seconds, 688 milliseconds)
  java.io.IOException: Failed to delete: C:\projects\spark\streaming\checkpoint\spark-b8486e2b-6468-4e6f-bb24-88277d2c033c
  ...
```
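A minimal, self-contained sketch of the lifecycle described above, under a simplified model. `TrackerLifecycleSketch`, `BlockLog`, `Tracker` and the `closeWhenNotStarted` flag are hypothetical names used only for illustration; they are not Spark's API. `BlockLog` stands in for `ReceivedBlockTracker`, which is opened as soon as the tracker is constructed, and the flag toggles the extra `Initialized` branch that this patch adds to `stop()`.

```scala
// Hypothetical, simplified model of ReceiverTracker's lifecycle; not Spark's code.
object TrackerLifecycleSketch {
  sealed trait TrackerState
  case object Initialized extends TrackerState
  case object Started extends TrackerState
  case object Stopping extends TrackerState
  case object Stopped extends TrackerState

  // Stand-in for ReceivedBlockTracker: it is "open" from construction onwards.
  final class BlockLog {
    var isOpen: Boolean = true
    def stop(): Unit = { isOpen = false }
  }

  // closeWhenNotStarted = false models the old stop(); true models the patched one.
  final class Tracker(closeWhenNotStarted: Boolean) {
    val blockLog = new BlockLog
    var state: TrackerState = Initialized

    def start(): Unit = if (state == Initialized) state = Started

    def stop(): Unit = {
      if (state == Started) {
        state = Stopping
        blockLog.stop()
        state = Stopped
      } else if (closeWhenNotStarted && state == Initialized) {
        // Mirrors the branch added by the patch: close the log even if never started.
        state = Stopping
        blockLog.stop()
        state = Stopped
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val before = new Tracker(closeWhenNotStarted = false)
    before.stop()
    println(s"before fix: state=${before.state}, logOpen=${before.blockLog.isOpen}")

    val after = new Tracker(closeWhenNotStarted = true)
    after.stop()
    println(s"after fix:  state=${after.state}, logOpen=${after.blockLog.isOpen}")
  }
}
```

Running `main` prints `before fix: state=Initialized, logOpen=true` followed by `after fix:  state=Stopped, logOpen=false`. In the real tracker, the handle left open in the first case is what Windows refuses to delete, which is why the checkpoint directory cleanup fails between tests.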

## How was this patch tested?

Tests in `JavaAPISuite` and `MapWithStateSuite`.

Manually tested via AppVeyor:

**Before**

- `org.apache.spark.streaming.JavaAPISuite`
  Build: https://ci.appveyor.com/project/spark-test/spark/build/71-MapWithStateSuite-1
  Diff: https://github.com/apache/spark/compare/master...spark-test:188c828e682ec45b75d15c3dfc782bcdc8ce024c

- `org.apache.spark.streaming.MapWithStateSuite`
  Build: https://ci.appveyor.com/project/spark-test/spark/build/72-MapWithStateSuite-1
  Diff: https://github.com/apache/spark/compare/master...spark-test:8f6945d0ccde022a23d3848f6b7fe6da1e7c902e

**After**

- `org.apache.spark.streaming.JavaAPISuite`
  Build started: [Streaming] `org.apache.spark.streaming.JavaAPISuite` [![PR-15867](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=3D74F2D5-B0D5-4E1D-874C-685AE694FD37&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/3D74F2D5-B0D5-4E1D-874C-685AE694FD37)
  Diff: https://github.com/apache/spark/compare/master...spark-test:3D74F2D5-B0D5-4E1D-874C-685AE694FD37

- `org.apache.spark.streaming.MapWithStateSuite`
  Build started: [Streaming] `org.apache.spark.streaming.MapWithStateSuite` [![PR-15867](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=C8E88B64-49F0-4157-9AFA-FC3ACC442351&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/C8E88B64-49F0-4157-9AFA-FC3ACC442351)
  Diff: https://github.com/apache/spark/compare/master...spark-test:C8E88B64-49F0-4157-9AFA-FC3ACC442351

Author: hyukjinkwon <[email protected]>

Closes #15867 from HyukjinKwon/SPARK-18423.

(cherry picked from commit 503378f10ca92064034aa88e0feebe4718af8bbe)
Signed-off-by: Shixiong Zhu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e469d3ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e469d3ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e469d3ba

Branch: refs/heads/branch-2.1
Commit: e469d3badffdf9d1cd8399a06d0bdb61781e76d4
Parents: b424dc9
Author: hyukjinkwon <[email protected]>
Authored: Tue Nov 15 15:44:15 2016 -0800
Committer: Shixiong Zhu <[email protected]>
Committed: Tue Nov 15 15:44:22 2016 -0800

----------------------------------------------------------------------
 .../spark/streaming/scheduler/ReceiverTracker.scala       | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e469d3ba/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index b9d898a..8f55d98 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -197,6 +197,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       receivedBlockTracker.stop()
       logInfo("ReceiverTracker stopped")
       trackerState = Stopped
+    } else if (isTrackerInitialized) {
+      trackerState = Stopping
+      // `ReceivedBlockTracker` is open when this instance is created. We should
+      // close this even if this `ReceiverTracker` is not started.
+      receivedBlockTracker.stop()
+      logInfo("ReceiverTracker stopped")
+      trackerState = Stopped
     }
   }
 
@@ -446,6 +453,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     endpoint.send(StartAllReceivers(receivers))
   }
 
+  /** Check if tracker has been marked for initiated */
+  private def isTrackerInitialized: Boolean = trackerState == Initialized
+
   /** Check if tracker has been marked for starting */
   private def isTrackerStarted: Boolean = trackerState == Started
 

