http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java index 07bf6d3..aeba4e2 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java @@ -28,15 +28,15 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; public class RelativeKey implements Writable { - + private static final byte BIT = 0x01; - + private Key key; private Key prevKey; - + private byte fieldsSame; private byte fieldsPrefixed; - + // Exact match compression options (first byte) and flag for further private static final byte ROW_SAME = BIT << 0; private static final byte CF_SAME = BIT << 1; @@ -46,47 +46,47 @@ public class RelativeKey implements Writable { private static final byte DELETED = BIT << 5; // private static final byte UNUSED_1_6 = BIT << 6; private static final byte PREFIX_COMPRESSION_ENABLED = (byte) (BIT << 7); - + // Prefix compression (second byte) private static final byte ROW_COMMON_PREFIX = BIT << 0; private static final byte CF_COMMON_PREFIX = BIT << 1; private static final byte CQ_COMMON_PREFIX = BIT << 2; private static final byte CV_COMMON_PREFIX = BIT << 3; private static final byte TS_DIFF = BIT << 4; - + // private static final byte UNUSED_2_5 = BIT << 5; // private static final byte UNUSED_2_6 = BIT << 6; // private static final byte UNUSED_2_7 = (byte) (BIT << 7); - + // Values for prefix compression int rowCommonPrefixLen; int cfCommonPrefixLen; int cqCommonPrefixLen; int cvCommonPrefixLen; long tsDiff; - + /** * This constructor is used when one needs to read from an input stream */ public RelativeKey() { - + } - + /** * This constructor is used when constructing a key for writing to an output stream */ public RelativeKey(Key prevKey, Key key) { - + this.key = key; - + fieldsSame = 0; fieldsPrefixed = 0; - + ByteSequence prevKeyScratch; ByteSequence keyScratch; - + if (prevKey != null) { - + prevKeyScratch = prevKey.getRowData(); keyScratch = key.getRowData(); rowCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch); @@ -94,7 +94,7 @@ public class RelativeKey implements Writable { fieldsSame |= ROW_SAME; else if (rowCommonPrefixLen > 1) fieldsPrefixed |= ROW_COMMON_PREFIX; - + prevKeyScratch = prevKey.getColumnFamilyData(); keyScratch = key.getColumnFamilyData(); cfCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch); @@ -102,7 +102,7 @@ public class RelativeKey implements Writable { fieldsSame |= CF_SAME; else if (cfCommonPrefixLen > 1) fieldsPrefixed |= CF_COMMON_PREFIX; - + prevKeyScratch = prevKey.getColumnQualifierData(); keyScratch = key.getColumnQualifierData(); cqCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch); @@ -110,7 +110,7 @@ public class RelativeKey implements Writable { fieldsSame |= CQ_SAME; else if (cqCommonPrefixLen > 1) fieldsPrefixed |= CQ_COMMON_PREFIX; - + prevKeyScratch = prevKey.getColumnVisibilityData(); keyScratch = key.getColumnVisibilityData(); cvCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch); @@ -118,29 +118,29 @@ public class RelativeKey implements Writable { fieldsSame |= CV_SAME; else if (cvCommonPrefixLen > 1) fieldsPrefixed |= CV_COMMON_PREFIX; - + tsDiff = key.getTimestamp() - prevKey.getTimestamp(); if (tsDiff == 0) fieldsSame |= TS_SAME; else fieldsPrefixed |= TS_DIFF; - + fieldsSame |= fieldsPrefixed == 0 ? 0 : PREFIX_COMPRESSION_ENABLED; } - + // stored deleted information in bit vector instead of its own byte if (key.isDeleted()) fieldsSame |= DELETED; } - + /** - * + * * @return -1 (exact match) or the number of bytes in common */ static int getCommonPrefix(ByteSequence prev, ByteSequence cur) { if (prev == cur) return -1; // infinite... exact match - + int prevLen = prev.length(); int curLen = cur.length(); int maxChecks = Math.min(prevLen, curLen); @@ -157,11 +157,11 @@ public class RelativeKey implements Writable { // and if not, then they have a common prefix over all the checks we've done return prevLen == curLen ? -1 : maxChecks; } - + public void setPrevKey(Key pk) { this.prevKey = pk; } - + @Override public void readFields(DataInput in) throws IOException { fieldsSame = in.readByte(); @@ -170,10 +170,10 @@ public class RelativeKey implements Writable { } else { fieldsPrefixed = 0; } - + byte[] row, cf, cq, cv; long ts; - + if ((fieldsSame & ROW_SAME) == ROW_SAME) { row = prevKey.getRowData().toArray(); } else if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX) { @@ -181,7 +181,7 @@ public class RelativeKey implements Writable { } else { row = read(in); } - + if ((fieldsSame & CF_SAME) == CF_SAME) { cf = prevKey.getColumnFamilyData().toArray(); } else if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX) { @@ -189,7 +189,7 @@ public class RelativeKey implements Writable { } else { cf = read(in); } - + if ((fieldsSame & CQ_SAME) == CQ_SAME) { cq = prevKey.getColumnQualifierData().toArray(); } else if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX) { @@ -197,7 +197,7 @@ public class RelativeKey implements Writable { } else { cq = read(in); } - + if ((fieldsSame & CV_SAME) == CV_SAME) { cv = prevKey.getColumnVisibilityData().toArray(); } else if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX) { @@ -205,7 +205,7 @@ public class RelativeKey implements Writable { } else { cv = read(in); } - + if ((fieldsSame & TS_SAME) == TS_SAME) { ts = prevKey.getTimestamp(); } else if ((fieldsPrefixed & TS_DIFF) == TS_DIFF) { @@ -213,75 +213,75 @@ public class RelativeKey implements Writable { } else { ts = WritableUtils.readVLong(in); } - + this.key = new Key(row, cf, cq, cv, ts, (fieldsSame & DELETED) == DELETED, false); this.prevKey = this.key; } - + public static class SkippR { RelativeKey rk; int skipped; Key prevKey; - + SkippR(RelativeKey rk, int skipped, Key prevKey) { this.rk = rk; this.skipped = skipped; this.prevKey = prevKey; } } - + public static SkippR fastSkip(DataInput in, Key seekKey, MutableByteSequence value, Key prevKey, Key currKey) throws IOException { // this method assumes that fast skip is being called on a compressed block where the last key // in the compressed block is >= seekKey... therefore this method shouldn't go past the end of the // compressed block... if it does, there is probably an error in the caller's logic - + // this method mostly avoids object allocation and only does compares when the row changes - + MutableByteSequence row, cf, cq, cv; MutableByteSequence prow, pcf, pcq, pcv; - + ByteSequence stopRow = seekKey.getRowData(); ByteSequence stopCF = seekKey.getColumnFamilyData(); ByteSequence stopCQ = seekKey.getColumnQualifierData(); - + long ts = -1; long pts = -1; boolean pdel = false; - + int rowCmp = -1, cfCmp = -1, cqCmp = -1; - + if (currKey != null) { - + prow = new MutableByteSequence(currKey.getRowData()); pcf = new MutableByteSequence(currKey.getColumnFamilyData()); pcq = new MutableByteSequence(currKey.getColumnQualifierData()); pcv = new MutableByteSequence(currKey.getColumnVisibilityData()); pts = currKey.getTimestamp(); - + row = new MutableByteSequence(currKey.getRowData()); cf = new MutableByteSequence(currKey.getColumnFamilyData()); cq = new MutableByteSequence(currKey.getColumnQualifierData()); cv = new MutableByteSequence(currKey.getColumnVisibilityData()); ts = currKey.getTimestamp(); - + rowCmp = row.compareTo(stopRow); cfCmp = cf.compareTo(stopCF); cqCmp = cq.compareTo(stopCQ); - + if (rowCmp >= 0) { if (rowCmp > 0) { RelativeKey rk = new RelativeKey(); rk.key = rk.prevKey = new Key(currKey); return new SkippR(rk, 0, prevKey); } - + if (cfCmp >= 0) { if (cfCmp > 0) { RelativeKey rk = new RelativeKey(); rk.key = rk.prevKey = new Key(currKey); return new SkippR(rk, 0, prevKey); } - + if (cqCmp >= 0) { RelativeKey rk = new RelativeKey(); rk.key = rk.prevKey = new Key(currKey); @@ -289,126 +289,126 @@ public class RelativeKey implements Writable { } } } - + } else { row = new MutableByteSequence(new byte[64], 0, 0); cf = new MutableByteSequence(new byte[64], 0, 0); cq = new MutableByteSequence(new byte[64], 0, 0); cv = new MutableByteSequence(new byte[64], 0, 0); - + prow = new MutableByteSequence(new byte[64], 0, 0); pcf = new MutableByteSequence(new byte[64], 0, 0); pcq = new MutableByteSequence(new byte[64], 0, 0); pcv = new MutableByteSequence(new byte[64], 0, 0); } - + byte fieldsSame = -1; byte fieldsPrefixed = 0; int count = 0; Key newPrevKey = null; - + while (true) { - + pdel = (fieldsSame & DELETED) == DELETED; - + fieldsSame = in.readByte(); if ((fieldsSame & PREFIX_COMPRESSION_ENABLED) == PREFIX_COMPRESSION_ENABLED) fieldsPrefixed = in.readByte(); else fieldsPrefixed = 0; - + boolean changed = false; - + if ((fieldsSame & ROW_SAME) != ROW_SAME) { - + MutableByteSequence tmp = prow; prow = row; row = tmp; - + if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX) readPrefix(in, row, prow); else - read(in, row); - + read(in, row); + // read a new row, so need to compare... rowCmp = row.compareTo(stopRow); changed = true; }// else the row is the same as the last, so no need to compare - + if ((fieldsSame & CF_SAME) != CF_SAME) { - + MutableByteSequence tmp = pcf; pcf = cf; cf = tmp; - + if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX) readPrefix(in, cf, pcf); else - read(in, cf); - + read(in, cf); + cfCmp = cf.compareTo(stopCF); changed = true; } - + if ((fieldsSame & CQ_SAME) != CQ_SAME) { - + MutableByteSequence tmp = pcq; pcq = cq; cq = tmp; - + if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX) readPrefix(in, cq, pcq); else - read(in, cq); - + read(in, cq); + cqCmp = cq.compareTo(stopCQ); changed = true; } - + if ((fieldsSame & CV_SAME) != CV_SAME) { - + MutableByteSequence tmp = pcv; pcv = cv; cv = tmp; - + if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX) readPrefix(in, cv, pcv); else - read(in, cv); + read(in, cv); } - + if ((fieldsSame & TS_SAME) != TS_SAME) { pts = ts; - + if ((fieldsPrefixed & TS_DIFF) == TS_DIFF) ts = WritableUtils.readVLong(in) + pts; else - ts = WritableUtils.readVLong(in); + ts = WritableUtils.readVLong(in); } - + readValue(in, value); - + count++; - + if (changed && rowCmp >= 0) { if (rowCmp > 0) break; - + if (cfCmp >= 0) { if (cfCmp > 0) break; - + if (cqCmp >= 0) break; } } - + } - + if (count > 1) { MutableByteSequence trow, tcf, tcq, tcv; long tts; - + // when the current keys field is same as the last, then // set the prev keys field the same as the current key trow = (fieldsSame & ROW_SAME) == ROW_SAME ? row : prow; @@ -416,7 +416,7 @@ public class RelativeKey implements Writable { tcq = (fieldsSame & CQ_SAME) == CQ_SAME ? cq : pcq; tcv = (fieldsSame & CV_SAME) == CV_SAME ? cv : pcv; tts = (fieldsSame & TS_SAME) == TS_SAME ? ts : pts; - + newPrevKey = new Key(trow.getBackingArray(), trow.offset(), trow.length(), tcf.getBackingArray(), tcf.offset(), tcf.length(), tcq.getBackingArray(), tcq.offset(), tcq.length(), tcv.getBackingArray(), tcv.offset(), tcv.length(), tts); newPrevKey.setDeleted(pdel); @@ -428,35 +428,35 @@ public class RelativeKey implements Writable { } else { throw new IllegalStateException(); } - + RelativeKey result = new RelativeKey(); result.key = new Key(row.getBackingArray(), row.offset(), row.length(), cf.getBackingArray(), cf.offset(), cf.length(), cq.getBackingArray(), cq.offset(), cq.length(), cv.getBackingArray(), cv.offset(), cv.length(), ts); result.key.setDeleted((fieldsSame & DELETED) != 0); result.prevKey = result.key; - + return new SkippR(result, count, newPrevKey); } - + private static void read(DataInput in, MutableByteSequence mbseq) throws IOException { int len = WritableUtils.readVInt(in); read(in, mbseq, len); } - + private static void readValue(DataInput in, MutableByteSequence mbseq) throws IOException { int len = in.readInt(); read(in, mbseq, len); } - + private static void read(DataInput in, MutableByteSequence mbseqDestination, int len) throws IOException { if (mbseqDestination.getBackingArray().length < len) { mbseqDestination.setArray(new byte[UnsynchronizedBuffer.nextArraySize(len)], 0, 0); } - + in.readFully(mbseqDestination.getBackingArray(), 0, len); mbseqDestination.setLength(len); } - + private static byte[] readPrefix(DataInput in, ByteSequence prefixSource) throws IOException { int prefixLen = WritableUtils.readVInt(in); int remainingLen = WritableUtils.readVInt(in); @@ -470,8 +470,8 @@ public class RelativeKey implements Writable { // read remaining in.readFully(data, prefixLen, remainingLen); return data; - } - + } + private static void readPrefix(DataInput in, MutableByteSequence dest, ByteSequence prefixSource) throws IOException { int prefixLen = WritableUtils.readVInt(in); int remainingLen = WritableUtils.readVInt(in); @@ -489,38 +489,38 @@ public class RelativeKey implements Writable { in.readFully(dest.getBackingArray(), prefixLen, remainingLen); dest.setLength(len); } - + private static byte[] read(DataInput in) throws IOException { int len = WritableUtils.readVInt(in); byte[] data = new byte[len]; in.readFully(data); return data; } - + public Key getKey() { return key; } - + private static void write(DataOutput out, ByteSequence bs) throws IOException { WritableUtils.writeVInt(out, bs.length()); out.write(bs.getBackingArray(), bs.offset(), bs.length()); } - + private static void writePrefix(DataOutput out, ByteSequence bs, int commonPrefixLength) throws IOException { WritableUtils.writeVInt(out, commonPrefixLength); WritableUtils.writeVInt(out, bs.length() - commonPrefixLength); out.write(bs.getBackingArray(), bs.offset() + commonPrefixLength, bs.length() - commonPrefixLength); } - + @Override public void write(DataOutput out) throws IOException { - + out.writeByte(fieldsSame); - + if ((fieldsSame & PREFIX_COMPRESSION_ENABLED) == PREFIX_COMPRESSION_ENABLED) { out.write(fieldsPrefixed); } - + if ((fieldsSame & ROW_SAME) == ROW_SAME) { // same, write nothing } else if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX) { @@ -530,7 +530,7 @@ public class RelativeKey implements Writable { // write it all write(out, key.getRowData()); } - + if ((fieldsSame & CF_SAME) == CF_SAME) { // same, write nothing } else if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX) { @@ -540,7 +540,7 @@ public class RelativeKey implements Writable { // write it all write(out, key.getColumnFamilyData()); } - + if ((fieldsSame & CQ_SAME) == CQ_SAME) { // same, write nothing } else if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX) { @@ -550,7 +550,7 @@ public class RelativeKey implements Writable { // write it all write(out, key.getColumnQualifierData()); } - + if ((fieldsSame & CV_SAME) == CV_SAME) { // same, write nothing } else if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX) { @@ -560,7 +560,7 @@ public class RelativeKey implements Writable { // write it all write(out, key.getColumnVisibilityData()); } - + if ((fieldsSame & TS_SAME) == TS_SAME) { // same, write nothing } else if ((fieldsPrefixed & TS_DIFF) == TS_DIFF) { @@ -571,5 +571,5 @@ public class RelativeKey implements Writable { WritableUtils.writeVLong(out, key.getTimestamp()); } } - + }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java index 53e4aaa..b87705c 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java @@ -39,37 +39,36 @@ import com.beust.jcommander.Parameter; /** * Split an RFile into large and small key/value files. - * + * */ public class SplitLarge { - + static class Opts extends Help { - @Parameter(names="-m", description="the maximum size of the key/value pair to shunt to the small file") + @Parameter(names = "-m", description = "the maximum size of the key/value pair to shunt to the small file") long maxSize = 10 * 1024 * 1024; - @Parameter(description="<file.rf> { <file.rf> ... }") + @Parameter(description = "<file.rf> { <file.rf> ... }") List<String> files = new ArrayList<String>(); } - - + public static void main(String[] args) throws Exception { Configuration conf = CachedConfiguration.getInstance(); FileSystem fs = FileSystem.get(conf); long maxSize = 10 * 1024 * 1024; Opts opts = new Opts(); opts.parseArgs(SplitLarge.class.getName(), args); - + for (String file : opts.files) { - AccumuloConfiguration aconf = DefaultConfiguration.getDefaultConfiguration(); + AccumuloConfiguration aconf = DefaultConfiguration.getDefaultConfiguration(); Path path = new Path(file); CachableBlockFile.Reader rdr = new CachableBlockFile.Reader(fs, path, conf, null, null, aconf); Reader iter = new RFile.Reader(rdr); - + if (!file.endsWith(".rf")) { throw new IllegalArgumentException("File must end with .rf"); } String smallName = file.substring(0, file.length() - 3) + "_small.rf"; String largeName = file.substring(0, file.length() - 3) + "_large.rf"; - + int blockSize = (int) aconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE); Writer small = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(smallName), "gz", conf, aconf), blockSize); small.startDefaultLocalityGroup(); @@ -93,5 +92,5 @@ public class SplitLarge { small.close(); } } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java index 044989d..ecc0b90 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -109,7 +109,7 @@ public final class BCFile { private interface BlockRegister { /** * Register a block that is fully closed. - * + * * @param raw * The size of block in terms of uncompressed bytes. * @param offsetStart @@ -198,7 +198,7 @@ public final class BCFile { /** * Get the output stream for BlockAppender's consumption. - * + * * @return the output stream suitable for writing block data. */ OutputStream getOutputStream() { @@ -207,7 +207,7 @@ public final class BCFile { /** * Get the current position in file. - * + * * @return The current byte offset in underlying file. */ long getCurrentPos() throws IOException { @@ -256,7 +256,7 @@ public final class BCFile { /** * Access point to stuff data into a block. - * + * */ public class BlockAppender extends DataOutputStream { private final BlockRegister blockRegister; @@ -265,7 +265,7 @@ public final class BCFile { /** * Constructor - * + * * @param register * the block register, which is called when the block is closed. * @param wbs @@ -279,7 +279,7 @@ public final class BCFile { /** * Get the raw size of the block. - * + * * @return the number of uncompressed bytes written through the BlockAppender so far. */ public long getRawSize() throws IOException { @@ -291,7 +291,7 @@ public final class BCFile { /** * Get the compressed size of the block in progress. - * + * * @return the number of compressed bytes written to the underlying FS file. The size may be smaller than actual need to compress the all data written due * to internal buffering inside the compressor. */ @@ -331,14 +331,15 @@ public final class BCFile { /** * Constructor - * + * * @param fout * FS output stream. * @param compressionName * Name of the compression algorithm, which will be used for all data blocks. * @see Compression#getSupportedAlgorithms */ - public Writer(FSDataOutputStream fout, String compressionName, Configuration conf, boolean trackDataBlocks, AccumuloConfiguration accumuloConfiguration) throws IOException { + public Writer(FSDataOutputStream fout, String compressionName, Configuration conf, boolean trackDataBlocks, AccumuloConfiguration accumuloConfiguration) + throws IOException { if (fout.getPos() != 0) { throw new IOException("Output file not at zero offset."); } @@ -435,7 +436,7 @@ public final class BCFile { /** * Create a Meta Block and obtain an output stream for adding data into the block. There can only be one BlockAppender stream active at any time. Regular * Blocks may not be created after the first Meta Blocks. The caller must call BlockAppender.close() to conclude the block creation. - * + * * @param name * The name of the Meta Block. The name must not conflict with existing Meta Blocks. * @param compressionName @@ -452,7 +453,7 @@ public final class BCFile { * Create a Meta Block and obtain an output stream for adding data into the block. The Meta Block will be compressed with the same compression algorithm as * data blocks. There can only be one BlockAppender stream active at any time. Regular Blocks may not be created after the first Meta Blocks. The caller * must call BlockAppender.close() to conclude the block creation. - * + * * @param name * The name of the Meta Block. The name must not conflict with existing Meta Blocks. * @return The BlockAppender stream @@ -466,7 +467,7 @@ public final class BCFile { /** * Create a Data Block and obtain an output stream for adding data into the block. There can only be one BlockAppender stream active at any time. Data * Blocks may not be created after the first Meta Blocks. The caller must call BlockAppender.close() to conclude the block creation. - * + * * @return The BlockAppender stream */ public BlockAppender prepareDataBlock() throws IOException { @@ -506,7 +507,7 @@ public final class BCFile { /** * Callback to make sure a data block is added to the internal list when it's being closed. - * + * */ private class DataBlockRegister implements BlockRegister { DataBlockRegister() { @@ -628,7 +629,7 @@ public final class BCFile { /** * Get the output stream for BlockAppender's consumption. - * + * * @return the output stream suitable for writing block data. */ public InputStream getInputStream() { @@ -684,7 +685,7 @@ public final class BCFile { /** * Get the name of the compression algorithm used to compress the block. - * + * * @return name of the compression algorithm. */ public String getCompressionName() { @@ -693,7 +694,7 @@ public final class BCFile { /** * Get the uncompressed size of the block. - * + * * @return uncompressed size of the block. */ public long getRawSize() { @@ -702,7 +703,7 @@ public final class BCFile { /** * Get the compressed size of the block. - * + * * @return compressed size of the block. */ public long getCompressedSize() { @@ -711,7 +712,7 @@ public final class BCFile { /** * Get the starting position of the block in the file. - * + * * @return the starting position of the block in the file. */ public long getStartPos() { @@ -721,7 +722,7 @@ public final class BCFile { /** * Constructor - * + * * @param fin * FS input stream. * @param fileLength @@ -807,7 +808,8 @@ public final class BCFile { } } - public Reader(CachableBlockFile.Reader cache, FSDataInputStream fin, long fileLength, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException { + public Reader(CachableBlockFile.Reader cache, FSDataInputStream fin, long fileLength, Configuration conf, AccumuloConfiguration accumuloConfiguration) + throws IOException { this.in = fin; this.conf = conf; @@ -943,7 +945,7 @@ public final class BCFile { /** * Get the name of the default compression algorithm. - * + * * @return the name of the default compression algorithm. */ public String getDefaultCompressionName() { @@ -952,7 +954,7 @@ public final class BCFile { /** * Get version of BCFile file being read. - * + * * @return version of BCFile file being read. */ public Version getBCFileVersion() { @@ -961,7 +963,7 @@ public final class BCFile { /** * Get version of BCFile API. - * + * * @return version of BCFile API. */ public Version getAPIVersion() { @@ -978,7 +980,7 @@ public final class BCFile { /** * Get the number of data blocks. - * + * * @return the number of data blocks. */ public int getBlockCount() { @@ -987,7 +989,7 @@ public final class BCFile { /** * Stream access to a Meta Block. - * + * * @param name * meta block name * @return BlockReader input stream for reading the meta block. @@ -1006,7 +1008,7 @@ public final class BCFile { /** * Stream access to a Data Block. - * + * * @param blockIndex * 0-based data block index. * @return BlockReader input stream for reading the data block. @@ -1032,7 +1034,7 @@ public final class BCFile { /** * Find the smallest Block index whose starting offset is greater than or equal to the specified offset. - * + * * @param offset * User-specific offset. * @return the index to the data Block if such block exists; or -1 otherwise. http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java index e6c2a66..f93bb84 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -30,46 +30,46 @@ import org.apache.hadoop.fs.FSDataInputStream; * BoundedRangeFileInputStream on top of the same FSDataInputStream and they would not interfere with each other. */ class BoundedRangeFileInputStream extends InputStream { - + private FSDataInputStream in; private long pos; private long end; private long mark; private final byte[] oneByte = new byte[1]; - + /** * Constructor - * + * * @param in * The FSDataInputStream we connect to. * @param offset * Beginning offset of the region. * @param length * Length of the region. - * + * * The actual length of the region may be smaller if (off_begin + length) goes beyond the end of FS input stream. */ public BoundedRangeFileInputStream(FSDataInputStream in, long offset, long length) { if (offset < 0 || length < 0) { throw new IndexOutOfBoundsException("Invalid offset/length: " + offset + "/" + length); } - + this.in = in; this.pos = offset; this.end = offset + length; this.mark = -1; } - + @Override public int available() throws IOException { int avail = in.available(); if (pos + avail > end) { avail = (int) (end - pos); } - + return avail; } - + @Override public int read() throws IOException { int ret = read(oneByte); @@ -77,25 +77,25 @@ class BoundedRangeFileInputStream extends InputStream { return oneByte[0] & 0xff; return -1; } - + @Override public int read(byte[] b) throws IOException { return read(b, 0, b.length); } - + @Override public int read(final byte[] b, final int off, int len) throws IOException { if ((off | len | (off + len) | (b.length - (off + len))) < 0) { throw new IndexOutOfBoundsException(); } - + final int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos))); if (n == 0) return -1; Integer ret = 0; final FSDataInputStream inLocal = in; synchronized (inLocal) { - inLocal.seek(pos); + inLocal.seek(pos); try { ret = AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>() { @Override @@ -116,7 +116,7 @@ class BoundedRangeFileInputStream extends InputStream { pos += ret; return ret; } - + @Override /* * We may skip beyond the end of the file. @@ -126,24 +126,24 @@ class BoundedRangeFileInputStream extends InputStream { pos += len; return len; } - + @Override public void mark(int readlimit) { mark = pos; } - + @Override public void reset() throws IOException { if (mark < 0) throw new IOException("Resetting to invalid mark"); pos = mark; } - + @Override public boolean markSupported() { return true; } - + @Override public void close() { // Invalidate the state of the stream. http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompareUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompareUtils.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompareUtils.java index fe45bad..d7651e8 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompareUtils.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompareUtils.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -29,46 +29,46 @@ class CompareUtils { private CompareUtils() { // nothing } - + /** * A comparator to compare anything that implements {@link RawComparable} using a customized comparator. */ public static final class BytesComparator implements Comparator<RawComparable> { private RawComparator<Object> cmp; - + public BytesComparator(RawComparator<Object> cmp) { this.cmp = cmp; } - + @Override public int compare(RawComparable o1, RawComparable o2) { return compare(o1.buffer(), o1.offset(), o1.size(), o2.buffer(), o2.offset(), o2.size()); } - + public int compare(byte[] a, int off1, int len1, byte[] b, int off2, int len2) { return cmp.compare(a, off1, len1, b, off2, len2); } } - + /** * Interface for all objects that has a single integer magnitude. */ interface Scalar { long magnitude(); } - + static final class ScalarLong implements Scalar { private long magnitude; - + public ScalarLong(long m) { magnitude = m; } - + public long magnitude() { return magnitude; } } - + public static final class ScalarComparator implements Comparator<Scalar>, Serializable { private static final long serialVersionUID = 1L; @@ -82,7 +82,7 @@ class CompareUtils { return 0; } } - + public static final class MemcmpRawComparator implements RawComparator<Object>, Serializable { private static final long serialVersionUID = 1L; @@ -90,7 +90,7 @@ class CompareUtils { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2); } - + @Override public int compare(Object o1, Object o2) { throw new RuntimeException("Object comparison not supported"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java index 5288bbb..9defa1c 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -41,24 +41,24 @@ import org.apache.hadoop.util.ReflectionUtils; */ public final class Compression { static final Log LOG = LogFactory.getLog(Compression.class); - + /** * Prevent the instantiation of class. */ private Compression() { // nothing } - + static class FinishOnFlushCompressionStream extends FilterOutputStream { public FinishOnFlushCompressionStream(CompressionOutputStream cout) { super(cout); } - + @Override public void write(byte b[], int off, int len) throws IOException { out.write(b, off, len); } - + @Override public void flush() throws IOException { CompressionOutputStream cout = (CompressionOutputStream) out; @@ -67,7 +67,7 @@ public final class Compression { cout.resetState(); } } - + /** snappy codec **/ public static final String COMPRESSION_SNAPPY = "snappy"; /** compression: gzip */ @@ -76,7 +76,7 @@ public final class Compression { public static final String COMPRESSION_LZO = "lzo"; /** compression: none */ public static final String COMPRESSION_NONE = "none"; - + /** * Compression algorithms. */ @@ -85,7 +85,7 @@ public final class Compression { private transient boolean checked = false; private static final String defaultClazz = "org.apache.hadoop.io.compress.LzoCodec"; private transient CompressionCodec codec = null; - + @Override public synchronized boolean isSupported() { if (!checked) { @@ -101,16 +101,16 @@ public final class Compression { } return codec != null; } - + @Override CompressionCodec getCodec() throws IOException { if (!isSupported()) { throw new IOException("LZO codec class not specified. Did you forget to set property " + CONF_LZO_CLASS + "?"); } - + return codec; } - + @Override public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { if (!isSupported()) { @@ -127,7 +127,7 @@ public final class Compression { BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); return bis2; } - + @Override public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { if (!isSupported()) { @@ -145,20 +145,20 @@ public final class Compression { return bos2; } }, - + GZ(COMPRESSION_GZ) { private transient DefaultCodec codec; - + @Override synchronized CompressionCodec getCodec() { if (codec == null) { codec = new DefaultCodec(); codec.setConf(conf); } - + return codec; } - + @Override public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { // Set the internal buffer size to read from down stream. @@ -169,7 +169,7 @@ public final class Compression { BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); return bis2; } - + @Override public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { OutputStream bos1 = null; @@ -183,19 +183,19 @@ public final class Compression { BufferedOutputStream bos2 = new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE); return bos2; } - + @Override public boolean isSupported() { return true; } }, - + NONE(COMPRESSION_NONE) { @Override CompressionCodec getCodec() { return null; } - + @Override public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { if (downStreamBufferSize > 0) { @@ -203,38 +203,38 @@ public final class Compression { } return downStream; } - + @Override public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { if (downStreamBufferSize > 0) { return new BufferedOutputStream(downStream, downStreamBufferSize); } - + return downStream; } - + @Override public boolean isSupported() { return true; } }, - + SNAPPY(COMPRESSION_SNAPPY) { // Use base type to avoid compile-time dependencies. private transient CompressionCodec snappyCodec = null; private transient boolean checked = false; private static final String defaultClazz = "org.apache.hadoop.io.compress.SnappyCodec"; - + public CompressionCodec getCodec() throws IOException { if (!isSupported()) { throw new IOException("SNAPPY codec class not specified. Did you forget to set property " + CONF_SNAPPY_CLASS + "?"); } return snappyCodec; } - + @Override public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { - + if (!isSupported()) { throw new IOException("SNAPPY codec class not specified. Did you forget to set property " + CONF_SNAPPY_CLASS + "?"); } @@ -249,7 +249,7 @@ public final class Compression { BufferedOutputStream bos2 = new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE); return bos2; } - + @Override public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { if (!isSupported()) { @@ -262,7 +262,7 @@ public final class Compression { BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); return bis2; } - + @Override public synchronized boolean isSupported() { if (!checked) { @@ -289,19 +289,19 @@ public final class Compression { private static final int DATA_OBUF_SIZE = 4 * 1024; public static final String CONF_LZO_CLASS = "io.compression.codec.lzo.class"; public static final String CONF_SNAPPY_CLASS = "io.compression.codec.snappy.class"; - + Algorithm(String name) { this.compressName = name; } - + abstract CompressionCodec getCodec() throws IOException; - + public abstract InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException; - + public abstract OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException; - + public abstract boolean isSupported(); - + public Compressor getCompressor() throws IOException { CompressionCodec codec = getCodec(); if (codec != null) { @@ -323,14 +323,14 @@ public final class Compression { } return null; } - + public void returnCompressor(Compressor compressor) { if (compressor != null) { LOG.debug("Return a compressor: " + compressor.hashCode()); CodecPool.returnCompressor(compressor); } } - + public Decompressor getDecompressor() throws IOException { CompressionCodec codec = getCodec(); if (codec != null) { @@ -350,37 +350,37 @@ public final class Compression { } return decompressor; } - + return null; } - + public void returnDecompressor(Decompressor decompressor) { if (decompressor != null) { LOG.debug("Returned a decompressor: " + decompressor.hashCode()); CodecPool.returnDecompressor(decompressor); } } - + public String getName() { return compressName; } } - + static Algorithm getCompressionAlgorithmByName(String compressName) { Algorithm[] algos = Algorithm.class.getEnumConstants(); - + for (Algorithm a : algos) { if (a.getName().equals(compressName)) { return a; } } - + throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName); } - + public static String[] getSupportedAlgorithms() { Algorithm[] algos = Algorithm.class.getEnumConstants(); - + ArrayList<String> ret = new ArrayList<String>(); for (Algorithm a : algos) { if (a.isSupported()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockAlreadyExists.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockAlreadyExists.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockAlreadyExists.java index 612c738..617da34 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockAlreadyExists.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockAlreadyExists.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -26,7 +26,7 @@ import java.io.IOException; public class MetaBlockAlreadyExists extends IOException { /** * Constructor - * + * * @param s * message. */ http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockDoesNotExist.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockDoesNotExist.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockDoesNotExist.java index 566075e..bf9319d 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockDoesNotExist.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockDoesNotExist.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -26,7 +26,7 @@ import java.io.IOException; public class MetaBlockDoesNotExist extends IOException { /** * Constructor - * + * * @param s * message. */ http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java index a67a242..9ac1e96 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java @@ -37,9 +37,9 @@ public class PrintInfo { BCFile.Reader bcfr = null; try { bcfr = new BCFile.Reader(fsin, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration); - + Set<Entry<String,MetaIndexEntry>> es = bcfr.metaIndex.index.entrySet(); - + for (Entry<String,MetaIndexEntry> entry : es) { PrintStream out = System.out; out.println("Meta block : " + entry.getKey()); @@ -54,7 +54,7 @@ public class PrintInfo { } } } - + public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); AccumuloConfiguration siteConf = SiteConfiguration.getInstance(DefaultConfiguration.getInstance()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/RawComparable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/RawComparable.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/RawComparable.java index 48c0bfe..a624223 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/RawComparable.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/RawComparable.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -25,28 +25,28 @@ import org.apache.hadoop.io.RawComparator; /** * Interface for objects that can be compared through {@link RawComparator}. This is useful in places where we need a single object reference to specify a range * of bytes in a byte array, such as {@link Comparable} or {@link Collections#binarySearch(java.util.List, Object, Comparator)} - * + * * The actual comparison among RawComparable's requires an external RawComparator and it is applications' responsibility to ensure two RawComparable are * supposed to be semantically comparable with the same RawComparator. */ public interface RawComparable { /** * Get the underlying byte array. - * + * * @return The underlying byte array. */ byte[] buffer(); - + /** * Get the offset of the first byte in the byte array. - * + * * @return The offset of the first byte in the byte array. */ int offset(); - + /** * Get the size of the byte range in the byte array. - * + * * @return The size of the byte range in the byte array. */ int size(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java index 39a853c..3662208 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -27,20 +27,20 @@ import java.io.OutputStream; class SimpleBufferedOutputStream extends FilterOutputStream { protected byte buf[]; // the borrowed buffer protected int count = 0; // bytes used in buffer. - + // Constructor public SimpleBufferedOutputStream(OutputStream out, byte[] buf) { super(out); this.buf = buf; } - + private void flushBuffer() throws IOException { if (count > 0) { out.write(buf, 0, count); count = 0; } } - + @Override public void write(int b) throws IOException { if (count >= buf.length) { @@ -48,7 +48,7 @@ class SimpleBufferedOutputStream extends FilterOutputStream { } buf[count++] = (byte) b; } - + @Override public void write(byte b[], int off, int len) throws IOException { if (len >= buf.length) { @@ -62,13 +62,13 @@ class SimpleBufferedOutputStream extends FilterOutputStream { System.arraycopy(b, off, buf, count, len); count += len; } - + @Override public synchronized void flush() throws IOException { flushBuffer(); out.flush(); } - + // Get the size of internal buffer being used. public int size() { return count; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java index 9131d30..fb0277a 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -29,17 +29,17 @@ import org.apache.hadoop.io.Text; * Supporting Utility classes used by TFile, and shared by users of TFile. */ public final class Utils { - + /** * Prevent the instantiation of Utils. */ private Utils() { // nothing } - + /** * Encoding an integer into a variable-length encoding format. Synonymous to <code>Utils#writeVLong(out, n)</code>. - * + * * @param out * output stream * @param n @@ -49,7 +49,7 @@ public final class Utils { public static void writeVInt(DataOutput out, int n) throws IOException { writeVLong(out, n); } - + /** * Encoding a Long integer into a variable-length encoding format. * <ul> @@ -67,7 +67,7 @@ public final class Utils { * <li>if n in [-2^63, 2^63): encode in nine bytes: byte[0]=-121; byte[1] = (n>>54)&0xff; byte[2] = (n>>48)&0xff; byte[3] = (n>>40)&0xff; * byte[4]=(n>>32)&0xff; byte[5]=(n>>24)&0xff; byte[6]=(n>>16)&0xff; byte[7]=(n>>8)&0xff; byte[8]=n&0xff; * </ul> - * + * * @param out * output stream * @param n @@ -79,7 +79,7 @@ public final class Utils { out.writeByte((int) n); return; } - + long un = (n < 0) ? ~n : n; // how many bytes do we need to represent the number with sign bit? int len = (Long.SIZE - Long.numberOfLeadingZeros(un)) / 8 + 1; @@ -138,14 +138,14 @@ public final class Utils { throw new RuntimeException("Internel error"); } } - + /** * Decoding the variable-length integer. Synonymous to <code>(int)Utils#readVLong(in)</code>. - * + * * @param in * input stream * @return the decoded integer - * + * * @see Utils#readVLong(DataInput) */ public static int readVInt(DataInput in) throws IOException { @@ -155,7 +155,7 @@ public final class Utils { } return (int) ret; } - + /** * Decoding the variable-length integer. Suppose the value of the first byte is FB, and the following bytes are NB[*]. * <ul> @@ -164,18 +164,18 @@ public final class Utils { * <li>if (FB in [-104, -73]), return (FB+88)<<16 + (NB[0]&0xff)<<8 + NB[1]&0xff; * <li>if (FB in [-120, -105]), return (FB+112)<<24 + (NB[0]&0xff)<<16 + (NB[1]&0xff)<<8 + NB[2]&0xff; * <li>if (FB in [-128, -121]), return interpret NB[FB+129] as a signed big-endian integer. - * + * * @param in * input stream * @return the decoded long integer. */ - + public static long readVLong(DataInput in) throws IOException { int firstByte = in.readByte(); if (firstByte >= -32) { return firstByte; } - + switch ((firstByte + 128) / 8) { case 11: case 10: @@ -211,7 +211,7 @@ public final class Utils { throw new RuntimeException("Internal error"); } } - + /** * Write a String as a VInt n, followed by n Bytes as in Text format. */ @@ -226,10 +226,10 @@ public final class Utils { writeVInt(out, -1); } } - + /** * Read a String as a VInt n, followed by n Bytes in Text format. - * + * * @param in * The input stream. * @return The string @@ -242,20 +242,20 @@ public final class Utils { in.readFully(buffer); return Text.decode(buffer); } - + /** * A generic Version class. We suggest applications built on top of TFile use this class to maintain version information in their meta blocks. - * + * * A version number consists of a major version and a minor version. The suggested usage of major and minor version number is to increment major version * number when the new storage format is not backward compatible, and increment the minor version otherwise. */ public static final class Version implements Comparable<Version> { private final short major; private final short minor; - + /** * Construct the Version object by reading from the input stream. - * + * * @param in * input stream */ @@ -263,10 +263,10 @@ public final class Utils { major = in.readShort(); minor = in.readShort(); } - + /** * Constructor. - * + * * @param major * major version. * @param minor @@ -276,10 +276,10 @@ public final class Utils { this.major = major; this.minor = minor; } - + /** * Write the object to a DataOutput. The serialized format of the Version is major version followed by minor version, both as big-endian short integers. - * + * * @param out * The DataOutput object. */ @@ -287,34 +287,34 @@ public final class Utils { out.writeShort(major); out.writeShort(minor); } - + /** * Get the major version. - * + * * @return Major version. */ public int getMajor() { return major; } - + /** * Get the minor version. - * + * * @return The minor version. */ public int getMinor() { return minor; } - + /** * Get the size of the serialized Version object. - * + * * @return serialized size of the version object. */ public static int size() { return (Short.SIZE + Short.SIZE) / Byte.SIZE; } - + /** * Return a string representation of the version. */ @@ -322,10 +322,10 @@ public final class Utils { public String toString() { return new StringBuilder("v").append(major).append(".").append(minor).toString(); } - + /** * Test compatibility. - * + * * @param other * The Version object to test compatibility with. * @return true if both versions have the same major version number; false otherwise. @@ -333,7 +333,7 @@ public final class Utils { public boolean compatibleWith(Version other) { return major == other.major; } - + /** * Compare this version with another version. */ @@ -344,7 +344,7 @@ public final class Utils { } return minor - that.minor; } - + @Override public boolean equals(Object other) { if (this == other) @@ -353,16 +353,16 @@ public final class Utils { return false; return compareTo((Version) other) == 0; } - + @Override public int hashCode() { return (major << 16 + minor); } } - + /** * Lower bound binary search. Find the index to the first element in the list that compares greater than or equal to key. - * + * * @param <T> * Type of the input key. * @param list @@ -376,7 +376,7 @@ public final class Utils { public static <T> int lowerBound(List<? extends T> list, T key, Comparator<? super T> cmp) { int low = 0; int high = list.size(); - + while (low < high) { int mid = (low + high) >>> 1; T midVal = list.get(mid); @@ -388,10 +388,10 @@ public final class Utils { } return low; } - + /** * Upper bound binary search. Find the index to the first element in the list that compares greater than the input key. - * + * * @param <T> * Type of the input key. * @param list @@ -405,7 +405,7 @@ public final class Utils { public static <T> int upperBound(List<? extends T> list, T key, Comparator<? super T> cmp) { int low = 0; int high = list.size(); - + while (low < high) { int mid = (low + high) >>> 1; T midVal = list.get(mid); @@ -417,10 +417,10 @@ public final class Utils { } return low; } - + /** * Lower bound binary search. Find the index to the first element in the list that compares greater than or equal to key. - * + * * @param <T> * Type of the input key. * @param list @@ -432,7 +432,7 @@ public final class Utils { public static <T> int lowerBound(List<? extends Comparable<? super T>> list, T key) { int low = 0; int high = list.size(); - + while (low < high) { int mid = (low + high) >>> 1; Comparable<? super T> midVal = list.get(mid); @@ -444,10 +444,10 @@ public final class Utils { } return low; } - + /** * Upper bound binary search. Find the index to the first element in the list that compares greater than the input key. - * + * * @param <T> * Type of the input key. * @param list @@ -459,7 +459,7 @@ public final class Utils { public static <T> int upperBound(List<? extends Comparable<? super T>> list, T key) { int low = 0; int high = list.size(); - + while (low < high) { int mid = (low + high) >>> 1; Comparable<? super T> midVal = list.get(mid); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java index 9b89b47..fd5525b 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/AggregatingIterator.java @@ -35,59 +35,59 @@ import org.apache.log4j.Logger; /** * This iterator wraps another iterator. It automatically aggregates. - * + * * @deprecated since 1.4, replaced by {@link org.apache.accumulo.core.iterators.Combiner} */ @Deprecated public class AggregatingIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber { - + private SortedKeyValueIterator<Key,Value> iterator; private ColumnToClassMapping<Aggregator> aggregators; - + private Key workKey = new Key(); - + private Key aggrKey; private Value aggrValue; // private boolean propogateDeletes; private static final Logger log = Logger.getLogger(AggregatingIterator.class); - + public AggregatingIterator deepCopy(IteratorEnvironment env) { return new AggregatingIterator(this, env); } - + private AggregatingIterator(AggregatingIterator other, IteratorEnvironment env) { iterator = other.iterator.deepCopy(env); aggregators = other.aggregators; } - + public AggregatingIterator() {} - + private void aggregateRowColumn(Aggregator aggr) throws IOException { // this function assumes that first value is not delete - + if (iterator.getTopKey().isDeleted()) return; - + workKey.set(iterator.getTopKey()); - + Key keyToAggregate = workKey; - + aggr.reset(); - + aggr.collect(iterator.getTopValue()); iterator.next(); - + while (iterator.hasTop() && !iterator.getTopKey().isDeleted() && iterator.getTopKey().equals(keyToAggregate, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) { aggr.collect(iterator.getTopValue()); iterator.next(); } - + aggrKey = workKey; aggrValue = aggr.aggregate(); - + } - + private void findTop() throws IOException { // check if aggregation is needed if (iterator.hasTop()) { @@ -97,12 +97,12 @@ public class AggregatingIterator implements SortedKeyValueIterator<Key,Value>, O } } } - + public AggregatingIterator(SortedKeyValueIterator<Key,Value> iterator, ColumnToClassMapping<Aggregator> aggregators) throws IOException { this.iterator = iterator; this.aggregators = aggregators; } - + @Override public Key getTopKey() { if (aggrKey != null) { @@ -110,7 +110,7 @@ public class AggregatingIterator implements SortedKeyValueIterator<Key,Value>, O } return iterator.getTopKey(); } - + @Override public Value getTopValue() { if (aggrKey != null) { @@ -118,12 +118,12 @@ public class AggregatingIterator implements SortedKeyValueIterator<Key,Value>, O } return iterator.getTopValue(); } - + @Override public boolean hasTop() { return aggrKey != null || iterator.hasTop(); } - + @Override public void next() throws IOException { if (aggrKey != null) { @@ -132,20 +132,20 @@ public class AggregatingIterator implements SortedKeyValueIterator<Key,Value>, O } else { iterator.next(); } - + findTop(); } - + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { // do not want to seek to the middle of a value that should be // aggregated... - + Range seekRange = IteratorUtil.maximizeStartKeyTimeStamp(range); - + iterator.seek(seekRange, columnFamilies, inclusive); findTop(); - + if (range.getStartKey() != null) { while (hasTop() && getTopKey().equals(range.getStartKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS) && getTopKey().getTimestamp() > range.getStartKey().getTimestamp()) { @@ -154,19 +154,19 @@ public class AggregatingIterator implements SortedKeyValueIterator<Key,Value>, O // log.debug("skipping "+getTopKey()); next(); } - + while (hasTop() && range.beforeStartKey(getTopKey())) { next(); } } - + } - + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { - + this.iterator = source; - + try { String context = null; if (null != env) @@ -183,13 +183,13 @@ public class AggregatingIterator implements SortedKeyValueIterator<Key,Value>, O throw new IllegalArgumentException(e); } } - + @Override public IteratorOptions describeOptions() { return new IteratorOptions("agg", "Aggregators apply aggregating functions to values with identical keys", null, Collections.singletonList("<columnName> <aggregatorClass>")); } - + @Override public boolean validateOptions(Map<String,String> options) { for (Entry<String,String> entry : options.entrySet()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilyCounter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilyCounter.java b/core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilyCounter.java index b3607ef..934658e 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilyCounter.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/ColumnFamilyCounter.java @@ -28,65 +28,65 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; public class ColumnFamilyCounter implements SortedKeyValueIterator<Key,Value> { - + private SortedKeyValueIterator<Key,Value> source; private Key key; private Value value; - + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { this.source = source; } - + @Override public boolean hasTop() { return key != null; } - + @Override public void next() throws IOException { if (source.hasTop()) { ByteSequence currentRow = source.getTopKey().getRowData(); ByteSequence currentColf = source.getTopKey().getColumnFamilyData(); long ts = source.getTopKey().getTimestamp(); - + source.next(); - + int count = 1; - + while (source.hasTop() && source.getTopKey().getRowData().equals(currentRow) && source.getTopKey().getColumnFamilyData().equals(currentColf)) { count++; source.next(); } - + this.key = new Key(currentRow.toArray(), currentColf.toArray(), new byte[0], new byte[0], ts); this.value = new Value(Integer.toString(count).getBytes(UTF_8)); - + } else { this.key = null; this.value = null; } } - + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { source.seek(range, columnFamilies, inclusive); next(); } - + @Override public Key getTopKey() { return key; } - + @Override public Value getTopValue() { return value; } - + @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { return null; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java b/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java index 683e3f7..6d7cc7e 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java @@ -23,8 +23,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.IteratorSetting.Column; import org.apache.accumulo.core.client.ScannerBase; @@ -36,18 +34,21 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.conf.ColumnSet; import org.apache.hadoop.io.Text; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; + /** * A SortedKeyValueIterator that combines the Values for different versions (timestamp) of a Key within a row into a single Value. Combiner will replace one or - * more versions of a Key and their Values with the most recent Key and a Value which is the result of the reduce method. An {@link Column} - * which only specifies a column family will combine all Keys in that column family individually. Similarly, a {@link Column} which specifies a - * column family and column qualifier will combine all Keys in column family and qualifier individually. Combination is only ever performed on multiple versions - * and not across column qualifiers or column visibilities. - * + * more versions of a Key and their Values with the most recent Key and a Value which is the result of the reduce method. An {@link Column} which only specifies + * a column family will combine all Keys in that column family individually. Similarly, a {@link Column} which specifies a column family and column qualifier + * will combine all Keys in column family and qualifier individually. Combination is only ever performed on multiple versions and not across column qualifiers + * or column visibilities. + * * Implementations must provide a reduce method: {@code public Value reduce(Key key, Iterator<Value> iter)}. - * + * * This reduce method will be passed the most recent Key and an iterator over the Values for all non-deleted versions of that Key. A combiner will not combine * keys that differ by more than the timestamp. - * + * * This class and its implementations do not automatically filter out unwanted columns from those being combined, thus it is generally recommended to use a * {@link Combiner} implementation with the {@link ScannerBase#fetchColumnFamily(Text)} or {@link ScannerBase#fetchColumn(Text, Text)} methods. */ @@ -65,7 +66,7 @@ public abstract class Combiner extends WrappingIterator implements OptionDescrib /** * Constructs an iterator over Values whose Keys are versions of the current topKey of the source SortedKeyValueIterator. - * + * * @param source * The SortedKeyValueIterator<Key,Value> from which to read data. */ @@ -100,7 +101,7 @@ public abstract class Combiner extends WrappingIterator implements OptionDescrib /** * This method is unsupported in this iterator. - * + * * @throws UnsupportedOperationException * when called */ @@ -192,13 +193,13 @@ public abstract class Combiner extends WrappingIterator implements OptionDescrib /** * Reduces a list of Values into a single Value. - * + * * @param key * The most recent version of the Key being reduced. - * + * * @param iter * An iterator over the Values for different versions of the key. - * + * * @return The combined Value. */ public abstract Value reduce(Key key, Iterator<Value> iter); @@ -279,7 +280,7 @@ public abstract class Combiner extends WrappingIterator implements OptionDescrib * A convenience method to set which columns a combiner should be applied to. For each column specified, all versions of a Key which match that @{link * IteratorSetting.Column} will be combined individually in each row. This method is likely to be used in conjunction with * {@link ScannerBase#fetchColumnFamily(Text)} or {@link ScannerBase#fetchColumn(Text,Text)}. - * + * * @param is * iterator settings object to configure * @param columns @@ -301,7 +302,7 @@ public abstract class Combiner extends WrappingIterator implements OptionDescrib /** * A convenience method to set the "all columns" option on a Combiner. This will combine all columns individually within each row. - * + * * @param is * iterator settings object to configure * @param combineAllColumns http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/iterators/DebugIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/DebugIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/DebugIterator.java index 83265cb..92f49f9 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/DebugIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/DebugIterator.java @@ -27,48 +27,48 @@ import org.apache.accumulo.core.data.Value; import org.apache.log4j.Logger; public class DebugIterator extends WrappingIterator implements OptionDescriber { - + private String prefix; - + private static final Logger log = Logger.getLogger(DebugIterator.class); - + public DebugIterator() {} - + public DebugIterator deepCopy(IteratorEnvironment env) { return new DebugIterator(this, env); } - + private DebugIterator(DebugIterator other, IteratorEnvironment env) { setSource(other.getSource().deepCopy(env)); prefix = other.prefix; } - + public DebugIterator(String prefix, SortedKeyValueIterator<Key,Value> source) { this.prefix = prefix; this.setSource(source); } - + @Override public Key getTopKey() { Key wc = super.getTopKey(); log.debug(prefix + " getTopKey() --> " + wc); return wc; } - + @Override public Value getTopValue() { Value w = super.getTopValue(); log.debug(prefix + " getTopValue() --> " + w); return w; } - + @Override public boolean hasTop() { boolean b = super.hasTop(); log.debug(prefix + " hasTop() --> " + b); return b; } - + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { log.debug(prefix + " seek(" + range + ", " + columnFamilies + ", " + inclusive + ")"); @@ -80,7 +80,7 @@ public class DebugIterator extends WrappingIterator implements OptionDescriber { log.debug(prefix + " next()"); super.next(); } - + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { log.debug("init(" + source + ", " + options + ", " + env + ")"); @@ -94,7 +94,8 @@ public class DebugIterator extends WrappingIterator implements OptionDescriber { @Override public IteratorOptions describeOptions() { - return new IteratorOptions("debug", DebugIterator.class.getSimpleName() + " prints debug information on each SortedKeyValueIterator method invocation", null, null); + return new IteratorOptions("debug", DebugIterator.class.getSimpleName() + " prints debug information on each SortedKeyValueIterator method invocation", + null, null); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/iterators/DevNull.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/DevNull.java b/core/src/main/java/org/apache/accumulo/core/iterators/DevNull.java index e7de5e1..d9a7b8d 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/DevNull.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/DevNull.java @@ -28,54 +28,54 @@ import org.apache.accumulo.core.data.Value; /** * An iterator that is useful testing... for example if you want to test ingest performance w/o writing data to disk, insert this iterator for scan as follows * using the accumulo shell. - * + * * config -t ci -s table.iterator.minc.devnull=21,org.apache.accumulo.core.iterators.DevNull - * + * * Could also make scans never return anything very quickly by adding it to the scan stack - * + * * config -t ci -s table.iterator.scan.devnull=21,org.apache.accumulo.core.iterators.DevNull - * + * * And to make major compactions never write anything - * + * * config -t ci -s table.iterator.majc.devnull=21,org.apache.accumulo.core.iterators.DevNull - * + * */ public class DevNull implements SortedKeyValueIterator<Key,Value> { - + @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { throw new UnsupportedOperationException(); } - + @Override public Key getTopKey() { return null; } - + @Override public Value getTopValue() { return null; } - + @Override public boolean hasTop() { return false; } - + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { - + } - + @Override public void next() throws IOException { throw new UnsupportedOperationException(); } - + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { - + } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/iterators/FamilyIntersectingIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/FamilyIntersectingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/FamilyIntersectingIterator.java index e0623e2..04102b8 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/FamilyIntersectingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/FamilyIntersectingIterator.java @@ -20,10 +20,10 @@ import org.apache.accumulo.core.iterators.user.IndexedDocIterator; /** * This class remains here for backwards compatibility. - * + * * @deprecated since 1.4, replaced by {@link org.apache.accumulo.core.iterators.user.IndexedDocIterator} */ @Deprecated public class FamilyIntersectingIterator extends IndexedDocIterator { - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java b/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java index 25653eb..8b135c7 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java @@ -29,9 +29,9 @@ import org.apache.accumulo.core.data.Value; /** * A SortedKeyValueIterator that filters entries from its source iterator. - * + * * Subclasses must implement an accept method: public boolean accept(Key k, Value v); - * + * * Key/Value pairs for which the accept method returns true are said to match the filter. By default, this class iterates over entries that match its filter. * This iterator takes an optional "negate" boolean parameter that defaults to false. If negate is set to true, this class instead omits entries that match its * filter, thus iterating over entries that do not match its filter. @@ -49,22 +49,22 @@ public abstract class Filter extends WrappingIterator implements OptionDescriber newInstance.negate = negate; return newInstance; } - + protected static final String NEGATE = "negate"; boolean negate = false; - + @Override public void next() throws IOException { super.next(); findTop(); } - + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { super.seek(range, columnFamilies, inclusive); findTop(); } - + /** * Iterates over the source until an acceptable key/value pair is found. */ @@ -77,12 +77,12 @@ public abstract class Filter extends WrappingIterator implements OptionDescriber } } } - + /** * @return <tt>true</tt> if the key/value pair is accepted by the filter. */ public abstract boolean accept(Key k, Value v); - + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { super.init(source, options, env); @@ -91,13 +91,13 @@ public abstract class Filter extends WrappingIterator implements OptionDescriber negate = Boolean.parseBoolean(options.get(NEGATE)); } } - + @Override public IteratorOptions describeOptions() { return new IteratorOptions("filter", "Filter accepts or rejects each Key/Value pair", Collections.singletonMap("negate", "default false keeps k/v that pass accept method, true rejects k/v that pass accept method"), null); } - + @Override public boolean validateOptions(Map<String,String> options) { if (options.get(NEGATE) != null) { @@ -109,10 +109,10 @@ public abstract class Filter extends WrappingIterator implements OptionDescriber } return true; } - + /** * A convenience method for setting the negation option on a filter. - * + * * @param is * IteratorSetting object to configure. * @param negate