http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java index 7b6284d..1963532 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java @@ -69,15 +69,15 @@ import com.google.common.net.HostAndPort; public class ThriftScanner { private static final Logger log = Logger.getLogger(ThriftScanner.class); - + public static final Map<TabletType,Set<String>> serversWaitedForWrites = new EnumMap<TabletType,Set<String>>(TabletType.class); - + static { for (TabletType ttype : TabletType.values()) { serversWaitedForWrites.put(ttype, Collections.synchronizedSet(new HashSet<String>())); } } - + public static boolean getBatchFromServer(ClientContext context, Range range, KeyExtent extent, String server, SortedMap<Key,Value> results, SortedSet<Column> fetchedColumns, List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, int size, Authorizations authorizations, boolean retry) throws AccumuloException, AccumuloSecurityException, NotServingTabletException { @@ -92,7 +92,7 @@ public class ThriftScanner { // not reading whole rows (or stopping on row boundries) so there is no need to enable isolation below ScanState scanState = new ScanState(context, extent.getTableId(), authorizations, range, fetchedColumns, size, serverSideIteratorList, serverSideIteratorOptions, false); - + TabletType ttype = TabletType.type(extent); boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(server); InitialScan isr = client.startScan(tinfo, scanState.context.rpcCreds(), extent.toThrift(), scanState.range.toThrift(), @@ -100,14 +100,14 @@ public class ThriftScanner { scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold); if (waitForWrites) serversWaitedForWrites.get(ttype).add(server); - + Key.decompress(isr.result.results); - + for (TKeyValue kv : isr.result.results) results.put(new Key(kv.key), new Value(kv.value)); - + client.closeScan(tinfo, isr.scanID); - + return isr.result.more; } finally { ThriftUtil.returnClient(client); @@ -122,33 +122,33 @@ public class ThriftScanner { } catch (TException e) { log.debug("Error getting transport to " + server + " : " + e); } - + throw new AccumuloException("getBatchFromServer: failed"); } - + public static class ScanState { - + boolean isolated; Text tableId; Text startRow; boolean skipStartRow; long readaheadThreshold; - + Range range; - + int size; - + ClientContext context; Authorizations authorizations; List<Column> columns; - + TabletLocation prevLoc; Long scanID; - + boolean finished = false; - + List<IterInfo> serverSideIteratorList; - + Map<String,Map<String,String>> serverSideIteratorOptions; public ScanState(ClientContext context, Text tableId, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, int size, @@ -162,40 +162,40 @@ public class ThriftScanner { this.context = context; ; this.authorizations = authorizations; - + columns = new ArrayList<Column>(fetchedColumns.size()); for (Column column : fetchedColumns) { columns.add(column); } - + this.tableId = tableId; this.range = range; - + Key startKey = 
range.getStartKey(); if (startKey == null) { startKey = new Key(); } this.startRow = startKey.getRow(); - + this.skipStartRow = false; - + this.size = size; - + this.serverSideIteratorList = serverSideIteratorList; this.serverSideIteratorOptions = serverSideIteratorOptions; - + this.isolated = isolated; this.readaheadThreshold = readaheadThreshold; - + } } - + public static class ScanTimedOutException extends IOException { - + private static final long serialVersionUID = 1L; - + } - + public static List<KeyValue> scan(ClientContext context, ScanState scanState, int timeOut) throws ScanTimedOutException, AccumuloException, AccumuloSecurityException, TableNotFoundException { TabletLocation loc = null; @@ -204,34 +204,34 @@ public class ThriftScanner { String lastError = null; String error = null; int tooManyFilesCount = 0; - + List<KeyValue> results = null; - + Span span = Trace.start("scan"); try { while (results == null && !scanState.finished) { if (Thread.currentThread().isInterrupted()) { throw new AccumuloException("Thread interrupted"); } - + if ((System.currentTimeMillis() - startTime) / 1000.0 > timeOut) throw new ScanTimedOutException(); - + while (loc == null) { long currentTime = System.currentTimeMillis(); if ((currentTime - startTime) / 1000.0 > timeOut) throw new ScanTimedOutException(); - + Span locateSpan = Trace.start("scan:locateTablet"); try { loc = TabletLocator.getLocator(context, scanState.tableId).locateTablet(context, scanState.startRow, scanState.skipStartRow, false); - + if (loc == null) { if (!Tables.exists(instance, scanState.tableId.toString())) throw new TableDeletedException(scanState.tableId.toString()); else if (Tables.getTableState(instance, scanState.tableId.toString()) == TableState.OFFLINE) throw new TableOfflineException(instance, scanState.tableId.toString()); - + error = "Failed to locate tablet for table : " + scanState.tableId + " row : " + scanState.startRow; if (!error.equals(lastError)) log.debug(error); @@ -243,7 +243,7 @@ public class ThriftScanner { // when a tablet splits we do want to continue scanning the low child // of the split if we are already passed it Range dataRange = loc.tablet_extent.toDataRange(); - + if (scanState.range.getStartKey() != null && dataRange.afterEndKey(scanState.range.getStartKey())) { // go to the next tablet scanState.startRow = loc.tablet_extent.getEndRow(); @@ -264,14 +264,14 @@ public class ThriftScanner { log.debug(error); else if (log.isTraceEnabled()) log.trace(error); - + lastError = error; Thread.sleep(100); } finally { locateSpan.stop(); } } - + Span scanLocation = Trace.start("scan:location"); scanLocation.data("tserver", loc.tablet_location); try { @@ -291,16 +291,16 @@ public class ThriftScanner { else if (log.isTraceEnabled()) log.trace(error); lastError = error; - + TabletLocator.getLocator(context, scanState.tableId).invalidateCache(loc.tablet_extent); loc = null; - + // no need to try the current scan id somewhere else scanState.scanID = null; - + if (scanState.isolated) throw new IsolationException(); - + Thread.sleep(100); } catch (NoSuchScanIDException e) { error = "Scan failed, no such scan id " + scanState.scanID + " " + loc; @@ -309,10 +309,10 @@ public class ThriftScanner { else if (log.isTraceEnabled()) log.trace(error); lastError = error; - + if (scanState.isolated) throw new IsolationException(); - + scanState.scanID = null; } catch (TooManyFilesException e) { error = "Tablet has too many files " + loc + " retrying..."; @@ -327,15 +327,15 @@ public class ThriftScanner { log.trace(error); 
} lastError = error; - + // not sure what state the scan session on the server side is // in after this occurs, so lets be cautious and start a new // scan session scanState.scanID = null; - + if (scanState.isolated) throw new IsolationException(); - + Thread.sleep(100); } catch (TException e) { TabletLocator.getLocator(context, scanState.tableId).invalidateCache(context.getInstance(), loc.tablet_location); @@ -346,24 +346,24 @@ public class ThriftScanner { log.trace(error); lastError = error; loc = null; - + // do not want to continue using the same scan id, if a timeout occurred could cause a batch to be skipped // because a thread on the server side may still be processing the timed out continue scan scanState.scanID = null; - + if (scanState.isolated) throw new IsolationException(); - + Thread.sleep(100); } finally { scanLocation.stop(); } } - + if (results != null && results.size() == 0 && scanState.finished) { results = null; } - + return results; } catch (InterruptedException ex) { throw new AccumuloException(ex); @@ -371,12 +371,12 @@ public class ThriftScanner { span.stop(); } } - + private static List<KeyValue> scan(TabletLocation loc, ScanState scanState, ClientContext context) throws AccumuloSecurityException, NotServingTabletException, TException, NoSuchScanIDException, TooManyFilesException { if (scanState.finished) return null; - + OpTimer opTimer = new OpTimer(log, Level.TRACE); final TInfo tinfo = Tracer.traceInfo(); @@ -386,18 +386,18 @@ public class ThriftScanner { String old = Thread.currentThread().getName(); try { ScanResult sr; - + if (scanState.prevLoc != null && !scanState.prevLoc.equals(loc)) scanState.scanID = null; - + scanState.prevLoc = loc; - + if (scanState.scanID == null) { String msg = "Starting scan tserver=" + loc.tablet_location + " tablet=" + loc.tablet_extent + " range=" + scanState.range + " ssil=" + scanState.serverSideIteratorList + " ssio=" + scanState.serverSideIteratorOptions; Thread.currentThread().setName(msg); opTimer.start(msg); - + TabletType ttype = TabletType.type(loc.tablet_extent); boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(loc.tablet_location); InitialScan is = client.startScan(tinfo, scanState.context.rpcCreds(), loc.tablet_extent.toThrift(), scanState.range.toThrift(), @@ -405,27 +405,27 @@ public class ThriftScanner { scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold); if (waitForWrites) serversWaitedForWrites.get(ttype).add(loc.tablet_location); - + sr = is.result; - + if (sr.more) scanState.scanID = is.scanID; else client.closeScan(tinfo, is.scanID); - + } else { // log.debug("Calling continue scan : "+scanState.range+" loc = "+loc); String msg = "Continuing scan tserver=" + loc.tablet_location + " scanid=" + scanState.scanID; Thread.currentThread().setName(msg); opTimer.start(msg); - + sr = client.continueScan(tinfo, scanState.scanID); if (!sr.more) { client.closeScan(tinfo, scanState.scanID); scanState.scanID = null; } } - + if (!sr.more) { // log.debug("No more : tab end row = "+loc.tablet_extent.getEndRow()+" range = "+scanState.range); if (loc.tablet_extent.getEndRow() == null) { @@ -442,18 +442,18 @@ public class ThriftScanner { } else { opTimer.stop("Finished scan in %DURATION% #results=" + sr.results.size() + " scanid=" + scanState.scanID); } - + Key.decompress(sr.results); - + if (sr.results.size() > 0 && !scanState.finished) scanState.range = new Range(new Key(sr.results.get(sr.results.size() - 1).key), false, 
scanState.range.getEndKey(), scanState.range.isEndKeyInclusive()); - + List<KeyValue> results = new ArrayList<KeyValue>(sr.results.size()); for (TKeyValue tkv : sr.results) results.add(new KeyValue(new Key(tkv.key), tkv.value)); - + return results; - + } catch (ThriftSecurityException e) { throw new AccumuloSecurityException(e.user, e.code, e); } finally {
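The scan loop above is a locate/invalidate/retry cycle: look up the tablet's location, issue the RPC, and on failures such as NotServingTabletException invalidate the locator cache, drop the scan id, sleep 100ms, and try again, giving up only when the caller's timeout elapses. A minimal standalone sketch of that shape, using a hypothetical Locator interface and rpc() stub rather than the real Accumulo types:

import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins; the real TabletLocator and startScan/continueScan
// RPCs carry far more state (ranges, iterators, sessions, isolation).
interface Locator {
  String locate(String row);        // may return null if no tablet is known yet
  void invalidate(String location); // drop a stale cache entry
}

class NotServingException extends Exception {}

public class LocateRetrySketch {

  // Retry until a location is found and the server answers, sleeping
  // briefly between attempts as ThriftScanner does with Thread.sleep(100).
  static String scanOnce(Locator locator, String row, long timeoutMs) throws Exception {
    long start = System.currentTimeMillis();
    while (true) {
      if (System.currentTimeMillis() - start > timeoutMs)
        throw new java.io.IOException("scan timed out");
      String loc = locator.locate(row);
      if (loc == null) {
        TimeUnit.MILLISECONDS.sleep(100); // location unknown; wait and retry
        continue;
      }
      try {
        return rpc(loc, row);     // placeholder for the Thrift scan call
      } catch (NotServingException e) {
        locator.invalidate(loc);  // tablet moved; refresh the cache and retry
        TimeUnit.MILLISECONDS.sleep(100);
      }
    }
  }

  static String rpc(String loc, String row) throws NotServingException {
    return "result-from-" + loc;  // stub standing in for startScan/continueScan
  }
}

The real loop above also distinguishes failure modes (an isolated scan aborts with IsolationException instead of retrying) and tracks the per-server waitForWrites flag so that only the first scan against a given tserver waits for in-flight writes.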
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java index 644ba31..8a6777d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java @@ -31,14 +31,14 @@ import org.apache.accumulo.core.data.Range; import org.apache.hadoop.io.Text; /** - * + * */ public class TimeoutTabletLocator extends TabletLocator { - + private TabletLocator locator; private long timeout; private Long firstFailTime = null; - + private void failed() { if (firstFailTime == null) { firstFailTime = System.currentTimeMillis(); @@ -46,93 +46,93 @@ public class TimeoutTabletLocator extends TabletLocator { throw new TimedOutException("Failed to obtain metadata"); } } - + private void succeeded() { firstFailTime = null; } - + public TimeoutTabletLocator(TabletLocator locator, long timeout) { this.locator = locator; this.timeout = timeout; } - + @Override public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + try { TabletLocation ret = locator.locateTablet(context, row, skipRow, retry); - + if (ret == null) failed(); else succeeded(); - + return ret; } catch (AccumuloException ae) { failed(); throw ae; } } - + @Override public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { try { locator.binMutations(context, mutations, binnedMutations, failures); - + if (failures.size() == mutations.size()) failed(); else succeeded(); - + } catch (AccumuloException ae) { failed(); throw ae; } } - + /** - * + * */ - + @Override public List<Range> binRanges(ClientContext context, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + try { List<Range> ret = locator.binRanges(context, ranges, binnedRanges); - + if (ranges.size() == ret.size()) failed(); else succeeded(); - + return ret; } catch (AccumuloException ae) { failed(); throw ae; } } - + @Override public void invalidateCache(KeyExtent failedExtent) { locator.invalidateCache(failedExtent); } - + @Override public void invalidateCache(Collection<KeyExtent> keySet) { locator.invalidateCache(keySet); } - + @Override public void invalidateCache() { locator.invalidateCache(); } - + @Override public void invalidateCache(Instance instance, String server) { locator.invalidateCache(instance, server); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java index 3ce542d..2503d95 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java +++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java @@ -33,31 +33,31 @@ import org.apache.accumulo.core.data.thrift.TKeyExtent; import org.apache.accumulo.core.data.thrift.TRange; public abstract class Translator<IT,OT> { - + public abstract OT translate(IT input); - + public static class TKeyExtentTranslator extends Translator<TKeyExtent,KeyExtent> { @Override public KeyExtent translate(TKeyExtent input) { return new KeyExtent(input); } - + } - + public static class KeyExtentTranslator extends Translator<KeyExtent,TKeyExtent> { @Override public TKeyExtent translate(KeyExtent input) { return input.toThrift(); } } - + public static class TCVSTranslator extends Translator<TConstraintViolationSummary,ConstraintViolationSummary> { @Override public ConstraintViolationSummary translate(TConstraintViolationSummary input) { return new ConstraintViolationSummary(input); } } - + public static class CVSTranslator extends Translator<ConstraintViolationSummary,TConstraintViolationSummary> { @Override public TConstraintViolationSummary translate(ConstraintViolationSummary input) { @@ -71,69 +71,69 @@ public abstract class Translator<IT,OT> { return new Column(input); } } - + public static class ColumnTranslator extends Translator<Column,TColumn> { @Override public TColumn translate(Column input) { return input.toThrift(); } } - + public static class TRangeTranslator extends Translator<TRange,Range> { - + @Override public Range translate(TRange input) { return new Range(input); } - + } - + public static class RangeTranslator extends Translator<Range,TRange> { @Override public TRange translate(Range input) { return input.toThrift(); } } - + public static class ListTranslator<IT,OT> extends Translator<List<IT>,List<OT>> { - + private Translator<IT,OT> translator; - + public ListTranslator(Translator<IT,OT> translator) { this.translator = translator; } - + @Override public List<OT> translate(List<IT> input) { return translate(input, this.translator); } - + } - + public static <IKT,OKT,T> Map<OKT,T> translate(Map<IKT,T> input, Translator<IKT,OKT> keyTranslator) { HashMap<OKT,T> output = new HashMap<OKT,T>(); - + for (Entry<IKT,T> entry : input.entrySet()) output.put(keyTranslator.translate(entry.getKey()), entry.getValue()); - + return output; } - + public static <IKT,OKT,IVT,OVT> Map<OKT,OVT> translate(Map<IKT,IVT> input, Translator<IKT,OKT> keyTranslator, Translator<IVT,OVT> valueTranslator) { HashMap<OKT,OVT> output = new HashMap<OKT,OVT>(); - + for (Entry<IKT,IVT> entry : input.entrySet()) output.put(keyTranslator.translate(entry.getKey()), valueTranslator.translate(entry.getValue())); - + return output; } - + public static <IT,OT> List<OT> translate(Collection<IT> input, Translator<IT,OT> translator) { ArrayList<OT> output = new ArrayList<OT>(input.size()); - + for (IT in : input) output.add(translator.translate(in)); - + return output; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java index 552ddae..4f325be 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java @@ -41,19 +41,19 @@ import org.apache.thrift.TServiceClient; import com.google.common.net.HostAndPort; public class Writer 
{ - + private static final Logger log = Logger.getLogger(Writer.class); - + private ClientContext context; private Text table; - + public Writer(ClientContext context, Text table) { checkArgument(context != null, "context is null"); checkArgument(table != null, "table is null"); this.context = context; this.table = table; } - + public Writer(ClientContext context, String table) { this(context, new Text(table)); } @@ -64,7 +64,7 @@ public class Writer { checkArgument(extent != null, "extent is null"); checkArgument(server != null, "server is null"); checkArgument(context != null, "context is null"); - + TabletClientService.Iface client = null; try { client = ThriftUtil.getTServerClient(server, context); @@ -76,16 +76,16 @@ public class Writer { ThriftUtil.returnClient((TServiceClient) client); } } - + public void update(Mutation m) throws AccumuloException, AccumuloSecurityException, ConstraintViolationException, TableNotFoundException { checkArgument(m != null, "m is null"); - + if (m.size() == 0) throw new IllegalArgumentException("Can not add empty mutations"); - + while (true) { TabletLocation tabLoc = TabletLocator.getLocator(context, table).locateTablet(context, new Text(m.getRow()), false, true); - + if (tabLoc == null) { log.trace("No tablet location found for row " + new String(m.getRow(), UTF_8)); UtilWaitThread.sleep(500); @@ -108,9 +108,9 @@ public class Writer { log.error("error sending update to " + parsedLocation + ": " + e); TabletLocator.getLocator(context, table).invalidateCache(tabLoc.tablet_extent); } - + UtilWaitThread.sleep(500); } - + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/ZookeeperLockChecker.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ZookeeperLockChecker.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ZookeeperLockChecker.java index be56ad4..a9c33b5 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ZookeeperLockChecker.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ZookeeperLockChecker.java @@ -26,21 +26,22 @@ import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.zookeeper.KeeperException; /** - * + * */ public class ZookeeperLockChecker implements TabletServerLockChecker { - + private final ZooCache zc; private final String root; ZookeeperLockChecker(Instance instance) { this(instance, new ZooCacheFactory()); } + ZookeeperLockChecker(Instance instance, ZooCacheFactory zcf) { zc = zcf.getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); this.root = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; } - + @Override public boolean isLockHeld(String tserver, String session) { try { @@ -51,10 +52,10 @@ public class ZookeeperLockChecker implements TabletServerLockChecker { throw new RuntimeException(e); } } - + @Override public void invalidateCache(String tserver) { zc.clear(root + "/" + tserver); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/BigIntegerLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/BigIntegerLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/BigIntegerLexicoder.java index 12bcdd2..838e3cd 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/BigIntegerLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/BigIntegerLexicoder.java @@ -27,58 +27,59 @@ import org.apache.accumulo.core.iterators.ValueFormatException; /** * A lexicoder to encode/decode a BigInteger to/from bytes that maintain its native Java sort order. + * * @since 1.6.0 */ public class BigIntegerLexicoder implements Lexicoder<BigInteger> { - + @Override public byte[] encode(BigInteger v) { - + try { byte[] bytes = v.toByteArray(); - + byte[] ret = new byte[4 + bytes.length]; - + DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret)); - + // flip the sign bit bytes[0] = (byte) (0x80 ^ bytes[0]); - + int len = bytes.length; if (v.signum() < 0) len = -len; - + len = len ^ 0x80000000; - + dos.writeInt(len); dos.write(bytes); dos.close(); - + return ret; } catch (IOException ioe) { throw new RuntimeException(ioe); } - + } - + @Override public BigInteger decode(byte[] b) throws ValueFormatException { - + try { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b)); int len = dis.readInt(); len = len ^ 0x80000000; len = Math.abs(len); - + byte[] bytes = new byte[len]; dis.readFully(bytes); - + bytes[0] = (byte) (0x80 ^ bytes[0]); - + return new BigInteger(bytes); } catch (IOException ioe) { throw new RuntimeException(ioe); } } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/BytesLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/BytesLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/BytesLexicoder.java index b6fbab5..e018e0c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/BytesLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/BytesLexicoder.java @@ -19,19 +19,19 @@ package org.apache.accumulo.core.client.lexicoder; /** * For each of the methods, this lexicoder just passes the input through untouched. It is meant to be combined with other lexicoders like the * {@link ReverseLexicoder}. 
- * + * * @since 1.6.0 */ public class BytesLexicoder implements Lexicoder<byte[]> { - + @Override public byte[] encode(byte[] data) { return data; } - + @Override public byte[] decode(byte[] data) { return data; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/DoubleLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/DoubleLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/DoubleLexicoder.java index 5a25ede..6310645 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/DoubleLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/DoubleLexicoder.java @@ -22,9 +22,9 @@ package org.apache.accumulo.core.client.lexicoder; * @since 1.6.0 */ public class DoubleLexicoder implements Lexicoder<Double> { - + private ULongLexicoder longEncoder = new ULongLexicoder(); - + @Override public byte[] encode(Double d) { long l = Double.doubleToRawLongBits(d); @@ -32,10 +32,10 @@ public class DoubleLexicoder implements Lexicoder<Double> { l = ~l; else l = l ^ 0x8000000000000000l; - + return longEncoder.encode(l); } - + @Override public Double decode(byte[] data) { long l = longEncoder.decode(data); @@ -45,5 +45,5 @@ public class DoubleLexicoder implements Lexicoder<Double> { l = ~l; return Double.longBitsToDouble(l); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/Encoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/Encoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/Encoder.java index 313ac6d..f1249ca 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/Encoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/Encoder.java @@ -18,8 +18,9 @@ package org.apache.accumulo.core.client.lexicoder; /** * An encoder represents a typed object that can be encoded/decoded to/from a byte array. + * * @since 1.6.0 */ public interface Encoder<T> extends org.apache.accumulo.core.iterators.TypedValueCombiner.Encoder<T> { - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/IntegerLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/IntegerLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/IntegerLexicoder.java index 8226421..12b515a 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/IntegerLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/IntegerLexicoder.java @@ -17,22 +17,23 @@ package org.apache.accumulo.core.client.lexicoder; /** - * A lexicoder for signed integers. The encoding sorts Integer.MIN_VALUE first and Integer.MAX_VALUE last. The encoding sorts -2 before -1. It - * corresponds to the sort order of Integer. + * A lexicoder for signed integers. The encoding sorts Integer.MIN_VALUE first and Integer.MAX_VALUE last. The encoding sorts -2 before -1. It corresponds to + * the sort order of Integer. 
+ * * @since 1.6.0 */ public class IntegerLexicoder implements Lexicoder<Integer> { - + private UIntegerLexicoder uil = new UIntegerLexicoder(); - + @Override public byte[] encode(Integer i) { return uil.encode(i ^ 0x80000000); } - + @Override public Integer decode(byte[] data) { return uil.decode(data) ^ 0x80000000; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/Lexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/Lexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/Lexicoder.java index 2c7d4ae..b2ef8d2 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/Lexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/Lexicoder.java @@ -19,9 +19,9 @@ package org.apache.accumulo.core.client.lexicoder; /** * A lexicoder provides methods to convert to/from byte arrays. The byte arrays are constructed so that their sort order corresponds their parameterized class's * native Java sort order. - * + * * @since 1.6.0 */ public interface Lexicoder<T> extends Encoder<T> { - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ListLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ListLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ListLexicoder.java index 06eb962..5ecee88 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ListLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ListLexicoder.java @@ -26,42 +26,42 @@ import java.util.List; /** * A lexicoder to encode/decode a Java List to/from a byte array where the concatenation of each encoded element sorts lexicographically. - * + * * @since 1.6.0 */ public class ListLexicoder<LT> implements Lexicoder<List<LT>> { - + private Lexicoder<LT> lexicoder; - + public ListLexicoder(Lexicoder<LT> lexicoder) { this.lexicoder = lexicoder; } - + /** * @return a byte array containing the concatenation of each element in the list encoded. */ @Override public byte[] encode(List<LT> v) { byte[][] encElements = new byte[v.size()][]; - + int index = 0; for (LT element : v) { encElements[index++] = escape(lexicoder.encode(element)); } - + return concat(encElements); } - + @Override public List<LT> decode(byte[] b) { - + byte[][] escapedElements = split(b); ArrayList<LT> ret = new ArrayList<LT>(escapedElements.length); - + for (byte[] escapedElement : escapedElements) { ret.add(lexicoder.decode(unescape(escapedElement))); } - + return ret; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/LongLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/LongLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/LongLexicoder.java index 9106e26..f70a83c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/LongLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/LongLexicoder.java @@ -19,7 +19,7 @@ package org.apache.accumulo.core.client.lexicoder; /** * Signed long lexicoder. 
The encoding sorts Long.MIN_VALUE first and Long.MAX_VALUE last. The encoding sorts -2l before -1l. It corresponds to the native Java * sort order of Long. - * + * * @since 1.6.0 */ public class LongLexicoder extends ULongLexicoder { @@ -27,7 +27,7 @@ public class LongLexicoder extends ULongLexicoder { public byte[] encode(Long l) { return super.encode(l ^ 0x8000000000000000l); } - + @Override public Long decode(byte[] data) { return super.decode(data) ^ 0x8000000000000000l; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/PairLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/PairLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/PairLexicoder.java index e185630..22af289 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/PairLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/PairLexicoder.java @@ -24,53 +24,54 @@ import static org.apache.accumulo.core.client.lexicoder.impl.ByteUtils.unescape; import org.apache.accumulo.core.util.ComparablePair; /** - * This class is a lexicoder that sorts a ComparablePair. Each item in the pair is encoded with the given lexicoder and concatenated together. - * This makes it easy to construct a sortable key based on two components. There are many examples of this- but a key/value relationship is a great one. + * This class is a lexicoder that sorts a ComparablePair. Each item in the pair is encoded with the given lexicoder and concatenated together. This makes it + * easy to construct a sortable key based on two components. There are many examples of this- but a key/value relationship is a great one. * * If we decided we wanted a two-component key where the first component is a string and the second component a date which is reverse sorted, we can do so with * the following example: * * <pre> - * {@code - * StringLexicoder stringEncoder = new StringLexicoder(); - * ReverseLexicoder<Date> dateEncoder = new ReverseLexicoder<Date>(new DateLexicoder()); - * PairLexicoder<String,Date> pairLexicoder = new PairLexicoder<String,Date>(stringEncoder, dateEncoder); - * byte[] pair1 = pairLexicoder.encode(new ComparablePair<String,Date>("com.google", new Date())); - * byte[] pair2 = pairLexicoder.encode(new ComparablePair<String,Date>("com.google", new Date(System.currentTimeMillis() + 500))); - * byte[] pair3 = pairLexicoder.encode(new ComparablePair<String,Date>("org.apache", new Date(System.currentTimeMillis() + 1000))); + * { + * @code + * StringLexicoder stringEncoder = new StringLexicoder(); + * ReverseLexicoder<Date> dateEncoder = new ReverseLexicoder<Date>(new DateLexicoder()); + * PairLexicoder<String,Date> pairLexicoder = new PairLexicoder<String,Date>(stringEncoder, dateEncoder); + * byte[] pair1 = pairLexicoder.encode(new ComparablePair<String,Date>("com.google", new Date())); + * byte[] pair2 = pairLexicoder.encode(new ComparablePair<String,Date>("com.google", new Date(System.currentTimeMillis() + 500))); + * byte[] pair3 = pairLexicoder.encode(new ComparablePair<String,Date>("org.apache", new Date(System.currentTimeMillis() + 1000))); * } * </pre> - * + * * In the example, pair2 will be sorted before pair1. pair3 will occur last since 'org' is sorted after 'com'. If we just used a {@link DateLexicoder} instead * of a {@link ReverseLexicoder}, pair1 would have been sorted before pair2. 
- * + * * @since 1.6.0 */ public class PairLexicoder<A extends Comparable<A>,B extends Comparable<B>> implements Lexicoder<ComparablePair<A,B>> { - + private Lexicoder<A> firstLexicoder; private Lexicoder<B> secondLexicoder; - + public PairLexicoder(Lexicoder<A> firstLexicoder, Lexicoder<B> secondLexicoder) { this.firstLexicoder = firstLexicoder; this.secondLexicoder = secondLexicoder; } - + @Override public byte[] encode(ComparablePair<A,B> data) { return concat(escape(firstLexicoder.encode(data.getFirst())), escape(secondLexicoder.encode(data.getSecond()))); } - + @Override public ComparablePair<A,B> decode(byte[] data) { - + byte[][] fields = split(data); if (fields.length != 2) { throw new RuntimeException("Data does not have 2 fields, it has " + fields.length); } - + return new ComparablePair<A,B>(firstLexicoder.decode(unescape(fields[0])), secondLexicoder.decode(unescape(fields[1]))); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ReverseLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ReverseLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ReverseLexicoder.java index 1ced085..1a6999b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ReverseLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ReverseLexicoder.java @@ -23,14 +23,14 @@ import static org.apache.accumulo.core.client.lexicoder.impl.ByteUtils.unescape; * A lexicoder that flips the sort order from another lexicoder. If this is applied to {@link DateLexicoder}, the most recent date will be sorted first and the * oldest date will be sorted last. If it's applied to {@link LongLexicoder}, the Long.MAX_VALUE will be sorted first and Long.MIN_VALUE will be sorted last, * etc... - * + * * @since 1.6.0 */ public class ReverseLexicoder<T> implements Lexicoder<T> { - + private Lexicoder<T> lexicoder; - + /** * @param lexicoder * The lexicoder who's sort order will be flipped. 
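ReverseLexicoder's encode, in the hunk that follows, inverts the wrapped lexicoder's sort order by complementing every byte (0xff - b) and appending a trailing 0xff terminator. A small self-contained check of the complement property; this sketch leaves out the escape/unescape handling and the terminator, which the real class needs for correctness:

import java.nio.charset.StandardCharsets;

public class ReverseOrderSketch {

  // Complement every byte, as ReverseLexicoder.encode does before it
  // appends the 0xff terminator (omitted here, along with escaping).
  static byte[] flip(byte[] in) {
    byte[] out = new byte[in.length];
    for (int i = 0; i < in.length; i++)
      out[i] = (byte) (0xff - (0xff & in[i]));
    return out;
  }

  // Unsigned lexicographic comparison, the order byte-array keys sort in.
  static int compare(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int d = (a[i] & 0xff) - (b[i] & 0xff);
      if (d != 0)
        return d;
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    byte[] a = "apple".getBytes(StandardCharsets.UTF_8);
    byte[] b = "banana".getBytes(StandardCharsets.UTF_8);
    System.out.println(compare(a, b) < 0);             // true: apple sorts first
    System.out.println(compare(flip(a), flip(b)) > 0); // true: flipping reverses the order
  }
}

The trailing 0xff in the real encoder covers the prefix case: without it, the flip of "a" would still sort before the flip of "ab", so prefixes would not be reversed.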
@@ -38,27 +38,27 @@ public class ReverseLexicoder<T> implements Lexicoder<T> { public ReverseLexicoder(Lexicoder<T> lexicoder) { this.lexicoder = lexicoder; } - + @Override public byte[] encode(T data) { byte[] bytes = escape(lexicoder.encode(data)); byte[] ret = new byte[bytes.length + 1]; - + for (int i = 0; i < bytes.length; i++) ret[i] = (byte) (0xff - (0xff & bytes[i])); - + ret[bytes.length] = (byte) 0xff; - + return ret; } - + @Override public T decode(byte[] data) { byte ret[] = new byte[data.length - 1]; - + for (int i = 0; i < ret.length; i++) ret[i] = (byte) (0xff - (0xff & data[i])); - + return lexicoder.decode(unescape(ret)); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/StringLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/StringLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/StringLexicoder.java index 165248b..9d5e07e 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/StringLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/StringLexicoder.java @@ -21,20 +21,20 @@ import static java.nio.charset.StandardCharsets.UTF_8; /** * This lexicoder encodes/decodes a given String to/from bytes without further processing. It can be combined with other encoders like the * {@link ReverseLexicoder} to flip the default sort order. - * + * * @since 1.6.0 */ public class StringLexicoder implements Lexicoder<String> { - + @Override public byte[] encode(String data) { return data.getBytes(UTF_8); } - + @Override public String decode(byte[] data) { return new String(data, UTF_8); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/TextLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/TextLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/TextLexicoder.java index eb9d554..08837a5 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/TextLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/TextLexicoder.java @@ -22,20 +22,20 @@ import org.apache.hadoop.io.Text; /** * A lexicoder that preserves a Text's native sort order. It can be combined with other encoders like the {@link ReverseLexicoder} to flip the default sort * order. 
- * + * * @since 1.6.0 */ public class TextLexicoder implements Lexicoder<Text> { - + @Override public byte[] encode(Text data) { return TextUtil.getBytes(data); } - + @Override public Text decode(byte[] data) { return new Text(data); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/UIntegerLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/UIntegerLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/UIntegerLexicoder.java index 81398ae..f8842a9 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/UIntegerLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/UIntegerLexicoder.java @@ -19,57 +19,57 @@ package org.apache.accumulo.core.client.lexicoder; /** * A lexicoder for an unsigned integer. It sorts 0 before -1 and does not preserve the native sort order of a Java integer because Java does not contain an * unsigned integer. If Java had an unsigned integer type, this would correspond to its sort order. - * + * * @since 1.6.0 */ public class UIntegerLexicoder implements Lexicoder<Integer> { - + @Override public byte[] encode(Integer i) { int shift = 56; int index; int prefix = i < 0 ? 0xff : 0x00; - + for (index = 0; index < 4; index++) { if (((i >>> shift) & 0xff) != prefix) break; - + shift -= 8; } - + byte ret[] = new byte[5 - index]; ret[0] = (byte) (4 - index); for (index = 1; index < ret.length; index++) { ret[index] = (byte) (i >>> shift); shift -= 8; } - + if (i < 0) ret[0] = (byte) (8 - ret[0]); - + return ret; - + } - + @Override public Integer decode(byte[] data) { - + if (data[0] < 0 || data[0] > 8) throw new IllegalArgumentException("Unexpected length " + (0xff & data[0])); - + int i = 0; int shift = 0; - + for (int idx = data.length - 1; idx >= 1; idx--) { i += (data[idx] & 0xffl) << shift; shift += 8; } - + // fill in 0xff prefix if (data[0] > 4) i |= -1 << ((8 - data[0]) << 3); - + return i; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ULongLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ULongLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ULongLexicoder.java index 1539fe7..a3dcab5 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ULongLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/ULongLexicoder.java @@ -19,56 +19,56 @@ package org.apache.accumulo.core.client.lexicoder; /** * Unsigned long lexicoder. The lexicographic encoding sorts first 0l and -1l last. This encoding does not correspond to the sort of Long because it does not * consider the sign bit. If Java had an unsigned long type, this encoder would correspond to its sort order. - * + * * @since 1.6.0 */ public class ULongLexicoder implements Lexicoder<Long> { - + @Override public byte[] encode(Long l) { int shift = 56; int index; int prefix = l < 0 ? 
0xff : 0x00; - + for (index = 0; index < 8; index++) { if (((l >>> shift) & 0xff) != prefix) break; - + shift -= 8; } - + byte ret[] = new byte[9 - index]; ret[0] = (byte) (8 - index); for (index = 1; index < ret.length; index++) { ret[index] = (byte) (l >>> shift); shift -= 8; } - + if (l < 0) ret[0] = (byte) (16 - ret[0]); - + return ret; - + } - + @Override public Long decode(byte[] data) { - + long l = 0; int shift = 0; - + if (data[0] < 0 || data[0] > 16) throw new IllegalArgumentException("Unexpected length " + (0xff & data[0])); - + for (int i = data.length - 1; i >= 1; i--) { l += (data[i] & 0xffl) << shift; shift += 8; } - + // fill in 0xff prefix if (data[0] > 8) l |= -1l << ((16 - data[0]) << 3); - + return l; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/UUIDLexicoder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/UUIDLexicoder.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/UUIDLexicoder.java index 4611b25..43fa1a4 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/UUIDLexicoder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/UUIDLexicoder.java @@ -27,11 +27,11 @@ import org.apache.accumulo.core.iterators.ValueFormatException; /** * A lexicoder for a UUID that maintains its lexicographic sorting order. - * + * * @since 1.6.0 */ public class UUIDLexicoder implements Lexicoder<UUID> { - + /** * @see <a href="http://www.ietf.org/rfc/rfc4122.txt"> RFC 4122: A Universally Unique IDentifier (UUID) URN Namespace, "Rules for Lexical Equivalence" in * Section 3.</a> @@ -41,18 +41,18 @@ public class UUIDLexicoder implements Lexicoder<UUID> { try { byte ret[] = new byte[16]; DataOutputStream out = new DataOutputStream(new FixedByteArrayOutputStream(ret)); - + out.writeLong(uuid.getMostSignificantBits() ^ 0x8000000000000000l); out.writeLong(uuid.getLeastSignificantBits() ^ 0x8000000000000000l); - + out.close(); - + return ret; } catch (IOException e) { throw new RuntimeException(e); } } - + @Override public UUID decode(byte[] b) throws ValueFormatException { try { @@ -62,5 +62,5 @@ public class UUIDLexicoder implements Lexicoder<UUID> { throw new RuntimeException(e); } } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/impl/ByteUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/impl/ByteUtils.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/impl/ByteUtils.java index d418767..4973de8 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/impl/ByteUtils.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/impl/ByteUtils.java @@ -19,7 +19,7 @@ package org.apache.accumulo.core.client.lexicoder.impl; import java.util.ArrayList; public class ByteUtils { - + /** * Escapes 0x00 with 0x01 0x01 and 0x01 with 0x01 0x02 */ @@ -30,13 +30,13 @@ public class ByteUtils { escapeCount++; } } - + if (escapeCount == 0) return in; - + byte ret[] = new byte[escapeCount + in.length]; int index = 0; - + for (int i = 0; i < in.length; i++) { switch (in[i]) { case 0x00: @@ -51,7 +51,7 @@ public class ByteUtils { ret[index++] = in[i]; } } - + return ret; } @@ -66,12 +66,12 @@ public class ByteUtils { i++; } } - + if (escapeCount == 
0) return in; - + byte ret[] = new byte[in.length - escapeCount]; - + int index = 0; for (int i = 0; i < in.length; i++) { if (in[i] == 0x01) { @@ -80,9 +80,9 @@ public class ByteUtils { } else { ret[index++] = in[i]; } - + } - + return ret; } @@ -91,24 +91,24 @@ public class ByteUtils { */ public static byte[][] split(byte[] data) { ArrayList<Integer> offsets = new ArrayList<Integer>(); - + for (int i = 0; i < data.length; i++) { if (data[i] == 0x00) { offsets.add(i); } } - + offsets.add(data.length); - + byte[][] ret = new byte[offsets.size()][]; - + int index = 0; for (int i = 0; i < offsets.size(); i++) { ret[i] = new byte[offsets.get(i) - index]; System.arraycopy(data, index, ret[i], 0, ret[i].length); index = offsets.get(i) + 1; } - + return ret; } @@ -120,18 +120,18 @@ public class ByteUtils { for (byte[] field : fields) { len += field.length; } - + byte ret[] = new byte[len + fields.length - 1]; int index = 0; - + for (byte[] field : fields) { System.arraycopy(field, 0, ret, index, field.length); index += field.length; if (index < ret.length) ret[index++] = 0x00; } - + return ret; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/lexicoder/impl/FixedByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/impl/FixedByteArrayOutputStream.java b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/impl/FixedByteArrayOutputStream.java index dc10d7b..e3a87da 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/lexicoder/impl/FixedByteArrayOutputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/client/lexicoder/impl/FixedByteArrayOutputStream.java @@ -23,23 +23,23 @@ import java.io.OutputStream; * Uses a fixed length array and will not grow in size dynamically like the {@link java.io.ByteArrayOutputStream}. */ public class FixedByteArrayOutputStream extends OutputStream { - + private int i; byte out[]; - + public FixedByteArrayOutputStream(byte out[]) { this.out = out; } - + @Override public void write(int b) throws IOException { out[i++] = (byte) b; } - + @Override public void write(byte b[], int off, int len) throws IOException { System.arraycopy(b, off, out, i, len); i += len; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java index cfaaa58..2f2b4b2 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java @@ -43,7 +43,7 @@ import org.apache.log4j.Logger; /** * This class allows MapReduce jobs to write output in the Accumulo data file format.<br /> * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important requirement of Accumulo data files. - * + * * <p> * The output path to be created must be specified via {@link AccumuloFileOutputFormat#setOutputPath(JobConf, Path)}. This is inherited from * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. 
Other methods from {@link FileOutputFormat} are not supported and may be ignored or cause failures. @@ -51,14 +51,14 @@ import org.apache.log4j.Logger; * supported at this time. */ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { - + private static final Class<?> CLASS = AccumuloFileOutputFormat.class; protected static final Logger log = Logger.getLogger(CLASS); - + /** * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults, and overridden with Accumulo properties that have been * stored in the Job's configuration. - * + * * @param job * the Hadoop context for the configured job * @since 1.5.0 @@ -66,10 +66,10 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { protected static AccumuloConfiguration getAccumuloConfiguration(JobConf job) { return FileOutputConfigurator.getAccumuloConfiguration(CLASS, job); } - + /** * Sets the compression type to use for data blocks. Specifying a compression may require additional libraries to be available to your Job. - * + * * @param job * the Hadoop job instance to be configured * @param compressionType @@ -79,14 +79,14 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { public static void setCompressionType(JobConf job, String compressionType) { FileOutputConfigurator.setCompressionType(CLASS, job, compressionType); } - + /** * Sets the size for data blocks within each file.<br /> * Data blocks are a span of key/value pairs stored in the file that are compressed and indexed as a group. - * + * * <p> * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance). - * + * * @param job * the Hadoop job instance to be configured * @param dataBlockSize @@ -96,10 +96,10 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { public static void setDataBlockSize(JobConf job, long dataBlockSize) { FileOutputConfigurator.setDataBlockSize(CLASS, job, dataBlockSize); } - + /** * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system. - * + * * @param job * the Hadoop job instance to be configured * @param fileBlockSize @@ -109,11 +109,11 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { public static void setFileBlockSize(JobConf job, long fileBlockSize) { FileOutputConfigurator.setFileBlockSize(CLASS, job, fileBlockSize); } - + /** * Sets the size for index blocks within each file; smaller blocks means a deeper index hierarchy within the file, while larger blocks mean a more shallow * index hierarchy within the file. This can affect the performance of queries. - * + * * @param job * the Hadoop job instance to be configured * @param indexBlockSize @@ -123,10 +123,10 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { public static void setIndexBlockSize(JobConf job, long indexBlockSize) { FileOutputConfigurator.setIndexBlockSize(CLASS, job, indexBlockSize); } - + /** * Sets the file system replication factor for the resulting file, overriding the file system default. 
- * + * * @param job * the Hadoop job instance to be configured * @param replication @@ -136,37 +136,37 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { public static void setReplication(JobConf job, int replication) { FileOutputConfigurator.setReplication(CLASS, job, replication); } - + @Override public RecordWriter<Key,Value> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { // get the path of the temporary output file final Configuration conf = job; final AccumuloConfiguration acuConf = getAccumuloConfiguration(job); - + final String extension = acuConf.get(Property.TABLE_FILE_TYPE); final Path file = new Path(getWorkOutputPath(job), getUniqueName(job, "part") + "." + extension); - + final LRUMap validVisibilities = new LRUMap(ConfiguratorBase.getVisibilityCacheSize(conf)); - + return new RecordWriter<Key,Value>() { FileSKVWriter out = null; - + @Override public void close(Reporter reporter) throws IOException { if (out != null) out.close(); } - + @Override public void write(Key key, Value value) throws IOException { - + Boolean wasChecked = (Boolean) validVisibilities.get(key.getColumnVisibilityData()); if (wasChecked == null) { byte[] cv = key.getColumnVisibilityData().toArray(); new ColumnVisibility(cv); validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE); } - + if (out == null) { out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf); out.startDefaultLocalityGroup(); @@ -175,5 +175,5 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { } }; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java index 18e286a..2cdc236 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java @@ -35,17 +35,16 @@ import org.apache.log4j.Level; /** * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides keys and values of type {@link Key} and * {@link Value} to the Map function. - * + * * The user must specify the following via static configurator methods: - * + * * <ul> * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)} * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)} * <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, Authorizations)} - * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} OR - * {@link AccumuloInputFormat#setMockInstance(JobConf, String)} + * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} OR {@link AccumuloInputFormat#setMockInstance(JobConf, String)} * </ul> - * + * * Other static methods are optional. 
*/ public class AccumuloInputFormat extends InputFormatBase<Key,Value> { @@ -53,7 +52,7 @@ public class AccumuloInputFormat extends InputFormatBase<Key,Value> { @Override public RecordReader<Key,Value> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { log.setLevel(getLogLevel(job)); - + // Override the log level from the configuration as if the RangeInputSplit has one it's the more correct one to use. if (split instanceof org.apache.accumulo.core.client.mapreduce.RangeInputSplit) { org.apache.accumulo.core.client.mapreduce.RangeInputSplit risplit = (org.apache.accumulo.core.client.mapreduce.RangeInputSplit) split; @@ -62,7 +61,7 @@ public class AccumuloInputFormat extends InputFormatBase<Key,Value> { log.setLevel(level); } } - + RecordReaderBase<Key,Value> recordReader = new RecordReaderBase<Key,Value>() { @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java index bbafef5..00a79f2 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java @@ -33,9 +33,9 @@ import org.apache.hadoop.mapred.Reporter; /** * This class allows MapReduce jobs to use multiple Accumulo tables as the source of data. This {@link org.apache.hadoop.mapred.InputFormat} provides keys and * values of type {@link Key} and {@link Value} to the Map function. - * + * * The user must specify the following via static configurator methods: - * + * * <ul> * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, org.apache.accumulo.core.client.security.tokens.AuthenticationToken)} * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)} @@ -43,7 +43,7 @@ import org.apache.hadoop.mapred.Reporter; * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} OR {@link AccumuloInputFormat#setMockInstance(JobConf, String)} * <li>{@link AccumuloMultiTableInputFormat#setInputTableConfigs(org.apache.hadoop.mapred.JobConf, java.util.Map)} * </ul> - * + * * Other static methods are optional. 
*/ @@ -51,7 +51,7 @@ public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value /** * Sets the {@link InputTableConfig} objects on the given Hadoop configuration - * + * * @param job * the Hadoop job instance to be configured * @param configs http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java index a32a8b8..f877ec6 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java @@ -428,7 +428,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> { try { addTable(table); } catch (final Exception e) { - log.error("Could not add table '"+table.toString()+"'", e); + log.error("Could not add table '" + table.toString() + "'", e); throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java index 673c5b8..6f257ff 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java @@ -36,16 +36,16 @@ import org.apache.hadoop.mapred.Reporter; /** * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides row names as {@link Text} as keys, and a * corresponding {@link PeekingIterator} as a value, which in turn makes the {@link Key}/{@link Value} pairs for that row available to the Map function. - * + * * The user must specify the following via static configurator methods: - * + * * <ul> * <li>{@link AccumuloRowInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)} * <li>{@link AccumuloRowInputFormat#setInputTableName(JobConf, String)} * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(JobConf, Authorizations)} * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} OR {@link AccumuloRowInputFormat#setMockInstance(JobConf, String)} * </ul> - * + * * Other static methods are optional. 
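Since AccumuloRowInputFormat hands each map call a whole row, a map function can consume the row's cells through the PeekingIterator value. A minimal sketch using the mapred API; the per-row counting logic is illustrative only, not from this commit:

import java.io.IOException;
import java.util.Map.Entry;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class RowSizeMapper extends MapReduceBase implements Mapper<Text,PeekingIterator<Entry<Key,Value>>,Text,LongWritable> {
  public void map(Text row, PeekingIterator<Entry<Key,Value>> entries, OutputCollector<Text,LongWritable> out, Reporter reporter) throws IOException {
    long cells = 0;
    while (entries.hasNext()) { // all Key/Value pairs for this row, in sorted order
      entries.next();
      cells++;
    }
    out.collect(row, new LongWritable(cells)); // emit the per-row cell count
  }
}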
*/ public class AccumuloRowInputFormat extends InputFormatBase<Text,PeekingIterator<Entry<Key,Value>>> { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java index 0cee355..5a8f592 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/InputFormatBase.java @@ -53,7 +53,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Sets the name of the input table over which this job will scan. - * + * * @param job * the Hadoop job instance to be configured * @param tableName @@ -66,7 +66,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Gets the table name from the configuration. - * + * * @param job * the Hadoop context for the configured job * @return the table name @@ -79,7 +79,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Sets the input ranges to scan for this job. If not set, the entire table will be scanned. - * + * * @param job * the Hadoop job instance to be configured * @param ranges @@ -92,7 +92,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Gets the ranges to scan over from a job. - * + * * @param job * the Hadoop context for the configured job * @return the ranges @@ -107,7 +107,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Restricts the columns that will be mapped over for this job. - * + * * @param job * the Hadoop job instance to be configured * @param columnFamilyColumnQualifierPairs @@ -121,7 +121,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Gets the columns to be mapped over from this job. - * + * * @param job * the Hadoop context for the configured job * @return a set of columns @@ -134,7 +134,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Encode an iterator on the input for this job. - * + * * @param job * the Hadoop job instance to be configured * @param cfg @@ -147,7 +147,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration. - * + * * @param job * the Hadoop context for the configured job * @return a list of iterators @@ -161,10 +161,10 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Controls the automatic adjustment of ranges for this job. This feature merges overlapping ranges, then splits them to align with tablet boundaries. * Disabling this feature will cause exactly one Map task to be created for each specified range. * - * + * * <p> * By default, this feature is <b>enabled</b>. - * + * * @param job * the Hadoop job instance to be configured * @param enableFeature @@ -178,7 +178,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Determines whether a configuration has auto-adjust ranges enabled.
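The InputFormatBase setters documented above compose on the same JobConf. A sketch combining ranges, column restriction, a scan-time iterator, and range auto-adjustment; the range bounds, column family, and iterator priority/name are hypothetical:

import java.util.Collections;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.util.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

public class ScanTuning {
  static void tuneScan(JobConf job) {
    // Restrict the scan to one row range and one column family (null qualifier = whole family).
    AccumuloInputFormat.setRanges(job, Collections.singleton(new Range("a", "m")));
    AccumuloInputFormat.fetchColumns(job, Collections.singleton(new Pair<Text,Text>(new Text("cf"), null)));
    // Server-side filtering via a configured iterator; the versioning iterator shown as an example.
    AccumuloInputFormat.addIterator(job, new IteratorSetting(50, "vers", "org.apache.accumulo.core.iterators.user.VersioningIterator"));
    // Disable auto-adjustment: exactly one map task per specified range instead of per tablet.
    AccumuloInputFormat.setAutoAdjustRanges(job, false);
  }
}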
- * + * * @param job * the Hadoop context for the configured job * @return false if the feature is disabled, true otherwise @@ -191,10 +191,10 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Controls the use of the {@link IsolatedScanner} in this job. - * + * * <p> * By default, this feature is <b>disabled</b>. - * + * * @param job * the Hadoop job instance to be configured * @param enableFeature @@ -207,7 +207,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Determines whether a configuration has isolation enabled. - * + * * @param job * the Hadoop context for the configured job * @return true if the feature is enabled, false otherwise @@ -221,10 +221,10 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Controls the use of the {@link ClientSideIteratorScanner} in this job. Enabling this feature will cause the iterator stack to be constructed within the Map * task, rather than within the Accumulo TServer. To use this feature, all classes needed for those iterators must be available on the classpath for the task. - * + * * <p> * By default, this feature is <b>disabled</b>. - * + * * @param job * the Hadoop job instance to be configured * @param enableFeature @@ -237,7 +237,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Determines whether a configuration uses local iterators. - * + * * @param job * the Hadoop context for the configured job * @return true if the feature is enabled, false otherwise @@ -253,26 +253,26 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { * Enable reading offline tables. By default, this feature is disabled and only online tables are scanned. This will make the map reduce job directly read the * table's files. If the table is not offline, then the job will fail. If the table comes online during the map reduce job, it is likely that the job will * fail. - * + * * <p> * To use this option, the map reduce user will need access to read the Accumulo directory in HDFS. - * + * * <p> * Reading the offline table will create the scan time iterator stack in the map process. So any iterators that are configured for the table will need to be * on the mapper's classpath. - * + * * <p> * One way to use this feature is to clone a table, take the clone offline, and use the clone as the input table for a map reduce job. If you plan to map * reduce over the data many times, it may be better to compact the table, clone it, take it offline, and use the clone for all map reduce jobs. The * reason to do this is that compaction will reduce each tablet in the table to one file, and it is faster to read from one file. - * + * * <p> * There are two possible advantages to reading a table's files directly out of HDFS. First, you may see better read performance. Second, it will support * speculative execution better. When reading an online table, speculative execution can put more load on an already slow tablet server. - * + * * @param job * the Hadoop job instance to be configured * @param enableFeature @@ -285,7 +285,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Determines whether a configuration has the offline table scan feature enabled.
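The clone-then-offline pattern described above, sketched against the table operations API; how the Connector is obtained, and all table names, are hypothetical assumptions, and the optional compaction step is commented out:

import java.util.Collections;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class OfflineCloneScan {
  static void scanOfflineClone(Connector conn, JobConf job) throws Exception {
    TableOperations ops = conn.tableOperations();
    // Optionally compact first so each tablet is a single file (faster to read, per the Javadoc above):
    // ops.compact("mytable", null, null, true, true);
    ops.clone("mytable", "mytable_clone", true, Collections.<String,String>emptyMap(), Collections.<String>emptySet()); // flush, no property overrides
    ops.offline("mytable_clone"); // the clone must be offline before the job starts
    AccumuloInputFormat.setInputTableName(job, "mytable_clone");
    AccumuloInputFormat.setOfflineTableScan(job, true); // read the clone's files directly from HDFS
  }
}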
- * + * * @param job * the Hadoop context for the configured job * @return true if the feature is enabled, false otherwise @@ -298,7 +298,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Initializes an Accumulo {@link org.apache.accumulo.core.client.impl.TabletLocator} based on the configuration. - * + * * @param job * the Hadoop context for the configured job * @return an Accumulo tablet locator @@ -332,7 +332,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Apply the configured iterators to the scanner. - * + * * @param iterators * the iterators to set * @param scanner @@ -346,7 +346,7 @@ public abstract class InputFormatBase<K,V> extends AbstractInputFormat<K,V> { /** * Apply the configured iterators from the configuration to the scanner. - * + * * @param job * the job configuration * @param scanner http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java index 3fd2ab0..29274aa 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/RangeInputSplit.java @@ -33,7 +33,7 @@ public class RangeInputSplit extends org.apache.accumulo.core.client.mapreduce.R public RangeInputSplit(RangeInputSplit split) throws IOException { super(split); } - + protected RangeInputSplit(String table, String tableId, Range range, String[] locations) { super(table, tableId, range, locations); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java index 196fb04..c68dd56 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java @@ -41,7 +41,7 @@ import org.apache.log4j.Logger; /** * This class allows MapReduce jobs to write output in the Accumulo data file format.<br /> * Care should be taken to write only sorted data (sorted by {@link Key}), as this is an important requirement of Accumulo data files. - * + * * <p> * The output path to be created must be specified via {@link AccumuloFileOutputFormat#setOutputPath(Job, Path)}. This is inherited from * {@link FileOutputFormat#setOutputPath(Job, Path)}. Other methods from {@link FileOutputFormat} are not supported and may be ignored or cause failures. Using @@ -49,14 +49,14 @@ import org.apache.log4j.Logger; * supported at this time. */ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { - + private static final Class<?> CLASS = AccumuloFileOutputFormat.class; protected static final Logger log = Logger.getLogger(CLASS); - + /** * This helper method provides an AccumuloConfiguration object constructed from the Accumulo defaults, and overridden with Accumulo properties that have been * stored in the Job's configuration.
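A sketch of the new-API file-output setup, using the required setOutputPath plus the tuning setters documented just below; the output path and tuning values are hypothetical placeholders, and "gz" is assumed to be an accepted compression type:

import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class FileOutputSetup {
  static void configureOutput(Job job) {
    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
    // Required: the output directory (inherited from FileOutputFormat).
    AccumuloFileOutputFormat.setOutputPath(job, new Path("/tmp/bulk-out")); // hypothetical path
    // Optional tuning: gzip-compressed 256 KB data blocks, replication of 3.
    AccumuloFileOutputFormat.setCompressionType(job, "gz");
    AccumuloFileOutputFormat.setDataBlockSize(job, 256 * 1024L);
    AccumuloFileOutputFormat.setReplication(job, 3);
    // Remember: keys must reach the RecordWriter in sorted order.
  }
}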
- * + * * @param context * the Hadoop context for the configured job * @since 1.5.0 @@ -64,10 +64,10 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { protected static AccumuloConfiguration getAccumuloConfiguration(JobContext context) { return FileOutputConfigurator.getAccumuloConfiguration(CLASS, InputFormatBase.getConfiguration(context)); } - + /** * Sets the compression type to use for data blocks. Specifying a compression type may require additional libraries to be available to your Job. - * + * * @param job * the Hadoop job instance to be configured * @param compressionType @@ -77,14 +77,14 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { public static void setCompressionType(Job job, String compressionType) { FileOutputConfigurator.setCompressionType(CLASS, job.getConfiguration(), compressionType); } - + /** * Sets the size for data blocks within each file.<br /> * Data blocks are spans of key/value pairs stored in the file that are compressed and indexed as a group. - * + * * <p> * Making this value smaller may increase seek performance, but at the cost of increasing the size of the indexes (which can also affect seek performance). - * + * * @param job * the Hadoop job instance to be configured * @param dataBlockSize @@ -94,10 +94,10 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { public static void setDataBlockSize(Job job, long dataBlockSize) { FileOutputConfigurator.setDataBlockSize(CLASS, job.getConfiguration(), dataBlockSize); } - + /** * Sets the size for file blocks in the file system; file blocks are managed, and replicated, by the underlying file system. - * + * * @param job * the Hadoop job instance to be configured * @param fileBlockSize @@ -107,11 +107,11 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { public static void setFileBlockSize(Job job, long fileBlockSize) { FileOutputConfigurator.setFileBlockSize(CLASS, job.getConfiguration(), fileBlockSize); } - + /** * Sets the size for index blocks within each file; smaller blocks mean a deeper index hierarchy within the file, while larger blocks mean a shallower * index hierarchy within the file. This can affect the performance of queries. - * + * * @param job * the Hadoop job instance to be configured * @param indexBlockSize @@ -121,10 +121,10 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { public static void setIndexBlockSize(Job job, long indexBlockSize) { FileOutputConfigurator.setIndexBlockSize(CLASS, job.getConfiguration(), indexBlockSize); } - + /** * Sets the file system replication factor for the resulting file, overriding the file system default. - + * * @param job * the Hadoop job instance to be configured * @param replication @@ -134,37 +134,37 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { public static void setReplication(Job job, int replication) { FileOutputConfigurator.setReplication(CLASS, job.getConfiguration(), replication); } - + @Override public RecordWriter<Key,Value> getRecordWriter(TaskAttemptContext context) throws IOException { // get the path of the temporary output file final Configuration conf = InputFormatBase.getConfiguration(context); final AccumuloConfiguration acuConf = getAccumuloConfiguration(context); - + final String extension = acuConf.get(Property.TABLE_FILE_TYPE); final Path file = this.getDefaultWorkFile(context, "."
+ extension); - + final LRUMap validVisibilities = new LRUMap(1000); - + return new RecordWriter<Key,Value>() { FileSKVWriter out = null; - + @Override public void close(TaskAttemptContext context) throws IOException { if (out != null) out.close(); } - + @Override public void write(Key key, Value value) throws IOException { - + Boolean wasChecked = (Boolean) validVisibilities.get(key.getColumnVisibilityData()); if (wasChecked == null) { byte[] cv = key.getColumnVisibilityData().toArray(); new ColumnVisibility(cv); validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE); } - + if (out == null) { out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf); out.startDefaultLocalityGroup(); @@ -173,5 +173,5 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { } }; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java index 21a0280..b98cb77 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java @@ -35,15 +35,15 @@ import org.apache.log4j.Level; /** * This class allows MapReduce jobs to use Accumulo as the source of data. This {@link InputFormat} provides keys and values of type {@link Key} and * {@link Value} to the Map function. - * + * * The user must specify the following via static configurator methods: - * + * * <ul> * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, AuthenticationToken)} * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)} * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, ClientConfiguration)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)} * </ul> - * + * * Other static methods are optional. */ public class AccumuloInputFormat extends InputFormatBase<Key,Value> { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java index af1001f..010a94f 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloMultiTableInputFormat.java @@ -39,23 +39,23 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; /** * This class allows MapReduce jobs to use multiple Accumulo tables as the source of data. This {@link org.apache.hadoop.mapreduce.InputFormat} provides keys * and values of type {@link Key} and {@link Value} to the Map function. 
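The new-API counterpart of the mapred setup shown earlier, configuring this AccumuloInputFormat on a Job; principal, token, instance, hosts, and table name are hypothetical placeholders:

import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.mapreduce.Job;

public class NewApiScanSetup {
  static void configure(Job job) throws Exception {
    AccumuloInputFormat.setConnectorInfo(job, "reader", new PasswordToken("secret")); // hypothetical credentials
    AccumuloInputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault()
        .withInstance("myInstance").withZkHosts("zk1:2181")); // hypothetical instance
    AccumuloInputFormat.setScanAuthorizations(job, new Authorizations("public"));
    AccumuloInputFormat.setInputTableName(job, "mytable"); // from InputFormatBase
    job.setInputFormatClass(AccumuloInputFormat.class);
  }
}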
- * + * * The user must specify the following via static configurator methods: - * + * * <ul> * <li>{@link AccumuloMultiTableInputFormat#setConnectorInfo(Job, String, AuthenticationToken)} * <li>{@link AccumuloMultiTableInputFormat#setScanAuthorizations(Job, Authorizations)} * <li>{@link AccumuloMultiTableInputFormat#setZooKeeperInstance(Job, ClientConfiguration)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)} * <li>{@link AccumuloMultiTableInputFormat#setInputTableConfigs(Job, Map)} * </ul> - * + * * Other static methods are optional. */ public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value> { /** * Sets the {@link InputTableConfig} objects on the given Hadoop configuration - * + * * @param job * the Hadoop job instance to be configured * @param configs @@ -87,11 +87,11 @@ public class AccumuloMultiTableInputFormat extends AbstractInputFormat<Key,Value @Override protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, RangeInputSplit split) { - List<IteratorSetting> iterators = split.getIterators(); + List<IteratorSetting> iterators = split.getIterators(); if (null == iterators) { iterators = getInputTableConfig(context, tableName).getIterators(); } - + for (IteratorSetting setting : iterators) { scanner.addScanIterator(setting); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java index e220c00..9a8ab58 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java @@ -428,7 +428,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { try { addTable(table); } catch (Exception e) { - log.error("Could not add table '"+table+"'", e); + log.error("Could not add table '" + table + "'", e); throw new IOException(e); }
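Finally, a sketch of configuring the mapreduce AccumuloOutputFormat whose addTable() error handling is touched above; credentials, instance details, table name, and batch-writer limits are hypothetical placeholders:

import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.hadoop.mapreduce.Job;

public class WriteJobSetup {
  static void configureWrite(Job job) throws Exception {
    AccumuloOutputFormat.setConnectorInfo(job, "writer", new PasswordToken("secret")); // hypothetical credentials
    AccumuloOutputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault()
        .withInstance("myInstance").withZkHosts("zk1:2181")); // hypothetical instance
    AccumuloOutputFormat.setCreateTables(job, true); // missing tables go through the addTable() path patched above
    AccumuloOutputFormat.setDefaultTableName(job, "mytable");
    AccumuloOutputFormat.setBatchWriterOptions(job, new BatchWriterConfig()
        .setMaxMemory(10 * 1024 * 1024L)          // buffer up to 10 MB of mutations
        .setMaxLatency(2, TimeUnit.MINUTES));     // flush at least every two minutes
    job.setOutputFormatClass(AccumuloOutputFormat.class);
  }
}

The mapper or reducer then emits Text (table name, or null for the default table) and Mutation pairs, matching the OutputFormat<Text,Mutation> type parameters in the diff above.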