Repository: accumulo Updated Branches: refs/heads/master 315530fdc -> 956a50ecb
http://git-wip-us.apache.org/repos/asf/accumulo/blob/956a50ec/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java new file mode 100644 index 0000000..269622a --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplit.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapreduce.impl; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; + +/** + * The Class BatchInputSplit. Encapsulates a set of Accumulo ranges on a single tablet for use in Map Reduce jobs. + * Can contain several Ranges per split. + */ +public class BatchInputSplit extends AccumuloInputSplit { + private Collection<Range> ranges; + private float[] rangeProgress = null; + + public BatchInputSplit() { + ranges = Collections.emptyList(); + } + + public BatchInputSplit(BatchInputSplit split) throws IOException { + super(split); + this.setRanges(split.getRanges()); + } + + public BatchInputSplit(String table, String tableId, Collection<Range> ranges, String[] locations) { + super(table, tableId, locations); + this.ranges = ranges; + } + + /** + * Save progress on each call to this function, implied by value of currentKey, and return average ranges in the split + */ + public float getProgress(Key currentKey) { + if (null == rangeProgress) + rangeProgress = new float[ranges.size()]; + + float total = 0; // progress per range could be on different scales, this number is "fuzzy" + + if (currentKey == null) { + for (float progress : rangeProgress) + total += progress; + } else { + int i = 0; + for (Range range : ranges) { + if (range.contains(currentKey)) { + // find the current range and report as if that is the single range + if (range.getStartKey() != null && range.getEndKey() != null) { + if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) { + // just look at the row progress + rangeProgress[i] = getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData()); + } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) { + // just look at the column family progress + rangeProgress[i] = getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData()); + } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) { + // just look at the column qualifier progress + rangeProgress[i] = getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData()); + } + } + total += rangeProgress[i]; + } + i++; + } + } + + return total / ranges.size(); + } + + /** + * This implementation of length is only an estimate, it does not provide exact values. Do not have your code rely on this return value. + */ + @Override + public long getLength() throws IOException { + long sum = 0; + for (Range range : ranges) + sum += getRangeLength(range); + return sum; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + + int numRanges = in.readInt(); + ranges = new ArrayList<Range>(numRanges); + for (int i = 0; i < numRanges; ++i){ + Range r = new Range(); + r.readFields(in); + ranges.add(r); + } + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + + out.writeInt(ranges.size()); + for (Range r: ranges) + r.write(out); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(256); + sb.append("BatchInputSplit:"); + sb.append(" Ranges: ").append(Arrays.asList(ranges)); + sb.append(" Location: ").append(Arrays.asList(locations)); + sb.append(" Table: ").append(tableName); + sb.append(" TableID: ").append(tableId); + sb.append(" InstanceName: ").append(instanceName); + sb.append(" zooKeepers: ").append(zooKeepers); + sb.append(" principal: ").append(principal); + sb.append(" tokenSource: ").append(tokenSource); + sb.append(" authenticationToken: ").append(token); + sb.append(" authenticationTokenFile: ").append(tokenFile); + sb.append(" Authorizations: ").append(auths); + sb.append(" fetchColumns: ").append(fetchedColumns); + sb.append(" iterators: ").append(iterators); + sb.append(" logLevel: ").append(level); + return sb.toString(); + } + + public void setRanges(Collection<Range> ranges) { + this.ranges = ranges; + } + + public Collection<Range> getRanges() { + return ranges; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/956a50ec/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java index b0360fa..6b8fe34 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java @@ -95,7 +95,7 @@ public class InputConfigurator extends ConfiguratorBase { * @since 1.6.0 */ public static enum Features { - AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE + AUTO_ADJUST_RANGES, SCAN_ISOLATION, USE_LOCAL_ITERATORS, SCAN_OFFLINE, BATCH_SCANNER, BATCH_SCANNER_THREADS } /** @@ -517,6 +517,40 @@ public class InputConfigurator extends ConfiguratorBase { } /** + * Controls the use of the {@link BatchScanner} in this job. + * Using this feature will group ranges by their source tablet per InputSplit and use BatchScanner to read them. + * + * <p> + * By default, this feature is <b>disabled</b>. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @param enableFeature + * the feature is enabled if true, disabled otherwise + * @since 1.7.0 + */ + public static void setBatchScan(Class<?> implementingClass, Configuration conf, boolean enableFeature) { + conf.setBoolean(enumToConfKey(implementingClass, Features.BATCH_SCANNER), enableFeature); + } + + /** + * Determines whether a configuration has the BatchScanner feature enabled. + * + * @param implementingClass + * the class whose name will be used as a prefix for the property configuration key + * @param conf + * the Hadoop configuration object to configure + * @return true if the feature is enabled, false otherwise + * @since 1.7.0 + * @see #setBatchScan(Class, Configuration, boolean) + */ + public static Boolean isBatchScan(Class<?> implementingClass, Configuration conf) { + return conf.getBoolean(enumToConfKey(implementingClass, Features.BATCH_SCANNER), false); + } + + /** * Sets configurations for multiple tables at a time. * * @param implementingClass http://git-wip-us.apache.org/repos/asf/accumulo/blob/956a50ec/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java index 1927610..a14560c 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java @@ -232,8 +232,8 @@ public class AccumuloInputFormatTest { @Override public int run(String[] args) throws Exception { - if (args.length != 5) { - throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table> <instanceName> <inputFormatClass>"); + if (args.length != 5 && args.length != 6) { + throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table> <instanceName> <inputFormatClass> [<batchScan>]"); } String user = args[0]; @@ -242,6 +242,10 @@ public class AccumuloInputFormatTest { String instanceName = args[3]; String inputFormatClassName = args[4]; + Boolean batchScan = false; + if (args.length == 6) + batchScan = Boolean.parseBoolean(args[5]); + @SuppressWarnings("unchecked") Class<? extends InputFormat<?,?>> inputFormatClass = (Class<? extends InputFormat<?,?>>) Class.forName(inputFormatClassName); @@ -253,6 +257,7 @@ public class AccumuloInputFormatTest { AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); AccumuloInputFormat.setInputTableName(job, table); AccumuloInputFormat.setMockInstance(job, instanceName); + AccumuloInputFormat.setBatchScan(job, batchScan); job.setMapperClass(TestMapper.class); job.setMapOutputKeyClass(Key.class); @@ -295,6 +300,27 @@ public class AccumuloInputFormatTest { } @Test + public void testMapWithBatchScanner() throws Exception { + final String INSTANCE_NAME = PREFIX + "_mapreduce_instance"; + final String TEST_TABLE_2 = PREFIX + "_mapreduce_table_2"; + + MockInstance mockInstance = new MockInstance(INSTANCE_NAME); + Connector c = mockInstance.getConnector("root", new PasswordToken("")); + c.tableOperations().create(TEST_TABLE_2); + BatchWriter bw = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i + 1))); + m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes())); + bw.addMutation(m); + } + bw.close(); + + Assert.assertEquals(0, MRTester.main(new String[] {"root", "", TEST_TABLE_2, INSTANCE_NAME, AccumuloInputFormat.class.getCanonicalName(), "True"})); + assertNull(e1); + assertNull(e2); + } + + @Test public void testCorrectRangeInputSplits() throws Exception { Job job = Job.getInstance(new Configuration(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/956a50ec/core/src/test/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplitTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplitTest.java new file mode 100644 index 0000000..4f3caf0 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/impl/BatchInputSplitTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mapreduce.impl; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.iterators.user.SummingCombiner; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Pair; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.junit.Assert; +import org.junit.Test; + +public class BatchInputSplitTest { + + @Test + public void testSimpleWritable() throws IOException { + Range[] ranges = new Range[] {new Range(new Key("a"), new Key("b"))}; + BatchInputSplit split = new BatchInputSplit("table", "1", Arrays.asList(ranges), new String[] {"localhost"}); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + split.write(dos); + + BatchInputSplit newSplit = new BatchInputSplit(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream dis = new DataInputStream(bais); + newSplit.readFields(dis); + + Assert.assertEquals(split.getTableName(), newSplit.getTableName()); + Assert.assertEquals(split.getTableId(), newSplit.getTableId()); + Assert.assertEquals(split.getRanges(), newSplit.getRanges()); + Assert.assertTrue(Arrays.equals(split.getLocations(), newSplit.getLocations())); + } + + @Test + public void testAllFieldsWritable() throws IOException { + Range[] ranges = new Range[] {new Range(new Key("a"), new Key("b"))}; + BatchInputSplit split = new BatchInputSplit("table", "1", Arrays.asList(ranges), new String[] {"localhost"}); + + Set<Pair<Text,Text>> fetchedColumns = new HashSet<Pair<Text,Text>>(); + + fetchedColumns.add(new Pair<Text,Text>(new Text("colf1"), new Text("colq1"))); + fetchedColumns.add(new Pair<Text,Text>(new Text("colf2"), new Text("colq2"))); + + // Fake some iterators + ArrayList<IteratorSetting> iterators = new ArrayList<IteratorSetting>(); + IteratorSetting setting = new IteratorSetting(50, SummingCombiner.class); + setting.addOption("foo", "bar"); + iterators.add(setting); + + setting = new IteratorSetting(100, WholeRowIterator.class); + setting.addOption("bar", "foo"); + iterators.add(setting); + + split.setTableName("table"); + split.setAuths(new Authorizations("foo")); + split.setFetchedColumns(fetchedColumns); + split.setToken(new PasswordToken("password")); + split.setPrincipal("root"); + split.setMockInstance(true); + split.setInstanceName("instance"); + split.setZooKeepers("localhost"); + split.setIterators(iterators); + split.setLogLevel(Level.WARN); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + split.write(dos); + + BatchInputSplit newSplit = new BatchInputSplit(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream dis = new DataInputStream(bais); + newSplit.readFields(dis); + + Assert.assertEquals(split.getRanges(), newSplit.getRanges()); + Assert.assertArrayEquals(split.getLocations(), newSplit.getLocations()); + + Assert.assertEquals(split.getTableName(), newSplit.getTableName()); + Assert.assertEquals(split.getAuths(), newSplit.getAuths()); + Assert.assertEquals(split.getFetchedColumns(), newSplit.getFetchedColumns()); + Assert.assertEquals(split.getToken(), newSplit.getToken()); + Assert.assertEquals(split.getPrincipal(), newSplit.getPrincipal()); + Assert.assertEquals(split.getInstanceName(), newSplit.getInstanceName()); + Assert.assertEquals(split.isMockInstance(), newSplit.isMockInstance()); + Assert.assertEquals(split.getZooKeepers(), newSplit.getZooKeepers()); + Assert.assertEquals(split.getIterators(), newSplit.getIterators()); + Assert.assertEquals(split.getLogLevel(), newSplit.getLogLevel()); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/956a50ec/test/src/test/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java b/test/src/test/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java index 02a00f8..bc607c3 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java @@ -32,9 +32,9 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; @@ -75,7 +75,7 @@ public class AccumuloInputFormatIT extends AccumuloClusterIT { * Tests several different paths through the getSplits() method by setting different properties and verifying the results. */ @Test - public void testGetSplits() throws IOException, AccumuloSecurityException, AccumuloException, TableNotFoundException, TableExistsException { + public void testGetSplits() throws Exception { Connector conn = getConnector(); String table = getUniqueNames(1)[0]; conn.tableOperations().create(table); @@ -128,7 +128,7 @@ public class AccumuloInputFormatIT extends AccumuloClusterIT { try { inputFormat.getSplits(job); fail("An exception should have been thrown"); - } catch (Exception e) {} + } catch (IOException e) {} conn.tableOperations().offline(table); splits = inputFormat.getSplits(job); @@ -146,6 +146,49 @@ public class AccumuloInputFormatIT extends AccumuloClusterIT { AccumuloInputFormat.setAutoAdjustRanges(job, false); splits = inputFormat.getSplits(job); assertEquals(ranges.size(), splits.size()); + + //BatchScan not available for offline scans + AccumuloInputFormat.setBatchScan(job, true); + + AccumuloInputFormat.setOfflineTableScan(job, true); + try { + inputFormat.getSplits(job); + fail("An exception should have been thrown"); + } catch (IOException e) {} + AccumuloInputFormat.setOfflineTableScan(job, false); + + // test for resumption of success + inputFormat.getSplits(job); + assertEquals(2, splits.size()); + + //BatchScan not available with isolated iterators + AccumuloInputFormat.setScanIsolation(job, true); + try { + inputFormat.getSplits(job); + fail("An exception should have been thrown"); + } catch (IOException e) {} + AccumuloInputFormat.setScanIsolation(job, false); + + // test for resumption of success + inputFormat.getSplits(job); + assertEquals(2, splits.size()); + + //BatchScan not available with local iterators + AccumuloInputFormat.setLocalIterators(job, true); + try { + inputFormat.getSplits(job); + fail("An exception should have been thrown"); + } catch (IOException e) {} + AccumuloInputFormat.setLocalIterators(job, false); + + //Check we are getting back correct type pf split + conn.tableOperations().online(table); + splits = inputFormat.getSplits(job); + for (InputSplit split: splits) + assert(split instanceof BatchInputSplit); + + //We should divide along the tablet lines similar to when using `setAutoAdjustRanges(job, true)` + assertEquals(2, splits.size()); } private void insertData(String tableName, long ts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {