http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java b/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java
index 69902a4..5c35f65 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java
@@ -22,9 +22,9 @@ import org.apache.accumulo.core.data.Range;
 public interface KeyFunctor {
   /**
    * Implementations should return null if a range can not be converted to a bloom key.
-   * 
+   *
    */
   org.apache.hadoop.util.bloom.Key transform(Range range);
-  
+
   org.apache.hadoop.util.bloom.Key transform(Key key);
 }
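[Editor's note: KeyFunctor is the pluggable hook Accumulo's bloom-filter layer uses to map keys and ranges onto org.apache.hadoop.util.bloom.Key entries; the RowFunctor diff that follows is its row-level implementation. As a rough illustration of the contract, here is a minimal, hypothetical sketch. The class name KeyFunctorExample and the filter sizing are invented for this example and are not Accumulo's actual wiring, which selects the functor per table via the table.bloom.key.functor property.

import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.file.keyfunctor.KeyFunctor;
import org.apache.accumulo.core.file.keyfunctor.RowFunctor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.hash.Hash;

public class KeyFunctorExample {
  public static void main(String[] args) {
    KeyFunctor functor = new RowFunctor();

    // Write side: fold each appended key's row into the filter. RowFunctor
    // emits the row bytes with weight 1.0; the vector size and hash count
    // here are illustrative only.
    BloomFilter bloom = new BloomFilter(1 << 20, 5, Hash.MURMUR_HASH);
    bloom.add(functor.transform(new org.apache.accumulo.core.data.Key(new Text("row1"))));

    // Query side: a range covering exactly one row reduces to a single bloom
    // key, so the filter can rule the file out without reading data blocks.
    org.apache.hadoop.util.bloom.Key bloomKey = functor.transform(new Range("row1"));
    if (bloomKey != null && !bloom.membershipTest(bloomKey))
      System.out.println("row1 is definitely not in this file");

    // A range spanning several rows cannot be summarized by one bloom key;
    // per the javadoc above, transform(Range) returns null and the seek
    // falls back to reading the file.
    System.out.println(functor.transform(new Range("row1", "row9")) == null); // true
  }
}

The null contract in the javadoc above is what makes this safe: a range that cannot be reduced to one bloom key simply bypasses the filter instead of risking a false negative.]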
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java b/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java
index 20eb26c..b46f593 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java
@@ -22,18 +22,18 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.hadoop.util.bloom.Key;
 
 public class RowFunctor implements KeyFunctor {
-  
+
   @Override
   public Key transform(org.apache.accumulo.core.data.Key acuKey) {
     byte keyData[];
-    
+
     ByteSequence row = acuKey.getRowData();
     keyData = new byte[row.length()];
     System.arraycopy(row.getBackingArray(), 0, keyData, 0, row.length());
-    
+
     return new Key(keyData, 1.0);
   }
-  
+
   @Override
   public Key transform(Range range) {
     if (isRangeInBloomFilter(range, PartialKey.ROW)) {
@@ -41,16 +41,16 @@ public class RowFunctor implements KeyFunctor {
     }
     return null;
   }
-  
+
   static boolean isRangeInBloomFilter(Range range, PartialKey keyDepth) {
-    
+
    if (range.getStartKey() == null || range.getEndKey() == null) {
      return false;
    }
-    
+
    if (range.getStartKey().equals(range.getEndKey(), keyDepth))
      return true;
-    
+
    // include everything but the deleted flag in the comparison...
    return range.getStartKey().followingKey(keyDepth).equals(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME) && !range.isEndKeyInclusive();
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
index 267f805..fb2762f 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
@@ -43,66 +43,66 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 
 public class MapFileOperations extends FileOperations {
-  
+
   public static class RangeIterator implements FileSKVIterator {
-    
+
     SortedKeyValueIterator<Key,Value> reader;
     private Range range;
     private boolean hasTop;
-    
+
     public RangeIterator(SortedKeyValueIterator<Key,Value> reader) {
       this.reader = reader;
     }
-    
+
     @Override
     public void close() throws IOException {
      ((FileSKVIterator) reader).close();
    }
-    
+
    @Override
    public Key getFirstKey() throws IOException {
      return ((FileSKVIterator) reader).getFirstKey();
    }
-    
+
    @Override
    public Key getLastKey() throws IOException {
      return ((FileSKVIterator) reader).getLastKey();
    }
-    
+
    @Override
    public DataInputStream getMetaStore(String name) throws IOException {
      return ((FileSKVIterator) reader).getMetaStore(name);
    }
-    
+
    @Override
    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
      return new RangeIterator(reader.deepCopy(env));
    }
-    
+
    @Override
    public Key getTopKey() {
      if (!hasTop)
        throw new IllegalStateException();
      return reader.getTopKey();
    }
-    
+
    @Override
    public Value getTopValue() {
      if (!hasTop)
        throw new IllegalStateException();
      return reader.getTopValue();
    }
-    
+
    @Override
    public boolean hasTop() {
      return hasTop;
    }
-    
+
    @Override
    public void
init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { throw new UnsupportedOperationException(); } - + @Override public void next() throws IOException { if (!hasTop) @@ -110,87 +110,87 @@ public class MapFileOperations extends FileOperations { reader.next(); hasTop = reader.hasTop() && !range.afterEndKey(reader.getTopKey()); } - + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { reader.seek(range, columnFamilies, inclusive); this.range = range; - + hasTop = reader.hasTop() && !range.afterEndKey(reader.getTopKey()); - + while (hasTop() && range.beforeStartKey(getTopKey())) { next(); } } - + @Override public void closeDeepCopies() throws IOException { ((FileSKVIterator) reader).closeDeepCopies(); } - + @Override public void setInterruptFlag(AtomicBoolean flag) { ((FileSKVIterator) reader).setInterruptFlag(flag); } } - + @Override public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { FileSKVIterator iter = new RangeIterator(new MapFileIterator(acuconf, fs, file, conf)); - + if (seekToBeginning) iter.seek(new Range(new Key(), null), new ArrayList<ByteSequence>(), false); - + return iter; } - + @Override public FileSKVWriter openWriter(final String file, final FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { - + throw new UnsupportedOperationException(); } - + @Override public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { return new SequenceFileIterator(MapFileUtil.openIndex(conf, fs, new Path(file)), false); } - + @Override public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { return fs.getFileStatus(new Path(file + "/" + MapFile.DATA_FILE_NAME)).getLen(); } - + @Override public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, AccumuloConfiguration tableConf) throws IOException { MapFileIterator mfIter = new MapFileIterator(tableConf, fs, file, conf); - + FileSKVIterator iter = new RangeIterator(mfIter); - + iter.seek(range, columnFamilies, inclusive); - + return iter; } - + @Override public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException { - + return openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf); } - + @Override public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException { - + return openReader(file, seekToBeginning, fs, conf, acuconf); } - + @Override public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache) throws IOException { - + return openIndex(file, fs, conf, acuconf); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java index 41b00d9..1373eac 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java @@ -36,7 +36,7 @@ public class MapFileUtil { throw e; } } - + @SuppressWarnings("deprecation") public static SequenceFile.Reader openIndex(Configuration conf, FileSystem fs, Path mapFile) throws IOException { Path indexPath = new Path(mapFile, MapFile.INDEX_FILE_NAME); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java index 2156a67..1ed9aca 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java @@ -27,40 +27,40 @@ import org.apache.accumulo.core.file.blockfile.ABlockReader; import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry; /** - * + * */ public class BlockIndex { - + public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException { - + BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex.class); - + int accessCount = blockIndex.accessCount.incrementAndGet(); - + // 1 is a power of two, but do not care about it if (accessCount >= 2 && isPowerOfTwo(accessCount)) { blockIndex.buildIndex(accessCount, cacheBlock, indexEntry); } - + if (blockIndex.blockIndex != null) return blockIndex; return null; } - + private static boolean isPowerOfTwo(int x) { return ((x > 0) && (x & (x - 1)) == 0); } - + private AtomicInteger accessCount = new AtomicInteger(0); private volatile BlockIndexEntry[] blockIndex = null; public static class BlockIndexEntry implements Comparable<BlockIndexEntry> { - + private Key prevKey; private int entriesLeft; private int pos; - + public BlockIndexEntry(int pos, int entriesLeft, Key prevKey) { this.pos = pos; this.entriesLeft = entriesLeft; @@ -70,7 +70,7 @@ public class BlockIndex { public BlockIndexEntry(Key key) { this.prevKey = key; } - + public int getEntriesLeft() { return entriesLeft; } @@ -79,39 +79,39 @@ public class BlockIndex { public int compareTo(BlockIndexEntry o) { return prevKey.compareTo(o.prevKey); } - + @Override public boolean equals(Object o) { if (o instanceof BlockIndexEntry) return compareTo((BlockIndexEntry) o) == 0; return false; } - + @Override public String toString() { return prevKey + " " + entriesLeft + " " + pos; } - + public Key getPrevKey() { return prevKey; } - + @Override public int hashCode() { assert false : "hashCode not designed"; return 42; // any arbitrary constant will do } } - + public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) { // get a local ref to the index, another thread could change it BlockIndexEntry[] blockIndex = this.blockIndex; - + int pos = Arrays.binarySearch(blockIndex, new BlockIndexEntry(startKey)); int index; - + if (pos < 0) { if (pos == -1) return null; // less than the first key in index, did not index the first key in block so just return null... 
code calling this will scan from beginning @@ -127,7 +127,7 @@ public class BlockIndex { break; } } - + // handle case where multiple keys in block are exactly the same, want to find the earliest key in the index while (index - 1 > 0) { if (blockIndex[index].getPrevKey().equals(blockIndex[index - 1].getPrevKey())) @@ -136,7 +136,7 @@ public class BlockIndex { break; } - + if (index == 0 && blockIndex[index].getPrevKey().equals(startKey)) return null; @@ -144,24 +144,24 @@ public class BlockIndex { cacheBlock.seek(bie.pos); return bie; } - + private synchronized void buildIndex(int indexEntries, ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException { cacheBlock.seek(0); - + RelativeKey rk = new RelativeKey(); Value val = new Value(); - + int interval = indexEntry.getNumEntries() / indexEntries; - + if (interval <= 32) return; - + // multiple threads could try to create the index with different sizes, do not replace a large index with a smaller one if (this.blockIndex != null && this.blockIndex.length > indexEntries - 1) return; int count = 0; - + ArrayList<BlockIndexEntry> index = new ArrayList<BlockIndexEntry>(indexEntries - 1); while (count < (indexEntry.getNumEntries() - interval + 1)) { @@ -174,7 +174,7 @@ public class BlockIndex { if (count > 0 && count % interval == 0) { index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, myPrevKey)); } - + count++; } @@ -182,7 +182,7 @@ public class BlockIndex { cacheBlock.seek(0); } - + BlockIndexEntry[] getIndexEntries() { return blockIndex; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java index 9456331..cd6bff8 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java @@ -61,7 +61,9 @@ public class CreateEmpty { static class Opts extends Help { @Parameter(names = {"-c", "--codec"}, description = "the compression codec to use.", validateWith = IsSupportedCompressionAlgorithm.class) String codec = Compression.COMPRESSION_NONE; - @Parameter(description = " <path> { <path> ... } Each path given is a URL. Relative paths are resolved according to the default filesystem defined in your Hadoop configuration, which is usually an HDFS instance.", required = true, validateWith = NamedLikeRFile.class) + @Parameter( + description = " <path> { <path> ... } Each path given is a URL. 
Relative paths are resolved according to the default filesystem defined in your Hadoop configuration, which is usually an HDFS instance.", + required = true, validateWith = NamedLikeRFile.class) List<String> files = new ArrayList<String>(); } @@ -74,7 +76,8 @@ public class CreateEmpty { for (String arg : opts.files) { Path path = new Path(arg); log.info("Writing to file '" + path + "'"); - FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), opts.codec); + FileSKVWriter writer = (new RFileOperations()) + .openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), opts.codec); writer.close(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java index f9c8686..b1dab36 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java @@ -30,10 +30,10 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; class IndexIterator implements SortedKeyValueIterator<Key,Value> { - + private Key key; private Iterator<IndexEntry> indexIter; - + IndexIterator(Iterator<IndexEntry> indexIter) { this.indexIter = indexIter; if (indexIter.hasNext()) @@ -41,32 +41,32 @@ class IndexIterator implements SortedKeyValueIterator<Key,Value> { else key = null; } - + @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { throw new UnsupportedOperationException(); } - + @Override public Key getTopKey() { return key; } - + @Override public Value getTopValue() { throw new UnsupportedOperationException(); } - + @Override public boolean hasTop() { return key != null; } - + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { throw new UnsupportedOperationException(); } - + @Override public void next() throws IOException { if (indexIter.hasNext()) @@ -74,10 +74,10 @@ class IndexIterator implements SortedKeyValueIterator<Key,Value> { else key = null; } - + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { throw new UnsupportedOperationException(); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java index 5dade97..f220a58 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java @@ -35,62 +35,62 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.system.HeapIterator; class MultiIndexIterator extends HeapIterator implements FileSKVIterator { - + private RFile.Reader source; - + MultiIndexIterator(RFile.Reader 
source, List<Iterator<IndexEntry>> indexes) { super(indexes.size()); - + this.source = source; - + for (Iterator<IndexEntry> index : indexes) { addSource(new IndexIterator(index)); } } - + @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { throw new UnsupportedOperationException(); } - + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { throw new UnsupportedOperationException(); } - + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { throw new UnsupportedOperationException(); } - + @Override public void close() throws IOException { source.close(); } - + @Override public void closeDeepCopies() throws IOException { throw new UnsupportedOperationException(); } - + @Override public Key getFirstKey() throws IOException { throw new UnsupportedOperationException(); } - + @Override public Key getLastKey() throws IOException { throw new UnsupportedOperationException(); } - + @Override public DataInputStream getMetaStore(String name) throws IOException { throw new UnsupportedOperationException(); } - + @Override public void setInterruptFlag(AtomicBoolean flag) { throw new UnsupportedOperationException(); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java index 632968e..2109478 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java @@ -41,7 +41,7 @@ import org.apache.accumulo.core.file.rfile.bcfile.Utils; import org.apache.hadoop.io.WritableComparable; public class MultiLevelIndex { - + public static class IndexEntry implements WritableComparable<IndexEntry> { private Key key; private int entries; @@ -49,7 +49,7 @@ public class MultiLevelIndex { private long compressedSize; private long rawSize; private boolean newFormat; - + IndexEntry(Key k, int e, long offset, long compressedSize, long rawSize) { this.key = k; this.entries = e; @@ -58,11 +58,11 @@ public class MultiLevelIndex { this.rawSize = rawSize; newFormat = true; } - + public IndexEntry(boolean newFormat) { this.newFormat = newFormat; } - + @Override public void readFields(DataInput in) throws IOException { key = new Key(); @@ -78,7 +78,7 @@ public class MultiLevelIndex { rawSize = -1; } } - + @Override public void write(DataOutput out) throws IOException { key.write(out); @@ -89,59 +89,59 @@ public class MultiLevelIndex { Utils.writeVLong(out, rawSize); } } - + public Key getKey() { return key; } - + public int getNumEntries() { return entries; } - + public long getOffset() { return offset; } - + public long getCompressedSize() { return compressedSize; } - + public long getRawSize() { return rawSize; } - + @Override public int compareTo(IndexEntry o) { return key.compareTo(o.key); } - + @Override public boolean equals(Object o) { if (o instanceof IndexEntry) - return compareTo((IndexEntry)o) == 0; + return compareTo((IndexEntry) o) == 0; return false; } - + @Override public int hashCode() { assert false : "hashCode not designed"; return 42; // any arbitrary constant will do } } - + // a list that 
deserializes index entries on demand private static class SerializedIndex extends AbstractList<IndexEntry> implements List<IndexEntry>, RandomAccess { - + private int[] offsets; private byte[] data; private boolean newFormat; - + SerializedIndex(int[] offsets, byte[] data, boolean newFormat) { this.offsets = offsets; this.data = data; this.newFormat = newFormat; } - + @Override public IndexEntry get(int index) { int len; @@ -149,41 +149,41 @@ public class MultiLevelIndex { len = data.length - offsets[index]; else len = offsets[index + 1] - offsets[index]; - + ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len); DataInputStream dis = new DataInputStream(bais); - + IndexEntry ie = new IndexEntry(newFormat); try { ie.readFields(dis); } catch (IOException e) { throw new RuntimeException(e); } - + return ie; } - + @Override public int size() { return offsets.length; } - + public long sizeInBytes() { return data.length + 4 * offsets.length; } - + } - + private static class KeyIndex extends AbstractList<Key> implements List<Key>, RandomAccess { - + private int[] offsets; private byte[] data; - + KeyIndex(int[] offsets, byte[] data) { this.offsets = offsets; this.data = data; } - + @Override public Key get(int index) { int len; @@ -191,122 +191,122 @@ public class MultiLevelIndex { len = data.length - offsets[index]; else len = offsets[index + 1] - offsets[index]; - + ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len); DataInputStream dis = new DataInputStream(bais); - + Key key = new Key(); try { key.readFields(dis); } catch (IOException e) { throw new RuntimeException(e); } - + return key; } - + @Override public int size() { return offsets.length; } } - + static class IndexBlock { - + private ByteArrayOutputStream indexBytes; private DataOutputStream indexOut; - + private ArrayList<Integer> offsets; private int level; private int offset; - + SerializedIndex index; KeyIndex keyIndex; private boolean hasNext; - + public IndexBlock(int level, int totalAdded) { // System.out.println("IndexBlock("+level+","+levelCount+","+totalAdded+")"); - + this.level = level; this.offset = totalAdded; - + indexBytes = new ByteArrayOutputStream(); indexOut = new DataOutputStream(indexBytes); offsets = new ArrayList<Integer>(); } - + public IndexBlock() {} - + public void add(Key key, int value, long offset, long compressedSize, long rawSize) throws IOException { offsets.add(indexOut.size()); new IndexEntry(key, value, offset, compressedSize, rawSize).write(indexOut); } - + int getSize() { return indexOut.size() + 4 * offsets.size(); } - + public void write(DataOutput out) throws IOException { out.writeInt(level); out.writeInt(offset); out.writeBoolean(hasNext); - + out.writeInt(offsets.size()); for (Integer offset : offsets) { out.writeInt(offset); } - + indexOut.close(); byte[] indexData = indexBytes.toByteArray(); - + out.writeInt(indexData.length); out.write(indexData); } - + public void readFields(DataInput in, int version) throws IOException { - + if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) { level = in.readInt(); offset = in.readInt(); hasNext = in.readBoolean(); - + int numOffsets = in.readInt(); int[] offsets = new int[numOffsets]; - + for (int i = 0; i < numOffsets; i++) offsets[i] = in.readInt(); - + int indexSize = in.readInt(); byte[] serializedIndex = new byte[indexSize]; in.readFully(serializedIndex); - + index = new SerializedIndex(offsets, serializedIndex, true); keyIndex = new KeyIndex(offsets, serializedIndex); } else 
if (version == RFile.RINDEX_VER_3) { level = 0; offset = 0; hasNext = false; - + int size = in.readInt(); - + ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); ArrayList<Integer> oal = new ArrayList<Integer>(); - + for (int i = 0; i < size; i++) { IndexEntry ie = new IndexEntry(false); oal.add(dos.size()); ie.readFields(in); ie.write(dos); } - + dos.close(); - + int[] oia = new int[oal.size()]; for (int i = 0; i < oal.size(); i++) { oia[i] = oal.get(i); } - + byte[] serializedIndex = baos.toByteArray(); index = new SerializedIndex(oia, serializedIndex, false); keyIndex = new KeyIndex(oia, serializedIndex); @@ -314,100 +314,100 @@ public class MultiLevelIndex { level = 0; offset = 0; hasNext = false; - + int numIndexEntries = in.readInt(); int offsets[] = new int[numIndexEntries]; for (int i = 0; i < numIndexEntries; i++) { offsets[i] = in.readInt(); } - + int size = in.readInt(); byte[] indexData = new byte[size]; in.readFully(indexData); - + index = new SerializedIndex(offsets, indexData, false); keyIndex = new KeyIndex(offsets, indexData); } else { throw new RuntimeException("Unexpected version " + version); } - + } - + List<IndexEntry> getIndex() { return index; } - + public List<Key> getKeyIndex() { return keyIndex; } - + int getLevel() { return level; } - + int getOffset() { return offset; } - + boolean hasNext() { return hasNext; } - + void setHasNext(boolean b) { this.hasNext = b; } - + } - + /** * this class buffers writes to the index so that chunks of index blocks are contiguous in the file instead of having index blocks sprinkled throughout the * file making scans of the entire index slow. */ public static class BufferedWriter { - + private Writer writer; private DataOutputStream buffer; private int buffered; private ByteArrayOutputStream baos; - + public BufferedWriter(Writer writer) { this.writer = writer; baos = new ByteArrayOutputStream(1 << 20); buffer = new DataOutputStream(baos); buffered = 0; } - + private void flush() throws IOException { buffer.close(); - + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); - + IndexEntry ie = new IndexEntry(true); for (int i = 0; i < buffered; i++) { ie.readFields(dis); writer.add(ie.getKey(), ie.getNumEntries(), ie.getOffset(), ie.getCompressedSize(), ie.getRawSize()); } - + buffered = 0; baos = new ByteArrayOutputStream(1 << 20); buffer = new DataOutputStream(baos); - + } - + public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException { if (buffer.size() > (10 * 1 << 20)) { flush(); } - + new IndexEntry(key, data, offset, compressedSize, rawSize).write(buffer); buffered++; } - + public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException { flush(); writer.addLast(key, data, offset, compressedSize, rawSize); } - + public void close(DataOutput out) throws IOException { writer.close(out); } @@ -415,74 +415,74 @@ public class MultiLevelIndex { public static class Writer { private int threshold; - + private ArrayList<IndexBlock> levels; - + private int totalAdded; - + private boolean addedLast = false; - + private BlockFileWriter blockFileWriter; - + Writer(BlockFileWriter blockFileWriter, int maxBlockSize) { this.blockFileWriter = blockFileWriter; this.threshold = maxBlockSize; levels = new ArrayList<IndexBlock>(); } - + private void add(int level, Key key, int data, long offset, long compressedSize, long rawSize) throws IOException { if (level == 
levels.size()) { levels.add(new IndexBlock(level, 0)); } - + IndexBlock iblock = levels.get(level); - + iblock.add(key, data, offset, compressedSize, rawSize); } - + private void flush(int level, Key lastKey, boolean last) throws IOException { - + if (last && level == levels.size() - 1) return; - + IndexBlock iblock = levels.get(level); if ((iblock.getSize() > threshold && iblock.offsets.size() > 1) || last) { ABlockWriter out = blockFileWriter.prepareDataBlock(); iblock.setHasNext(!last); iblock.write(out); out.close(); - + add(level + 1, lastKey, 0, out.getStartPos(), out.getCompressedSize(), out.getRawSize()); flush(level + 1, lastKey, last); - + if (last) levels.set(level, null); else levels.set(level, new IndexBlock(level, totalAdded)); } } - + public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException { totalAdded++; add(0, key, data, offset, compressedSize, rawSize); flush(0, key, false); } - + public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException { if (addedLast) throw new IllegalStateException("already added last"); - + totalAdded++; add(0, key, data, offset, compressedSize, rawSize); flush(0, key, true); addedLast = true; - + } - + public void close(DataOutput out) throws IOException { if (totalAdded > 0 && !addedLast) throw new IllegalStateException("did not call addLast"); - + out.writeInt(totalAdded); // save root node if (levels.size() > 0) { @@ -490,32 +490,32 @@ public class MultiLevelIndex { } else { new IndexBlock(0, 0).write(out); } - + } } - + public static class Reader { private IndexBlock rootBlock; private BlockFileReader blockStore; private int version; private int size; - + public class Node { - + private Node parent; private IndexBlock indexBlock; private int currentPos; - + Node(Node parent, IndexBlock iBlock) { this.parent = parent; this.indexBlock = iBlock; } - + Node(IndexBlock rootInfo) { this.parent = null; this.indexBlock = rootInfo; } - + private Node lookup(Key key) throws IOException { int pos = Collections.binarySearch(indexBlock.getKeyIndex(), key, new Comparator<Key>() { @Override @@ -523,86 +523,86 @@ public class MultiLevelIndex { return o1.compareTo(o2); } }); - + if (pos < 0) pos = (pos * -1) - 1; - + if (pos == indexBlock.getIndex().size()) { if (parent != null) throw new IllegalStateException(); this.currentPos = pos; return this; } - + this.currentPos = pos; - + if (indexBlock.getLevel() == 0) { return this; } - + IndexEntry ie = indexBlock.getIndex().get(pos); Node child = new Node(this, getIndexBlock(ie)); return child.lookup(key); } - + private Node getLast() throws IOException { currentPos = indexBlock.getIndex().size() - 1; if (indexBlock.getLevel() == 0) return this; - + IndexEntry ie = indexBlock.getIndex().get(currentPos); Node child = new Node(this, getIndexBlock(ie)); return child.getLast(); } - + private Node getFirst() throws IOException { currentPos = 0; if (indexBlock.getLevel() == 0) return this; - + IndexEntry ie = indexBlock.getIndex().get(currentPos); Node child = new Node(this, getIndexBlock(ie)); return child.getFirst(); } - + private Node getPrevious() throws IOException { if (currentPos == 0) return parent.getPrevious(); - + currentPos--; - + IndexEntry ie = indexBlock.getIndex().get(currentPos); Node child = new Node(this, getIndexBlock(ie)); return child.getLast(); - + } - + private Node getNext() throws IOException { if (currentPos == indexBlock.getIndex().size() - 1) return parent.getNext(); - + currentPos++; - + IndexEntry 
ie = indexBlock.getIndex().get(currentPos); Node child = new Node(this, getIndexBlock(ie)); return child.getFirst(); - + } - + Node getNextNode() throws IOException { return parent.getNext(); } - + Node getPreviousNode() throws IOException { return parent.getPrevious(); } } - + static public class IndexIterator implements ListIterator<IndexEntry> { - + private Node node; private ListIterator<IndexEntry> liter; - + private Node getPrevNode() { try { return node.getPreviousNode(); @@ -610,7 +610,7 @@ public class MultiLevelIndex { throw new RuntimeException(e); } } - + private Node getNextNode() { try { return node.getNextNode(); @@ -618,155 +618,155 @@ public class MultiLevelIndex { throw new RuntimeException(e); } } - + public IndexIterator() { node = null; } - + public IndexIterator(Node node) { this.node = node; liter = node.indexBlock.getIndex().listIterator(node.currentPos); } - + @Override public boolean hasNext() { if (node == null) return false; - + if (!liter.hasNext()) { return node.indexBlock.hasNext(); } else { return true; } - + } - + public IndexEntry peekPrevious() { IndexEntry ret = previous(); next(); return ret; } - + public IndexEntry peek() { IndexEntry ret = next(); previous(); return ret; } - + @Override public IndexEntry next() { if (!liter.hasNext()) { node = getNextNode(); liter = node.indexBlock.getIndex().listIterator(); } - + return liter.next(); } - + @Override public boolean hasPrevious() { if (node == null) return false; - + if (!liter.hasPrevious()) { return node.indexBlock.getOffset() > 0; } else { return true; } } - + @Override public IndexEntry previous() { if (!liter.hasPrevious()) { node = getPrevNode(); liter = node.indexBlock.getIndex().listIterator(node.indexBlock.getIndex().size()); } - + return liter.previous(); } - + @Override public int nextIndex() { return node.indexBlock.getOffset() + liter.nextIndex(); } - + @Override public int previousIndex() { return node.indexBlock.getOffset() + liter.previousIndex(); } - + @Override public void remove() { throw new UnsupportedOperationException(); } - + @Override public void set(IndexEntry e) { throw new UnsupportedOperationException(); - + } - + @Override public void add(IndexEntry e) { throw new UnsupportedOperationException(); } - + } - + public Reader(BlockFileReader blockStore, int version) { this.version = version; this.blockStore = blockStore; } - + private IndexBlock getIndexBlock(IndexEntry ie) throws IOException { IndexBlock iblock = new IndexBlock(); ABlockReader in = blockStore.getMetaBlock(ie.getOffset(), ie.getCompressedSize(), ie.getRawSize()); iblock.readFields(in, version); in.close(); - + return iblock; } - + public IndexIterator lookup(Key key) throws IOException { Node node = new Node(rootBlock); return new IndexIterator(node.lookup(key)); } - + public void readFields(DataInput in) throws IOException { - + size = 0; - + if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) { size = in.readInt(); } - + rootBlock = new IndexBlock(); rootBlock.readFields(in, version); - + if (version == RFile.RINDEX_VER_3 || version == RFile.RINDEX_VER_4) { size = rootBlock.getIndex().size(); } } - + public int size() { return size; } - + private void getIndexInfo(IndexBlock ib, Map<Integer,Long> sizesByLevel, Map<Integer,Long> countsByLevel) throws IOException { Long size = sizesByLevel.get(ib.getLevel()); if (size == null) size = 0l; - + Long count = countsByLevel.get(ib.getLevel()); if (count == null) count = 0l; - + size += ib.index.sizeInBytes(); count++; - + 
sizesByLevel.put(ib.getLevel(), size); countsByLevel.put(ib.getLevel(), count); - + if (ib.getLevel() > 0) { for (IndexEntry ie : ib.index) { IndexBlock cib = getIndexBlock(ie); @@ -774,14 +774,14 @@ public class MultiLevelIndex { } } } - + public void getIndexInfo(Map<Integer,Long> sizes, Map<Integer,Long> counts) throws IOException { getIndexInfo(rootBlock, sizes, counts); } - + public Key getLastKey() { return rootBlock.getIndex().get(rootBlock.getIndex().size() - 1).getKey(); } } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java index 43586dd..f29efcc 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java @@ -39,7 +39,7 @@ import com.beust.jcommander.Parameter; public class PrintInfo { private static final Logger log = Logger.getLogger(PrintInfo.class); - + static class Opts extends Help { @Parameter(names = {"-d", "--dump"}, description = "dump the key/value pairs") boolean dump = false; @@ -48,29 +48,29 @@ public class PrintInfo { @Parameter(description = " <file> { <file> ... }") List<String> files = new ArrayList<String>(); } - + public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); AccumuloConfiguration aconf = SiteConfiguration.getInstance(DefaultConfiguration.getInstance()); // TODO ACCUMULO-2462 This will only work for RFiles (path only, not URI) in HDFS when the correct filesystem for the given file - // is on Property.INSTANCE_DFS_DIR or, when INSTANCE_DFS_DIR is not defined, is on the default filesystem + // is on Property.INSTANCE_DFS_DIR or, when INSTANCE_DFS_DIR is not defined, is on the default filesystem // defined in the Hadoop's core-site.xml // // A workaround is to always provide a URI to this class FileSystem hadoopFs = VolumeConfiguration.getDefaultVolume(conf, aconf).getFileSystem(); - FileSystem localFs = FileSystem.getLocal(conf); + FileSystem localFs = FileSystem.getLocal(conf); Opts opts = new Opts(); opts.parseArgs(PrintInfo.class.getName(), args); if (opts.files.isEmpty()) { System.err.println("No files were given"); System.exit(-1); } - + long countBuckets[] = new long[11]; long sizeBuckets[] = new long[countBuckets.length]; long totalSize = 0; - + for (String arg : opts.files) { Path path = new Path(arg); FileSystem fs; @@ -81,14 +81,14 @@ public class PrintInfo { log.warn("Attempting to find file across filesystems. Consider providing URI instead of path"); fs = hadoopFs.exists(path) ? hadoopFs : localFs; // fall back to local } - + CachableBlockFile.Reader _rdr = new CachableBlockFile.Reader(fs, path, conf, null, null, aconf); Reader iter = new RFile.Reader(_rdr); - + iter.printInfo(); System.out.println(); org.apache.accumulo.core.file.rfile.bcfile.PrintInfo.main(new String[] {arg}); - + if (opts.histogram || opts.dump) { iter.seek(new Range((Key) null, (Key) null), new ArrayList<ByteSequence>(), false); while (iter.hasTop()) { @@ -113,7 +113,7 @@ public class PrintInfo { System.out.println(String.format("%11.0f : %10d %6.2f%%", Math.pow(10, i), countBuckets[i], sizeBuckets[i] * 100. / totalSize)); } } - + // If the output stream has closed, there is no reason to keep going. 
if (System.out.checkError()) return; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index 9dcb3a5..0b464d8 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@ -68,47 +68,47 @@ import org.apache.hadoop.io.Writable; import org.apache.log4j.Logger; public class RFile { - + public static final String EXTENSION = "rf"; - + private static final Logger log = Logger.getLogger(RFile.class); - + private RFile() {} - + private static final int RINDEX_MAGIC = 0x20637474; static final int RINDEX_VER_7 = 7; static final int RINDEX_VER_6 = 6; // static final int RINDEX_VER_5 = 5; // unreleased static final int RINDEX_VER_4 = 4; static final int RINDEX_VER_3 = 3; - + private static class LocalityGroupMetadata implements Writable { - + private int startBlock; private Key firstKey; private Map<ByteSequence,MutableLong> columnFamilies; - + private boolean isDefaultLG = false; private String name; private Set<ByteSequence> previousColumnFamilies; - + private MultiLevelIndex.BufferedWriter indexWriter; private MultiLevelIndex.Reader indexReader; - + public LocalityGroupMetadata(int version, BlockFileReader br) { columnFamilies = new HashMap<ByteSequence,MutableLong>(); indexReader = new MultiLevelIndex.Reader(br, version); } - + public LocalityGroupMetadata(int nextBlock, Set<ByteSequence> pcf, int indexBlockSize, BlockFileWriter bfw) { this.startBlock = nextBlock; isDefaultLG = true; columnFamilies = new HashMap<ByteSequence,MutableLong>(); previousColumnFamilies = pcf; - + indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize)); } - + public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int nextBlock, int indexBlockSize, BlockFileWriter bfw) { this.startBlock = nextBlock; this.name = name; @@ -117,22 +117,22 @@ public class RFile { for (ByteSequence cf : cfset) { columnFamilies.put(cf, new MutableLong(0)); } - + indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize)); } - + private Key getFirstKey() { return firstKey; } - + private void setFirstKey(Key key) { if (firstKey != null) throw new IllegalStateException(); this.firstKey = new Key(key); } - + public void updateColumnCount(Key key) { - + if (isDefaultLG && columnFamilies == null) { if (previousColumnFamilies.size() > 0) { // only do this check when there are previous column families @@ -141,23 +141,23 @@ public class RFile { throw new IllegalArgumentException("Added column family \"" + cf + "\" to default locality group that was in previous locality group"); } } - + // no longer keeping track of column families, so return return; } - + ByteSequence cf = key.getColumnFamilyData(); MutableLong count = columnFamilies.get(cf); - + if (count == null) { if (!isDefaultLG) { throw new IllegalArgumentException("invalid column family : " + cf); } - + if (previousColumnFamilies.contains(cf)) { throw new IllegalArgumentException("Added column family \"" + cf + "\" to default locality group that was in previous locality group"); } - + if (columnFamilies.size() > Writer.MAX_CF_IN_DLG) { // stop keeping track, there are too many columnFamilies = null; @@ -165,86 +165,86 @@ 
public class RFile { } count = new MutableLong(0); columnFamilies.put(new ArrayByteSequence(cf.getBackingArray(), cf.offset(), cf.length()), count); - + } - + count.increment(); - + } - + @Override public void readFields(DataInput in) throws IOException { - + isDefaultLG = in.readBoolean(); if (!isDefaultLG) { name = in.readUTF(); } - + startBlock = in.readInt(); - + int size = in.readInt(); - + if (size == -1) { if (!isDefaultLG) throw new IllegalStateException("Non default LG " + name + " does not have column families"); - + columnFamilies = null; } else { if (columnFamilies == null) columnFamilies = new HashMap<ByteSequence,MutableLong>(); else columnFamilies.clear(); - + for (int i = 0; i < size; i++) { int len = in.readInt(); byte cf[] = new byte[len]; in.readFully(cf); long count = in.readLong(); - + columnFamilies.put(new ArrayByteSequence(cf), new MutableLong(count)); } } - + if (in.readBoolean()) { firstKey = new Key(); firstKey.readFields(in); } else { firstKey = null; } - + indexReader.readFields(in); } - + @Override public void write(DataOutput out) throws IOException { - + out.writeBoolean(isDefaultLG); if (!isDefaultLG) { out.writeUTF(name); } - + out.writeInt(startBlock); - + if (isDefaultLG && columnFamilies == null) { // only expect null when default LG, otherwise let a NPE occur out.writeInt(-1); } else { out.writeInt(columnFamilies.size()); - + for (Entry<ByteSequence,MutableLong> entry : columnFamilies.entrySet()) { out.writeInt(entry.getKey().length()); out.write(entry.getKey().getBackingArray(), entry.getKey().offset(), entry.getKey().length()); out.writeLong(entry.getValue().longValue()); } } - + out.writeBoolean(firstKey != null); if (firstKey != null) firstKey.write(out); - + indexWriter.close(out); } - + public void printInfo() throws IOException { PrintStream out = System.out; out.println("Locality group : " + (isDefaultLG ? "<DEFAULT>" : name)); @@ -258,55 +258,55 @@ public class RFile { + String.format("%,d bytes %,d blocks", entry.getValue(), countsByLevel.get(entry.getKey()))); } out.println("\tFirst key : " + firstKey); - + Key lastKey = null; if (indexReader.size() > 0) { lastKey = indexReader.getLastKey(); } - + out.println("\tLast key : " + lastKey); - + long numKeys = 0; IndexIterator countIter = indexReader.lookup(new Key()); while (countIter.hasNext()) { numKeys += countIter.next().getNumEntries(); } - + out.println("\tNum entries : " + String.format("%,d", numKeys)); out.println("\tColumn families : " + (isDefaultLG && columnFamilies == null ? 
"<UNKNOWN>" : columnFamilies.keySet())); } - + } - + public static class Writer implements FileSKVWriter { - + public static final int MAX_CF_IN_DLG = 1000; - + private BlockFileWriter fileWriter; private ABlockWriter blockWriter; - + // private BlockAppender blockAppender; private long blockSize = 100000; private int indexBlockSize; private int entries = 0; - + private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>(); private LocalityGroupMetadata currentLocalityGroup = null; private int nextBlock = 0; - + private Key lastKeyInBlock = null; - + private boolean dataClosed = false; private boolean closed = false; private Key prevKey = new Key(); private boolean startedDefaultLocalityGroup = false; - + private HashSet<ByteSequence> previousColumnFamilies; - + public Writer(BlockFileWriter bfw, int blockSize) throws IOException { this(bfw, blockSize, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX)); } - + public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize) throws IOException { this.blockSize = blockSize; this.indexBlockSize = indexBlockSize; @@ -314,123 +314,123 @@ public class RFile { this.blockWriter = null; previousColumnFamilies = new HashSet<ByteSequence>(); } - + @Override public synchronized void close() throws IOException { - + if (closed) { return; } - + closeData(); - + ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index"); - + mba.writeInt(RINDEX_MAGIC); mba.writeInt(RINDEX_VER_7); - + if (currentLocalityGroup != null) localityGroups.add(currentLocalityGroup); - + mba.writeInt(localityGroups.size()); - + for (LocalityGroupMetadata lc : localityGroups) { lc.write(mba); } - + mba.close(); - + fileWriter.close(); - + closed = true; } - + private void closeData() throws IOException { - + if (dataClosed) { return; } - + dataClosed = true; - + if (blockWriter != null) { closeBlock(lastKeyInBlock, true); } } - + @Override public void append(Key key, Value value) throws IOException { - + if (dataClosed) { throw new IllegalStateException("Cannont append, data closed"); } - + if (key.compareTo(prevKey) < 0) { throw new IllegalStateException("Keys appended out-of-order. 
New key " + key + ", previous key " + prevKey); } - + currentLocalityGroup.updateColumnCount(key); - + if (currentLocalityGroup.getFirstKey() == null) { currentLocalityGroup.setFirstKey(key); } - + if (blockWriter == null) { blockWriter = fileWriter.prepareDataBlock(); } else if (blockWriter.getRawSize() > blockSize) { closeBlock(prevKey, false); blockWriter = fileWriter.prepareDataBlock(); } - + RelativeKey rk = new RelativeKey(lastKeyInBlock, key); - + rk.write(blockWriter); value.write(blockWriter); entries++; - + prevKey = new Key(key); lastKeyInBlock = prevKey; - + } - + private void closeBlock(Key key, boolean lastBlock) throws IOException { blockWriter.close(); - + if (lastBlock) currentLocalityGroup.indexWriter.addLast(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize()); else currentLocalityGroup.indexWriter.add(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize()); - + blockWriter = null; lastKeyInBlock = null; entries = 0; nextBlock++; } - + @Override public DataOutputStream createMetaStore(String name) throws IOException { closeData(); - + return (DataOutputStream) fileWriter.prepareMetaBlock(name); } - + private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException { if (dataClosed) { throw new IllegalStateException("data closed"); } - + if (startedDefaultLocalityGroup) { throw new IllegalStateException("Can not start anymore new locality groups after default locality group started"); } - + if (blockWriter != null) { closeBlock(lastKeyInBlock, true); } - + if (currentLocalityGroup != null) { localityGroups.add(currentLocalityGroup); } - + if (columnFamilies == null) { startedDefaultLocalityGroup = true; currentLocalityGroup = new LocalityGroupMetadata(nextBlock, previousColumnFamilies, indexBlockSize, fileWriter); @@ -443,31 +443,31 @@ public class RFile { currentLocalityGroup = new LocalityGroupMetadata(name, columnFamilies, nextBlock, indexBlockSize, fileWriter); previousColumnFamilies.addAll(columnFamilies); } - + prevKey = new Key(); } - + @Override public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException { if (columnFamilies == null) throw new NullPointerException(); - + _startNewLocalityGroup(name, columnFamilies); } - + @Override public void startDefaultLocalityGroup() throws IOException { _startNewLocalityGroup(null, null); } - + @Override public boolean supportsLocalityGroups() { return true; } } - + private static class LocalityGroupReader extends LocalityGroup implements FileSKVIterator { - + private BlockFileReader reader; private MultiLevelIndex.Reader index; private int blockCount; @@ -476,7 +476,7 @@ public class RFile { private boolean closed = false; private int version; private boolean checkRange = true; - + private LocalityGroupReader(BlockFileReader reader, LocalityGroupMetadata lgm, int version) throws IOException { super(lgm.columnFamilies, lgm.isDefaultLG); this.firstKey = lgm.firstKey; @@ -484,11 +484,11 @@ public class RFile { this.startBlock = lgm.startBlock; blockCount = index.size(); this.version = version; - + this.reader = reader; - + } - + public LocalityGroupReader(LocalityGroupReader lgr) { super(lgr.columnFamilies, lgr.isDefaultLocalityGroup); this.firstKey = lgr.firstKey; @@ -498,20 +498,20 @@ public class RFile { this.reader = lgr.reader; this.version = lgr.version; } - + Iterator<IndexEntry> getIndex() throws IOException { return index.lookup(new Key()); } - + 
@Override public void close() throws IOException { closed = true; hasTop = false; if (currBlock != null) currBlock.close(); - + } - + private IndexIterator iiter; private int entriesLeft; private ABlockReader currBlock; @@ -521,22 +521,22 @@ public class RFile { private Range range = null; private boolean hasTop = false; private AtomicBoolean interruptFlag; - + @Override public Key getTopKey() { return rk.getKey(); } - + @Override public Value getTopValue() { return val; } - + @Override public boolean hasTop() { return hasTop; } - + @Override public void next() throws IOException { try { @@ -546,20 +546,20 @@ public class RFile { throw ioe; } } - + private void _next() throws IOException { - + if (!hasTop) throw new IllegalStateException(); - + if (entriesLeft == 0) { currBlock.close(); - + if (iiter.hasNext()) { IndexEntry indexEntry = iiter.next(); entriesLeft = indexEntry.getNumEntries(); currBlock = getDataBlock(indexEntry); - + checkRange = range.afterEndKey(indexEntry.getKey()); if (!checkRange) hasTop = true; @@ -571,7 +571,7 @@ public class RFile { return; } } - + prevKey = rk.getKey(); rk.readFields(currBlock); val.readFields(currBlock); @@ -579,30 +579,30 @@ public class RFile { if (checkRange) hasTop = !range.afterEndKey(rk.getKey()); } - + private ABlockReader getDataBlock(IndexEntry indexEntry) throws IOException { if (interruptFlag != null && interruptFlag.get()) throw new IterationInterruptedException(); - + if (version == RINDEX_VER_3 || version == RINDEX_VER_4) return reader.getDataBlock(startBlock + iiter.previousIndex()); else return reader.getDataBlock(indexEntry.getOffset(), indexEntry.getCompressedSize(), indexEntry.getRawSize()); - + } - + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { - + if (closed) throw new IllegalStateException("Locality group reader closed"); - + if (columnFamilies.size() != 0 || inclusive) throw new IllegalArgumentException("I do not know how to filter column families"); - + if (interruptFlag != null && interruptFlag.get()) throw new IterationInterruptedException(); - + try { _seek(range); } catch (IOException ioe) { @@ -610,7 +610,7 @@ public class RFile { throw ioe; } } - + private void reset() { rk = null; hasTop = false; @@ -626,45 +626,45 @@ public class RFile { } } } - + private void _seek(Range range) throws IOException { - + this.range = range; this.checkRange = true; - + if (blockCount == 0) { // its an empty file rk = null; return; } - + Key startKey = range.getStartKey(); if (startKey == null) startKey = new Key(); - + boolean reseek = true; - + if (range.afterEndKey(firstKey)) { // range is before first key in rfile, so there is nothing to do reset(); reseek = false; } - + if (rk != null) { if (range.beforeStartKey(prevKey) && range.afterEndKey(getTopKey())) { // range is between the two keys in the file where the last range seeked to stopped, so there is // nothing to do reseek = false; } - + if (startKey.compareTo(getTopKey()) <= 0 && startKey.compareTo(prevKey) > 0) { // current location in file can satisfy this request, no need to seek reseek = false; } - + if (startKey.compareTo(getTopKey()) >= 0 && startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) { // start key is within the unconsumed portion of the current block - + // this code intentionally does not use the index associated with a cached block // because if only forward seeks are being done, then there is no benefit to building // and index for the block... 
could consider using the index if it exist but not @@ -679,37 +679,37 @@ public class RFile { prevKey = skippr.prevKey; rk = skippr.rk; } - + reseek = false; } - + if (iiter.previousIndex() == 0 && getTopKey().equals(firstKey) && startKey.compareTo(firstKey) <= 0) { // seeking before the beginning of the file, and already positioned at the first key in the file // so there is nothing to do reseek = false; } } - + if (reseek) { iiter = index.lookup(startKey); - + reset(); - + if (!iiter.hasNext()) { // past the last key } else { - + // if the index contains the same key multiple times, then go to the // earliest index entry containing the key while (iiter.hasPrevious() && iiter.peekPrevious().getKey().equals(iiter.peek().getKey())) { iiter.previous(); } - + if (iiter.hasPrevious()) prevKey = new Key(iiter.peekPrevious().getKey()); // initially prevKey is the last key of the prev block else prevKey = new Key(); // first block in the file, so set prev key to minimal key - + IndexEntry indexEntry = iiter.next(); entriesLeft = indexEntry.getNumEntries(); currBlock = getDataBlock(indexEntry); @@ -736,7 +736,7 @@ public class RFile { val.readFields(currBlock); valbs = new MutableByteSequence(val.get(), 0, val.getSize()); - + // just consumed one key from the input stream, so subtract one from entries left entriesLeft = bie.getEntriesLeft() - 1; prevKey = new Key(bie.getPrevKey()); @@ -754,76 +754,76 @@ public class RFile { rk = skippr.rk; } } - + hasTop = rk != null && !range.afterEndKey(rk.getKey()); - + while (hasTop() && range.beforeStartKey(getTopKey())) { next(); } } - + @Override public Key getFirstKey() throws IOException { return firstKey; } - + @Override public Key getLastKey() throws IOException { if (index.size() == 0) return null; return index.getLastKey(); } - + @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { throw new UnsupportedOperationException(); } - + @Override public void closeDeepCopies() throws IOException { throw new UnsupportedOperationException(); } - + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { throw new UnsupportedOperationException(); } - + @Override public DataInputStream getMetaStore(String name) throws IOException { throw new UnsupportedOperationException(); } - + @Override public void setInterruptFlag(AtomicBoolean flag) { this.interruptFlag = flag; } - + @Override public InterruptibleIterator getIterator() { return this; } } - + public static class Reader extends HeapIterator implements FileSKVIterator { private BlockFileReader reader; - + private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>(); - + private LocalityGroupReader lgReaders[]; private HashSet<ByteSequence> nonDefaultColumnFamilies; - + private List<Reader> deepCopies; private boolean deepCopy = false; - + private AtomicBoolean interruptFlag; - + public Reader(BlockFileReader rdr) throws IOException { this.reader = rdr; - + ABlockReader mb = reader.getMetaBlock("RFile.index"); - try{ + try { int magic = mb.readInt(); int ver = mb.readInt(); @@ -847,16 +847,16 @@ public class RFile { } finally { mb.close(); } - + nonDefaultColumnFamilies = new HashSet<ByteSequence>(); for (LocalityGroupMetadata lgm : localityGroups) { if (!lgm.isDefaultLG) nonDefaultColumnFamilies.addAll(lgm.columnFamilies.keySet()); } - + createHeap(lgReaders.length); } - + private Reader(Reader r) { super(r.lgReaders.length); this.reader = r.reader; @@ 
-869,7 +869,7 @@ public class RFile { this.lgReaders[i].setInterruptFlag(r.interruptFlag); } } - + private void closeLocalityGroupReaders() { for (LocalityGroupReader lgr : lgReaders) { try { @@ -879,26 +879,26 @@ } } } - + @Override public void closeDeepCopies() { if (deepCopy) throw new RuntimeException("Calling closeDeepCopies on a deep copy is not supported"); - + for (Reader deepCopy : deepCopies) deepCopy.closeLocalityGroupReaders(); - + deepCopies.clear(); } - + @Override public void close() throws IOException { if (deepCopy) throw new RuntimeException("Calling close on a deep copy is not supported"); - + closeDeepCopies(); closeLocalityGroupReaders(); - + try { reader.close(); } finally { @@ -907,15 +907,15 @@ */ } } - + @Override public Key getFirstKey() throws IOException { if (lgReaders.length == 0) { return null; } - + Key minKey = null; - + for (int i = 0; i < lgReaders.length; i++) { if (minKey == null) { minKey = lgReaders[i].getFirstKey(); @@ -925,18 +925,18 @@ minKey = firstKey; } } - + return minKey; } - + @Override public Key getLastKey() throws IOException { if (lgReaders.length == 0) { return null; } - + Key maxKey = null; - + for (int i = 0; i < lgReaders.length; i++) { if (maxKey == null) { maxKey = lgReaders[i].getLastKey(); @@ -946,10 +946,10 @@ maxKey = lastKey; } } - + return maxKey; } - + @Override public DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException { try { @@ -958,7 +958,7 @@ throw new NoSuchMetaStoreException("name = " + name, e); } } - + @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { Reader copy = new Reader(this); @@ -966,53 +966,53 @@ deepCopies.add(copy); return copy; } - + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { throw new UnsupportedOperationException(); - + } - + private int numLGSeeked = 0; - + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { numLGSeeked = LocalityGroupIterator.seek(this, lgReaders, nonDefaultColumnFamilies, range, columnFamilies, inclusive); } - + int getNumLocalityGroupsSeeked() { return numLGSeeked; } - + public FileSKVIterator getIndex() throws IOException { - + ArrayList<Iterator<IndexEntry>> indexes = new ArrayList<Iterator<IndexEntry>>(); - + for (LocalityGroupReader lgr : lgReaders) { indexes.add(lgr.getIndex()); } - + return new MultiIndexIterator(this, indexes); } - + public void printInfo() throws IOException { for (LocalityGroupMetadata lgm : localityGroups) { lgm.printInfo(); } - + } - + @Override public void setInterruptFlag(AtomicBoolean flag) { if (deepCopy) throw new RuntimeException("Calling setInterruptFlag on a deep copy is not supported"); - + if (deepCopies.size() != 0) throw new RuntimeException("Setting interrupt flag after calling deep copy not supported"); - + setInterruptFlagInternal(flag); } - + private void setInterruptFlagInternal(AtomicBoolean flag) { this.interruptFlag = flag; for (LocalityGroupReader lgr : lgReaders) {
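A note on the _seek code in the RFile diff above: the LocalityGroupReader goes back to the index only when it has to. If the requested start key can be reached from the reader's current position, the reseek is skipped entirely, which is what makes a series of sorted, forward-moving seeks cheap. The following minimal, self-contained sketch restates two of those checks; ReseekCheckSketch, needsReseek, and lastKeyInBlock are invented names for illustration (the real code compares Accumulo Keys and finds the current block's last key via iiter.peekPrevious()), not RFile identifiers.

    public class ReseekCheckSketch {

      // prevKey: the key consumed immediately before the current top key
      // lastKeyInBlock: the last key of the data block the reader is positioned in
      static <K extends Comparable<K>> boolean needsReseek(K startKey, K prevKey, K topKey, K lastKeyInBlock) {
        // the current position already satisfies the request: startKey falls in
        // (prevKey, topKey], so consuming entries from here yields the right result
        if (startKey.compareTo(topKey) <= 0 && startKey.compareTo(prevKey) > 0)
          return false;

        // startKey falls in the unconsumed remainder of the current block, so the
        // reader can skip forward within the block instead of consulting the index
        if (startKey.compareTo(topKey) >= 0 && startKey.compareTo(lastKeyInBlock) <= 0)
          return false;

        return true;
      }

      public static void main(String[] args) {
        // reader positioned at top key "m", previous key "k", current block ends at "r"
        System.out.println(needsReseek("m", "k", "m", "r")); // false: current position suffices
        System.out.println(needsReseek("p", "k", "m", "r")); // false: forward skip within the block
        System.out.println(needsReseek("z", "k", "m", "r")); // true: must go back to the index
      }
    }

The remaining checks in _seek, not sketched here, cover the degenerate cases: an empty file, a range ending before the file's first key, and a start key at or before firstKey while the reader is already positioned on it.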
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java index 9fabe42..088abfe 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java @@ -38,20 +38,20 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class RFileOperations extends FileOperations { - + private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet(); - + @Override public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { return fs.getFileStatus(new Path(file)).getLen(); } - + @Override public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { - + return openIndex(file, fs, conf, acuconf, null, null); } - + @Override public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException { @@ -61,30 +61,30 @@ public class RFileOperations extends FileOperations { // Reader reader = new RFile.Reader(in, len , conf); CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache, acuconf); final Reader reader = new RFile.Reader(_cbr); - + return reader.getIndex(); } - + @Override public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { return openReader(file, seekToBeginning, fs, conf, acuconf, null, null); } - + @Override public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException { Path path = new Path(file); - + CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache, acuconf); Reader iter = new RFile.Reader(_cbr); - + if (seekToBeginning) { iter.seek(new Range((Key) null, null), EMPTY_CF_SET, false); } - + return iter; } - + @Override public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, AccumuloConfiguration tableConf) throws IOException { @@ -92,7 +92,7 @@ public class RFileOperations extends FileOperations { iter.seek(range, columnFamilies, inclusive); return iter; } - + @Override public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException { @@ -100,7 +100,7 @@ public class RFileOperations extends FileOperations { iter.seek(range, columnFamilies, inclusive); return iter; } - + @Override public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { return openWriter(file, fs, conf, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)); @@ -119,10 +119,10 @@ if (tblock > 0) block = tblock; int bufferSize = conf.getInt("io.file.buffer.size", 4096); - + long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE); long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX); - + CachableBlockFile.Writer _cbw = new
CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf, acuconf); Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize); return writer;
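For a sense of how the openReader path in the diff above is exercised, here is a hedged usage sketch for a full scan. The file path is made up, and constructing RFileOperations directly and calling AccumuloConfiguration.getDefaultConfiguration() are illustrative shortcuts rather than a statement of the supported entry points; only the openReader signature itself comes from the diff.

    import java.io.IOException;

    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.core.file.FileSKVIterator;
    import org.apache.accumulo.core.file.rfile.RFileOperations;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class RFileScanSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf); // local filesystem, for illustration
        AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();

        // "/tmp/example.rf" is a hypothetical file; seekToBeginning == true returns
        // the iterator already positioned at the first entry of the file
        FileSKVIterator iter = new RFileOperations().openReader("/tmp/example.rf", true, fs, conf, acuconf);
        try {
          while (iter.hasTop()) {
            System.out.println(iter.getTopKey() + " -> " + iter.getTopValue());
            iter.next();
          }
        } finally {
          iter.close();
        }
      }
    }

Passing seekToBeginning as true is equivalent to the explicit iter.seek(new Range((Key) null, null), EMPTY_CF_SET, false) call shown in the diff, that is, a seek over the whole key space with no column family filtering.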