Repository: spark
Updated Branches:
  refs/heads/branch-2.0 24ea16598 -> 831c7c085


[SPARK-15430][SQL] Fix potential ConcurrentModificationException for 
ListAccumulator

## What changes were proposed in this pull request?

In `ListAccumulator` we create an unmodifiable view for underlying list. 
However, it doesn't prevent the underlying to be modified further. So as we 
access the unmodifiable list, the underlying list can be modified in the same 
time. It could cause `java.util.ConcurrentModificationException`. We can 
observe such exception in recent tests.

To fix it, we can copy a list of the underlying list and then create the 
unmodifiable view of this list instead.

## How was this patch tested?
The exception might be difficult to test. Existing tests should be passed.

Author: Liang-Chi Hsieh <[email protected]>

Closes #13211 from viirya/fix-concurrentmodify.

(cherry picked from commit 7920296bf8f313e010205937d3ebcbbc7b1a1d9e)
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/831c7c08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/831c7c08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/831c7c08

Branch: refs/heads/branch-2.0
Commit: 831c7c085d9714c488ebc6876e0a8404a52c0e37
Parents: 24ea165
Author: Liang-Chi Hsieh <[email protected]>
Authored: Sun May 22 08:08:46 2016 -0500
Committer: Sean Owen <[email protected]>
Committed: Sun May 22 08:08:59 2016 -0500

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/831c7c08/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala 
b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 13cb6a2..21ba460 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
 
 import java.{lang => jl}
 import java.io.ObjectInputStream
+import java.util.ArrayList
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
@@ -415,7 +416,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, 
jl.Double] {
 
 
 class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
-  private val _list: java.util.List[T] = new java.util.ArrayList[T]
+  private val _list: java.util.List[T] = new ArrayList[T]
 
   override def isZero: Boolean = _list.isEmpty
 
@@ -437,7 +438,9 @@ class ListAccumulator[T] extends AccumulatorV2[T, 
java.util.List[T]] {
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  override def value: java.util.List[T] = 
java.util.Collections.unmodifiableList(_list)
+  override def value: java.util.List[T] = _list.synchronized {
+    java.util.Collections.unmodifiableList(new ArrayList[T](_list))
+  }
 
   private[spark] def setValue(newValue: java.util.List[T]): Unit = {
     _list.clear()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to