Repository: accumulo
Updated Branches:
  refs/heads/master 315530fdc -> 956a50ecb


http://git-wip-us.apache.org/repos/asf/accumulo/blob/956a50ec/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
 
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
new file mode 100644
index 0000000..269622a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+
+/**
+ * The Class BatchInputSplit. Encapsulates a set of Accumulo ranges on a 
single tablet for use in Map Reduce jobs.
+ * Can contain several Ranges per split.
+ */
+public class BatchInputSplit extends AccumuloInputSplit {
+  private Collection<Range> ranges;
+  private float[] rangeProgress = null;
+
+  public BatchInputSplit() {
+    ranges = Collections.emptyList();
+  }
+
+  public BatchInputSplit(BatchInputSplit split) throws IOException {
+    super(split);
+    this.setRanges(split.getRanges());
+  }
+
+  public BatchInputSplit(String table, String tableId, Collection<Range> 
ranges, String[] locations) {
+    super(table, tableId, locations);
+    this.ranges = ranges;
+  }
+
+  /**
+   * Save progress on each call to this function, implied by value of 
currentKey, and return average ranges in the split
+   */
+  public float getProgress(Key currentKey) {
+    if (null == rangeProgress)
+      rangeProgress = new float[ranges.size()];
+
+    float total = 0; // progress per range could be on different scales, this 
number is "fuzzy"
+
+    if (currentKey == null) {
+      for (float progress : rangeProgress)
+        total += progress;
+    } else {
+      int i = 0;
+      for (Range range : ranges) {
+        if (range.contains(currentKey)) {
+          // find the current range and report as if that is the single range
+          if (range.getStartKey() != null && range.getEndKey() != null) {
+            if (range.getStartKey().compareTo(range.getEndKey(), 
PartialKey.ROW) != 0) {
+              // just look at the row progress
+              rangeProgress[i] = getProgress(range.getStartKey().getRowData(), 
range.getEndKey().getRowData(), currentKey.getRowData());
+            } else if (range.getStartKey().compareTo(range.getEndKey(), 
PartialKey.ROW_COLFAM) != 0) {
+              // just look at the column family progress
+              rangeProgress[i] = 
getProgress(range.getStartKey().getColumnFamilyData(), 
range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
+            } else if (range.getStartKey().compareTo(range.getEndKey(), 
PartialKey.ROW_COLFAM_COLQUAL) != 0) {
+              // just look at the column qualifier progress
+              rangeProgress[i] = 
getProgress(range.getStartKey().getColumnQualifierData(), 
range.getEndKey().getColumnQualifierData(), 
currentKey.getColumnQualifierData());
+            }
+          }
+          total += rangeProgress[i];
+        }
+        i++;
+      }
+    }
+
+    return total / ranges.size();
+  }
+
+  /**
+   * This implementation of length is only an estimate, it does not provide 
exact values. Do not have your code rely on this return value.
+   */
+  @Override
+  public long getLength() throws IOException {
+    long sum = 0;
+    for (Range range : ranges)
+      sum += getRangeLength(range);
+    return sum;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+
+    int numRanges = in.readInt();
+    ranges = new ArrayList<Range>(numRanges);
+    for (int i = 0; i < numRanges; ++i){
+      Range r = new Range();
+      r.readFields(in);
+      ranges.add(r);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+
+    out.writeInt(ranges.size());
+    for (Range r: ranges)
+      r.write(out);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(256);
+    sb.append("BatchInputSplit:");
+    sb.append(" Ranges: ").append(Arrays.asList(ranges));
+    sb.append(" Location: ").append(Arrays.asList(locations));
+    sb.append(" Table: ").append(tableName);
+    sb.append(" TableID: ").append(tableId);
+    sb.append(" InstanceName: ").append(instanceName);
+    sb.append(" zooKeepers: ").append(zooKeepers);
+    sb.append(" principal: ").append(principal);
+    sb.append(" tokenSource: ").append(tokenSource);
+    sb.append(" authenticationToken: ").append(token);
+    sb.append(" authenticationTokenFile: ").append(tokenFile);
+    sb.append(" Authorizations: ").append(auths);
+    sb.append(" fetchColumns: ").append(fetchedColumns);
+    sb.append(" iterators: ").append(iterators);
+    sb.append(" logLevel: ").append(level);
+    return sb.toString();
+  }
+
+  public void setRanges(Collection<Range> ranges) {
+    this.ranges = ranges;
+  }
+
+  public Collection<Range> getRanges() {
+    return ranges;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/956a50ec/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
 
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
index b0360fa..6b8fe34 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
@@ -95,7 +95,7 @@ public class InputConfigurator extends ConfiguratorBase {
    * @since 1.6.0
    */
   public static enum Features {
-    AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE
+    AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE, 
BATCH_SCANNER, BATCH_SCANNER_THREADS
   }
 
   /**
@@ -517,6 +517,40 @@ public class InputConfigurator extends ConfiguratorBase {
   }
 
   /**
+   * Controls the use of the {@link BatchScanner} in this job.
+   * Using this feature will group ranges by their source tablet per 
InputSplit and use BatchScanner to read them.
+   *
+   * <p>
+   * By default, this feature is <b>disabled</b>.
+   *
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property 
configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param enableFeature
+   *          the feature is enabled if true, disabled otherwise
+   * @since 1.7.0
+   */
+  public static void setBatchScan(Class<?> implementingClass, Configuration 
conf, boolean enableFeature) {
+    conf.setBoolean(enumToConfKey(implementingClass, Features.BATCH_SCANNER), 
enableFeature);
+  }
+
+  /**
+   * Determines whether a configuration has the BatchScanner feature enabled.
+   *
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property 
configuration key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return true if the feature is enabled, false otherwise
+   * @since 1.7.0
+   * @see #setBatchScan(Class, Configuration, boolean)
+   */
+  public static Boolean isBatchScan(Class<?> implementingClass, Configuration 
conf) {
+    return conf.getBoolean(enumToConfKey(implementingClass, 
Features.BATCH_SCANNER), false);
+  }
+
+  /**
    * Sets configurations for multiple tables at a time.
    *
    * @param implementingClass

http://git-wip-us.apache.org/repos/asf/accumulo/blob/956a50ec/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
 
b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index 1927610..a14560c 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -232,8 +232,8 @@ public class AccumuloInputFormatTest {
     @Override
     public int run(String[] args) throws Exception {
 
-      if (args.length != 5) {
-        throw new IllegalArgumentException("Usage : " + 
MRTester.class.getName() + " <user> <pass> <table> <instanceName> 
<inputFormatClass>");
+      if (args.length != 5 && args.length != 6) {
+        throw new IllegalArgumentException("Usage : " + 
MRTester.class.getName() + " <user> <pass> <table> <instanceName> 
<inputFormatClass> [<batchScan>]");
       }
 
       String user = args[0];
@@ -242,6 +242,10 @@ public class AccumuloInputFormatTest {
 
       String instanceName = args[3];
       String inputFormatClassName = args[4];
+      Boolean batchScan = false;
+      if (args.length == 6)
+        batchScan = Boolean.parseBoolean(args[5]);
+
       @SuppressWarnings("unchecked")
       Class<? extends InputFormat<?,?>> inputFormatClass = (Class<? extends 
InputFormat<?,?>>) Class.forName(inputFormatClassName);
 
@@ -253,6 +257,7 @@ public class AccumuloInputFormatTest {
       AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
       AccumuloInputFormat.setInputTableName(job, table);
       AccumuloInputFormat.setMockInstance(job, instanceName);
+      AccumuloInputFormat.setBatchScan(job, batchScan);
 
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
@@ -295,6 +300,27 @@ public class AccumuloInputFormatTest {
   }
 
   @Test
+  public void testMapWithBatchScanner() throws Exception {
+    final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
+    final String TEST_TABLE_2 = PREFIX + "_mapreduce_table_2";
+
+    MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
+    Connector c = mockInstance.getConnector("root", new PasswordToken(""));
+    c.tableOperations().create(TEST_TABLE_2);
+    BatchWriter bw = c.createBatchWriter(TEST_TABLE_2, new 
BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", 
i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    Assert.assertEquals(0, MRTester.main(new String[] {"root", "", 
TEST_TABLE_2, INSTANCE_NAME, AccumuloInputFormat.class.getCanonicalName(), 
"True"}));
+    assertNull(e1);
+    assertNull(e2);
+  }
+
+  @Test
   public void testCorrectRangeInputSplits() throws Exception {
     Job job = Job.getInstance(new Configuration(), 
this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/956a50ec/core/src/test/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplitTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplitTest.java
 
b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplitTest.java
new file mode 100644
index 0000000..4f3caf0
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplitTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BatchInputSplitTest {
+
+  @Test
+  public void testSimpleWritable() throws IOException {
+    Range[] ranges = new Range[] {new Range(new Key("a"), new Key("b"))};
+    BatchInputSplit split = new BatchInputSplit("table", "1", 
Arrays.asList(ranges), new String[] {"localhost"});
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    split.write(dos);
+
+    BatchInputSplit newSplit = new BatchInputSplit();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    DataInputStream dis = new DataInputStream(bais);
+    newSplit.readFields(dis);
+
+    Assert.assertEquals(split.getTableName(), newSplit.getTableName());
+    Assert.assertEquals(split.getTableId(), newSplit.getTableId());
+    Assert.assertEquals(split.getRanges(), newSplit.getRanges());
+    Assert.assertTrue(Arrays.equals(split.getLocations(), 
newSplit.getLocations()));
+  }
+
+  @Test
+  public void testAllFieldsWritable() throws IOException {
+    Range[] ranges = new Range[] {new Range(new Key("a"), new Key("b"))};
+    BatchInputSplit split = new BatchInputSplit("table", "1", 
Arrays.asList(ranges), new String[] {"localhost"});
+
+    Set<Pair<Text,Text>> fetchedColumns = new HashSet<Pair<Text,Text>>();
+
+    fetchedColumns.add(new Pair<Text,Text>(new Text("colf1"), new 
Text("colq1")));
+    fetchedColumns.add(new Pair<Text,Text>(new Text("colf2"), new 
Text("colq2")));
+
+    // Fake some iterators
+    ArrayList<IteratorSetting> iterators = new ArrayList<IteratorSetting>();
+    IteratorSetting setting = new IteratorSetting(50, SummingCombiner.class);
+    setting.addOption("foo", "bar");
+    iterators.add(setting);
+
+    setting = new IteratorSetting(100, WholeRowIterator.class);
+    setting.addOption("bar", "foo");
+    iterators.add(setting);
+
+    split.setTableName("table");
+    split.setAuths(new Authorizations("foo"));
+    split.setFetchedColumns(fetchedColumns);
+    split.setToken(new PasswordToken("password"));
+    split.setPrincipal("root");
+    split.setMockInstance(true);
+    split.setInstanceName("instance");
+    split.setZooKeepers("localhost");
+    split.setIterators(iterators);
+    split.setLogLevel(Level.WARN);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    split.write(dos);
+
+    BatchInputSplit newSplit = new BatchInputSplit();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    DataInputStream dis = new DataInputStream(bais);
+    newSplit.readFields(dis);
+
+    Assert.assertEquals(split.getRanges(), newSplit.getRanges());
+    Assert.assertArrayEquals(split.getLocations(), newSplit.getLocations());
+
+    Assert.assertEquals(split.getTableName(), newSplit.getTableName());
+    Assert.assertEquals(split.getAuths(), newSplit.getAuths());
+    Assert.assertEquals(split.getFetchedColumns(), 
newSplit.getFetchedColumns());
+    Assert.assertEquals(split.getToken(), newSplit.getToken());
+    Assert.assertEquals(split.getPrincipal(), newSplit.getPrincipal());
+    Assert.assertEquals(split.getInstanceName(), newSplit.getInstanceName());
+    Assert.assertEquals(split.isMockInstance(), newSplit.isMockInstance());
+    Assert.assertEquals(split.getZooKeepers(), newSplit.getZooKeepers());
+    Assert.assertEquals(split.getIterators(), newSplit.getIterators());
+    Assert.assertEquals(split.getLogLevel(), newSplit.getLogLevel());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/956a50ec/test/src/test/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
 
b/test/src/test/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
index 02a00f8..bc607c3 100644
--- 
a/test/src/test/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
+++ 
b/test/src/test/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
@@ -32,9 +32,9 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -75,7 +75,7 @@ public class AccumuloInputFormatIT extends AccumuloClusterIT {
    * Tests several different paths through the getSplits() method by setting 
different properties and verifying the results.
    */
   @Test
-  public void testGetSplits() throws IOException, AccumuloSecurityException, 
AccumuloException, TableNotFoundException, TableExistsException {
+  public void testGetSplits() throws Exception {
     Connector conn = getConnector();
     String table = getUniqueNames(1)[0];
     conn.tableOperations().create(table);
@@ -128,7 +128,7 @@ public class AccumuloInputFormatIT extends 
AccumuloClusterIT {
     try {
       inputFormat.getSplits(job);
       fail("An exception should have been thrown");
-    } catch (Exception e) {}
+    } catch (IOException e) {}
 
     conn.tableOperations().offline(table);
     splits = inputFormat.getSplits(job);
@@ -146,6 +146,49 @@ public class AccumuloInputFormatIT extends 
AccumuloClusterIT {
     AccumuloInputFormat.setAutoAdjustRanges(job, false);
     splits = inputFormat.getSplits(job);
     assertEquals(ranges.size(), splits.size());
+
+    //BatchScan not available for offline scans
+    AccumuloInputFormat.setBatchScan(job, true);
+
+    AccumuloInputFormat.setOfflineTableScan(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IOException e) {}
+    AccumuloInputFormat.setOfflineTableScan(job, false);
+
+    // test for resumption of success
+    inputFormat.getSplits(job);
+    assertEquals(2, splits.size());
+
+    //BatchScan not available with isolated iterators
+    AccumuloInputFormat.setScanIsolation(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IOException e) {}
+    AccumuloInputFormat.setScanIsolation(job, false);
+
+    // test for resumption of success
+    inputFormat.getSplits(job);
+    assertEquals(2, splits.size());
+
+    //BatchScan not available with local iterators
+    AccumuloInputFormat.setLocalIterators(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IOException e) {}
+    AccumuloInputFormat.setLocalIterators(job, false);
+
+    //Check we are getting back correct type pf split
+    conn.tableOperations().online(table);
+    splits = inputFormat.getSplits(job);
+    for (InputSplit split: splits)
+      assert(split instanceof BatchInputSplit);
+
+    //We should divide along the tablet lines similar to when using 
`setAutoAdjustRanges(job, true)`
+    assertEquals(2, splits.size());
   }
 
   private void insertData(String tableName, long ts) throws AccumuloException, 
AccumuloSecurityException, TableNotFoundException {

Reply via email to