Author: afuchs
Date: Fri Apr 13 19:14:48 2012
New Revision: 1325909

URL: http://svn.apache.org/viewvc?rev=1325909&view=rev
Log:
ACCUMULO-533

Added:
    
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
   (with props)
Modified:
    
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java

Modified: 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java?rev=1325909&r1=1325908&r2=1325909&view=diff
==============================================================================
--- 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
 (original)
+++ 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
 Fri Apr 13 19:14:48 2012
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.iterators.conf.PerColumnIteratorConfig;
+import org.apache.accumulo.core.iterators.system.SynchronizedIterator;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.start.classloader.AccumuloClassLoader;
 import org.apache.hadoop.io.Writable;
@@ -215,7 +216,8 @@ public class IteratorUtil {
   @SuppressWarnings("unchecked")
   public static <K extends WritableComparable<?>,V extends Writable> 
SortedKeyValueIterator<K,V> loadIterators(SortedKeyValueIterator<K,V> source,
       Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, 
IteratorEnvironment env, boolean useAccumuloClassLoader) throws IOException {
-    SortedKeyValueIterator<K,V> prev = source;
+    // wrap the source in a SynchronizedIterator in case any of the additional 
configured iterators want to use threading
+    SortedKeyValueIterator<K,V> prev = new SynchronizedIterator<K,V>(source);
     
     try {
       for (IterInfo iterInfo : iters) {

Added: 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java?rev=1325909&view=auto
==============================================================================
--- 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
 (added)
+++ 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
 Fri Apr 13 19:14:48 2012
@@ -0,0 +1,63 @@
+package org.apache.accumulo.core.iterators.system;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/***
+ * SynchronizedIterator: wrap a SortedKeyValueIterator so that all of its 
methods are synchronized
+ */
+public class SynchronizedIterator <K extends WritableComparable<?>,V extends 
Writable> implements SortedKeyValueIterator<K,V> {
+
+  private SortedKeyValueIterator<K,V> source = null;
+  
+  @Override
+  public synchronized void init(SortedKeyValueIterator<K,V> source, 
Map<String,String> options, IteratorEnvironment env) throws IOException {
+    this.source = source;
+    source.init(source, options, env);
+  }
+
+  @Override
+  public synchronized boolean hasTop() {
+    return source.hasTop();
+  }
+
+  @Override
+  public synchronized void next() throws IOException {
+    source.next();
+  }
+
+  @Override
+  public synchronized void seek(Range range, Collection<ByteSequence> 
columnFamilies, boolean inclusive) throws IOException {
+    source.seek(range, columnFamilies, inclusive);
+  }
+
+  @Override
+  public synchronized K getTopKey() {
+    return source.getTopKey();
+  }
+
+  @Override
+  public synchronized V getTopValue() {
+    return source.getTopValue();
+  }
+
+  @Override
+  public synchronized SortedKeyValueIterator<K,V> deepCopy(IteratorEnvironment 
env) {
+    return new SynchronizedIterator<K,V>(source.deepCopy(env));
+  }
+  
+  public SynchronizedIterator(){}
+  
+  public SynchronizedIterator(SortedKeyValueIterator<K,V> source)
+  {
+    this.source = source;
+  }
+}

Propchange: 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to