Repository: spark
Updated Branches:
  refs/heads/master 7986cc09b -> 73231860b
[SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap non-recursively

## What changes were proposed in this pull request?

Write HDFSBackedStateStoreProvider.loadMap non-recursively. This prevents stack overflow if too many deltas stack up in a low memory environment.

## How was this patch tested?

existing unit tests for functional equivalence, new unit test to check for stack overflow

Author: Jose Torres <[email protected]>

Closes #19611 from joseph-torres/SPARK-22305.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73231860
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73231860
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73231860

Branch: refs/heads/master
Commit: 73231860baaa40f6001db347e5dcb6b5bb65e032
Parents: 7986cc0
Author: Jose Torres <[email protected]>
Authored: Tue Oct 31 11:53:50 2017 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Tue Oct 31 11:53:50 2017 -0700

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala | 45 ++++++++++++++++----
 1 file changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/73231860/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 36d6569..3f5002a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -297,17 +297,44 @@
private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val loadedCurrentVersionMap = synchronized { loadedMaps.get(version) }
+    if (loadedCurrentVersionMap.isDefined) {
+      return loadedCurrentVersionMap.get
+    }
+    val snapshotCurrentVersionMap = readSnapshotFile(version)
+    if (snapshotCurrentVersionMap.isDefined) {
+      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
+      return snapshotCurrentVersionMap.get
+    }
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion -= 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
       }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    val resultMap = new MapType(lastAvailableMap.get)
+    for (deltaVersion <- lastAvailableVersion + 1 to version) {
+      updateFromDeltaFile(deltaVersion, resultMap)
+    }
+
+    synchronized { loadedMaps.put(version, resultMap) }
+    resultMap
   }
 
   private def writeUpdateToDeltaFile(

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
