Repository: spark
Updated Branches:
  refs/heads/branch-2.0 24ea16598 -> 831c7c085
[SPARK-15430][SQL] Fix potential ConcurrentModificationException for ListAccumulator

## What changes were proposed in this pull request?

In `ListAccumulator`, `value` returns an unmodifiable view of the underlying list. The view does not stop the underlying list itself from being modified afterwards. While a caller is reading the unmodifiable list, the underlying list can therefore be modified at the same time. This can cause a `java.util.ConcurrentModificationException`, and such exceptions have been observed in recent tests.

To fix it, `value` now copies the underlying list while holding the lock on `_list`, and returns an unmodifiable view of that copy instead.

## How was this patch tested?

The exception may be difficult to trigger reliably in a test. Existing tests should pass.

Author: Liang-Chi Hsieh

Closes #13211 from viirya/fix-concurrentmodify.

(cherry picked from commit 7920296bf8f313e010205937d3ebcbbc7b1a1d9e)
Signed-off-by: Sean Owen


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/831c7c08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/831c7c08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/831c7c08

Branch: refs/heads/branch-2.0
Commit: 831c7c085d9714c488ebc6876e0a8404a52c0e37
Parents: 24ea165
Author: Liang-Chi Hsieh
Authored: Sun May 22 08:08:46 2016 -0500
Committer: Sean Owen
Committed: Sun May 22 08:08:59 2016 -0500

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/831c7c08/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 13cb6a2..21ba460 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
 
 import java.{lang => jl}
 import java.io.ObjectInputStream
+import java.util.ArrayList
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
@@ -415,7 +416,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
 
 
 class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
-  private val _list: java.util.List[T] = new java.util.ArrayList[T]
+  private val _list: java.util.List[T] = new ArrayList[T]
 
   override def isZero: Boolean = _list.isEmpty
 
@@ -437,7 +438,9 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  override def value: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
+  override def value: java.util.List[T] = _list.synchronized {
+    java.util.Collections.unmodifiableList(new ArrayList[T](_list))
+  }
 
   private[spark] def setValue(newValue: java.util.List[T]): Unit = {
     _list.clear()
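The following Scala sketch contrasts the two `value` implementations in the diff. It uses a hypothetical, simplified class: `SnapshotList`, its `view`/`value`/`add` methods, and the `SnapshotDemo` helper object are illustrative only and are not Spark APIs. The sketch is single-threaded so that it behaves deterministically. The reported failures come from concurrent tasks, but `ArrayList`'s fail-fast iterator performs the same modification-count check either way, so appending to the list while iterating a live view is enough to trigger `ConcurrentModificationException`. The sketch also assumes that writers synchronize on the same list object. The lock taken in `value` only excludes writers that hold that same monitor, and this diff does not show how `ListAccumulator.add` is written.

```scala
import java.util.{ArrayList, Collections, ConcurrentModificationException, List => JList}

// Hypothetical, simplified stand-in for ListAccumulator (not Spark's class).
class SnapshotList[T] {
  private val underlying: JList[T] = new ArrayList[T]

  // Assumption: writers take the same monitor as `value`, so a copy can
  // never overlap a structural modification made through this method.
  def add(v: T): Unit = underlying.synchronized { underlying.add(v) }

  // Pre-patch behavior: a read-only view that still tracks `underlying`.
  def view: JList[T] = Collections.unmodifiableList(underlying)

  // Post-patch behavior: copy under the lock, then wrap the private copy.
  def value: JList[T] = underlying.synchronized {
    Collections.unmodifiableList(new ArrayList[T](underlying))
  }
}

object SnapshotDemo {
  // Walks `result` while appending to `acc` after every element, and reports
  // whether the walk failed with ConcurrentModificationException.
  def throwsWhileAdding(acc: SnapshotList[Int], result: JList[Int]): Boolean =
    try {
      val it = result.iterator()
      while (it.hasNext) {
        it.next()
        acc.add(-1) // structural change to the accumulator's own list
      }
      false
    } catch {
      case _: ConcurrentModificationException => true
    }

  // Builds a fresh accumulator holding 1, 2, 3.
  def filled(): SnapshotList[Int] = {
    val acc = new SnapshotList[Int]
    Seq(1, 2, 3).foreach(acc.add)
    acc
  }

  def main(args: Array[String]): Unit = {
    val a = filled()
    println(s"view:     CME = ${throwsWhileAdding(a, a.view)}")
    val b = filled()
    val snap = b.value
    println(s"snapshot: CME = ${throwsWhileAdding(b, snap)}, size = ${snap.size()}")
  }
}
```

Running `SnapshotDemo.main` should report a `ConcurrentModificationException` for the view and none for the snapshot, which keeps its original three elements even though the accumulator grew. The trade-off of the copying approach is an O(n) copy on every `value` call in exchange for a result that later writes cannot disturb.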
