http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java index 7378348..47936b6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java @@ -71,13 +71,13 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; class MemKeyComparator implements Comparator<Key>, Serializable { - + private static final long serialVersionUID = 1L; @Override public int compare(Key k1, Key k2) { int cmp = k1.compareTo(k2); - + if (cmp == 0) { if (k1 instanceof MemKey) if (k2 instanceof MemKey) @@ -87,36 +87,36 @@ class MemKeyComparator implements Comparator<Key>, Serializable { else if (k2 instanceof MemKey) cmp = -1; } - + return cmp; } } class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator { - + int kvCount; - + public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxKVCount) { setSource(source); this.kvCount = maxKVCount; } - + @Override protected void consume() throws IOException { while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount) getSource().next(); } - + @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount); } - + @Override public void setInterruptFlag(AtomicBoolean flag) { ((InterruptibleIterator) getSource()).setInterruptFlag(flag); } - + } class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator { @@ -132,17 +132,17 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { return new MemKeyConversionIterator(getSource().deepCopy(env)); } - + @Override public Key getTopKey() { return currKey; } - + @Override public Value getTopValue() { return currVal; } - + private void getTopKeyVal() { Key k = super.getTopKey(); Value v = super.getTopValue(); @@ -156,7 +156,7 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible currKey = new MemKey(k, mc); } - + public void next() throws IOException { super.next(); if (hasTop()) @@ -165,7 +165,7 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { super.seek(range, columnFamilies, inclusive); - + if (hasTop()) getTopKeyVal(); @@ -185,14 +185,14 @@ class MemKeyConversionIterator extends WrappingIterator implements Interruptible public class InMemoryMap { private SimpleMap map = null; - + private static final Logger log = Logger.getLogger(InMemoryMap.class); - + private volatile String memDumpFile = null; private final String memDumpDir; private Map<String,Set<ByteSequence>> lggroups; - + public InMemoryMap(boolean useNativeMap, String memDumpDir) { this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir); } @@ -200,17 +200,17 @@ public class InMemoryMap { public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) { this.memDumpDir = memDumpDir; this.lggroups = lggroups; - + if (lggroups.size() == 0) map = newMap(useNativeMap); else map = new LocalityGroupMap(lggroups, useNativeMap); } - + public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError { this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR)); } - + private static SimpleMap newMap(boolean useNativeMap) { if (useNativeMap && NativeMap.isLoaded()) { try { @@ -219,43 +219,43 @@ public class InMemoryMap { log.error("Failed to create native map", t); } } - + return new DefaultMap(); } - + private interface SimpleMap { Value get(Key key); - + Iterator<Entry<Key,Value>> iterator(Key startKey); - + int size(); - + InterruptibleIterator skvIterator(); - + void delete(); - + long getMemoryUsed(); - + void mutate(List<Mutation> mutations, int kvCount); } - + private static class LocalityGroupMap implements SimpleMap { - + private Map<ByteSequence,MutableLong> groupFams[]; - + // the last map in the array is the default locality group private SimpleMap maps[]; private Partitioner partitioner; private List<Mutation>[] partitioned; private Set<ByteSequence> nonDefaultColumnFamilies; - + @SuppressWarnings("unchecked") LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap) { this.groupFams = new Map[groups.size()]; this.maps = new SimpleMap[groups.size() + 1]; this.partitioned = new List[groups.size() + 1]; this.nonDefaultColumnFamilies = new HashSet<ByteSequence>(); - + for (int i = 0; i < maps.length; i++) { maps[i] = newMap(useNativeMap); } @@ -268,9 +268,9 @@ public class InMemoryMap { this.groupFams[count++] = map; nonDefaultColumnFamilies.addAll(cfset); } - + partitioner = new LocalityGroupUtil.Partitioner(this.groupFams); - + for (int i = 0; i < partitioned.length; i++) { partitioned[i] = new ArrayList<Mutation>(); } @@ -280,12 +280,12 @@ public class InMemoryMap { public Value get(Key key) { throw new UnsupportedOperationException(); } - + @Override public Iterator<Entry<Key,Value>> iterator(Key startKey) { throw new UnsupportedOperationException(); } - + @Override public int size() { int sum = 0; @@ -293,7 +293,7 @@ public class InMemoryMap { sum += map.size(); return sum; } - + @Override public InterruptibleIterator skvIterator() { LocalityGroup groups[] = new LocalityGroup[maps.length]; @@ -304,16 +304,15 @@ public class InMemoryMap { groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true); } - return new LocalityGroupIterator(groups, nonDefaultColumnFamilies); } - + @Override public void delete() { for (SimpleMap map : maps) map.delete(); } - + @Override public long getMemoryUsed() { long sum = 0; @@ -321,16 +320,16 @@ public class InMemoryMap { sum += map.getMemoryUsed(); return sum; } - + @Override public synchronized void mutate(List<Mutation> mutations, int kvCount) { // this method is synchronized because it reuses objects to avoid allocation, // currently, the method that calls this is synchronized so there is no // loss in parallelism.... synchronization was added here for future proofing - - try{ + + try { partitioner.partition(mutations, partitioned); - + for (int i = 0; i < partitioned.length; i++) { if (partitioned[i].size() > 0) { maps[i].mutate(partitioned[i], kvCount); @@ -345,14 +344,14 @@ public class InMemoryMap { } } } - + } private static class DefaultMap implements SimpleMap { private ConcurrentSkipListMap<Key,Value> map = new ConcurrentSkipListMap<Key,Value>(new MemKeyComparator()); private AtomicLong bytesInMemory = new AtomicLong(); private AtomicInteger size = new AtomicInteger(); - + public void put(Key key, Value value) { // Always a MemKey, so account for the kvCount int bytesInMemory.addAndGet(key.getLength() + 4); @@ -360,42 +359,42 @@ public class InMemoryMap { if (map.put(key, value) == null) size.incrementAndGet(); } - + public Value get(Key key) { return map.get(key); } - + public Iterator<Entry<Key,Value>> iterator(Key startKey) { Key lk = new Key(startKey); SortedMap<Key,Value> tm = map.tailMap(lk); return tm.entrySet().iterator(); } - + public int size() { return size.get(); } - + public synchronized InterruptibleIterator skvIterator() { if (map == null) throw new IllegalStateException(); - + return new SortedMapIterator(map); } - + public synchronized void delete() { map = null; } - + public long getOverheadPerEntry() { // all of the java objects that are used to hold the // data and make it searchable have overhead... this // overhead is estimated using test.EstimateInMemMapOverhead // and is in bytes.. the estimates were obtained by running // java 6_16 in 64 bit server mode - + return 200; } - + @Override public void mutate(List<Mutation> mutations, int kvCount) { for (Mutation m : mutations) { @@ -407,64 +406,64 @@ public class InMemoryMap { } } } - + @Override public long getMemoryUsed() { return bytesInMemory.get() + (size() * getOverheadPerEntry()); } } - + private static class NativeMapWrapper implements SimpleMap { private NativeMap nativeMap; - + NativeMapWrapper() { nativeMap = new NativeMap(); } - + public Value get(Key key) { return nativeMap.get(key); } - + public Iterator<Entry<Key,Value>> iterator(Key startKey) { return nativeMap.iterator(startKey); } - + public int size() { return nativeMap.size(); } - + public InterruptibleIterator skvIterator() { return (InterruptibleIterator) nativeMap.skvIterator(); } - + public void delete() { nativeMap.delete(); } - + public long getMemoryUsed() { return nativeMap.getMemoryUsed(); } - + @Override public void mutate(List<Mutation> mutations, int kvCount) { nativeMap.mutate(mutations, kvCount); } } - + private AtomicInteger nextKVCount = new AtomicInteger(1); private AtomicInteger kvCount = new AtomicInteger(0); private Object writeSerializer = new Object(); - + /** * Applies changes to a row in the InMemoryMap - * + * */ public void mutate(List<Mutation> mutations) { int numKVs = 0; for (int i = 0; i < mutations.size(); i++) numKVs += mutations.get(i).size(); - + // Can not update mutationCount while writes that started before // are in progress, this would cause partial mutations to be seen. // Also, can not continue until mutation count is updated, because @@ -472,7 +471,7 @@ public class InMemoryMap { // wait for writes that started before to finish. // // using separate lock from this map, to allow read/write in parallel - synchronized (writeSerializer ) { + synchronized (writeSerializer) { int kv = nextKVCount.getAndAdd(numKVs); try { map.mutate(mutations, kv); @@ -481,51 +480,51 @@ public class InMemoryMap { } } } - + /** * Returns a long representing the size of the InMemoryMap - * + * * @return bytesInMemory */ public synchronized long estimatedSizeInBytes() { if (map == null) return 0; - + return map.getMemoryUsed(); } - + Iterator<Map.Entry<Key,Value>> iterator(Key startKey) { return map.iterator(startKey); } - + public synchronized long getNumEntries() { if (map == null) return 0; return map.size(); } - + private final Set<MemoryIterator> activeIters = Collections.synchronizedSet(new HashSet<MemoryIterator>()); - + class MemoryDataSource implements DataSource { - + boolean switched = false; private InterruptibleIterator iter; private FileSKVIterator reader; private MemoryDataSource parent; private IteratorEnvironment env; private AtomicBoolean iflag; - + MemoryDataSource() { this(null, false, null, null); } - + public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env, AtomicBoolean iflag) { this.parent = parent; this.switched = switched; this.env = env; this.iflag = iflag; } - + @Override public boolean isCurrent() { if (switched) @@ -533,12 +532,12 @@ public class InMemoryMap { else return memDumpFile == null; } - + @Override public DataSource getNewDataSource() { if (switched) throw new IllegalStateException(); - + if (!isCurrent()) { switched = true; iter = null; @@ -549,15 +548,15 @@ public class InMemoryMap { throw new RuntimeException(); } } - + return this; } - + private synchronized FileSKVIterator getReader() throws IOException { if (reader == null) { Configuration conf = CachedConfiguration.getInstance(); FileSystem fs = FileSystem.getLocal(conf); - + reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, SiteConfiguration.getInstance()); if (iflag != null) reader.setInterruptFlag(iflag); @@ -583,10 +582,10 @@ public class InMemoryMap { iter = new MemKeyConversionIterator(parent.getReader().deepCopy(env)); } } - + return iter; } - + @Override public DataSource getDeepCopyDataSource(IteratorEnvironment env) { return new MemoryDataSource(parent == null ? this : parent, switched, env, iflag); @@ -596,36 +595,36 @@ public class InMemoryMap { public void setInterruptFlag(AtomicBoolean flag) { this.iflag = flag; } - + } - + public class MemoryIterator extends WrappingIterator implements InterruptibleIterator { - + private AtomicBoolean closed; private SourceSwitchingIterator ssi; private MemoryDataSource mds; - + protected SortedKeyValueIterator<Key,Value> getSource() { if (closed.get()) throw new IllegalStateException("Memory iterator is closed"); return super.getSource(); } - + private MemoryIterator(InterruptibleIterator source) { this(source, new AtomicBoolean(false)); } - + private MemoryIterator(SortedKeyValueIterator<Key,Value> source, AtomicBoolean closed) { setSource(source); this.closed = closed; } - + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { return new MemoryIterator(getSource().deepCopy(env), closed); } - + public void close() { - + synchronized (this) { if (closed.compareAndSet(false, true)) { try { @@ -636,41 +635,41 @@ public class InMemoryMap { } } } - + // remove outside of sync to avoid deadlock activeIters.remove(this); } - + private synchronized boolean switchNow() throws IOException { if (closed.get()) return false; - + ssi.switchNow(); return true; } - + @Override public void setInterruptFlag(AtomicBoolean flag) { ((InterruptibleIterator) getSource()).setInterruptFlag(flag); } - + private void setSSI(SourceSwitchingIterator ssi) { this.ssi = ssi; } - + public void setMDS(MemoryDataSource mds) { this.mds = mds; } - + } - + public synchronized MemoryIterator skvIterator() { if (map == null) throw new NullPointerException(); - + if (deleted) throw new IllegalStateException("Can not obtain iterator after map deleted"); - + int mc = kvCount.get(); MemoryDataSource mds = new MemoryDataSource(); SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource()); @@ -680,94 +679,93 @@ public class InMemoryMap { activeIters.add(mi); return mi; } - + public SortedKeyValueIterator<Key,Value> compactionIterator() { - + if (nextKVCount.get() - 1 != kvCount.get()) - throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = " - + kvCount.get()); - + throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = " + kvCount.get()); + return map.skvIterator(); } - + private boolean deleted = false; - + public void delete(long waitTime) { - + synchronized (this) { if (deleted) throw new IllegalStateException("Double delete"); - + deleted = true; } - + long t1 = System.currentTimeMillis(); - + while (activeIters.size() > 0 && System.currentTimeMillis() - t1 < waitTime) { UtilWaitThread.sleep(50); } - + if (activeIters.size() > 0) { // dump memmap exactly as is to a tmp file on disk, and switch scans to that temp file try { Configuration conf = CachedConfiguration.getInstance(); FileSystem fs = FileSystem.getLocal(conf); - + String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION; - + Configuration newConf = new Configuration(conf); newConf.setInt("io.seqfile.compress.blocksize", 100000); - + FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, SiteConfiguration.getInstance()); - + InterruptibleIterator iter = map.skvIterator(); - - HashSet<ByteSequence> allfams= new HashSet<ByteSequence>(); - - for(Entry<String, Set<ByteSequence>> entry : lggroups.entrySet()){ + + HashSet<ByteSequence> allfams = new HashSet<ByteSequence>(); + + for (Entry<String,Set<ByteSequence>> entry : lggroups.entrySet()) { allfams.addAll(entry.getValue()); out.startNewLocalityGroup(entry.getKey(), entry.getValue()); iter.seek(new Range(), entry.getValue(), true); dumpLocalityGroup(out, iter); } - + out.startDefaultLocalityGroup(); iter.seek(new Range(), allfams, false); - + dumpLocalityGroup(out, iter); - + out.close(); - + log.debug("Created mem dump file " + tmpFile); - + memDumpFile = tmpFile; - + synchronized (activeIters) { for (MemoryIterator mi : activeIters) { mi.switchNow(); } } - + // rely on unix behavior that file will be deleted when last // reader closes it fs.delete(new Path(memDumpFile), true); - + } catch (IOException ioe) { log.error("Failed to create mem dump file ", ioe); - + while (activeIters.size() > 0) { UtilWaitThread.sleep(100); } } - + } - + SimpleMap tmpMap = map; - + synchronized (this) { map = null; } - + tmpMap.delete(); }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java index 4bc8891..443ffb2 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemKey.java @@ -23,55 +23,55 @@ import java.io.IOException; import org.apache.accumulo.core.data.Key; class MemKey extends Key { - + int kvCount; - + public MemKey(byte[] row, byte[] cf, byte[] cq, byte[] cv, long ts, boolean del, boolean copy, int mc) { super(row, cf, cq, cv, ts, del, copy); this.kvCount = mc; } - + public MemKey() { super(); this.kvCount = Integer.MAX_VALUE; } - + public MemKey(Key key, int mc) { super(key); this.kvCount = mc; } - + public String toString() { return super.toString() + " mc=" + kvCount; } - + @Override public Object clone() throws CloneNotSupportedException { return super.clone(); } - + @Override public void write(DataOutput out) throws IOException { super.write(out); out.writeInt(kvCount); } - + @Override public void readFields(DataInput in) throws IOException { super.readFields(in); kvCount = in.readInt(); } - + @Override public int compareTo(Key k) { - + int cmp = super.compareTo(k); - + if (cmp == 0 && k instanceof MemKey) { cmp = ((MemKey) k).kvCount - kvCount; } - + return cmp; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java index f1fdde4..0ce3b9e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MemValue.java @@ -22,12 +22,12 @@ import java.io.IOException; import org.apache.accumulo.core.data.Value; /** - * + * */ public class MemValue extends Value { int kvCount; boolean merged = false; - + /** * @param value * Value @@ -38,17 +38,17 @@ public class MemValue extends Value { super(value); this.kvCount = kv; } - + public MemValue() { super(); this.kvCount = Integer.MAX_VALUE; } - + public MemValue(Value value, int kv) { super(value); this.kvCount = kv; } - + // Override @Override public void write(final DataOutput out) throws IOException { @@ -64,7 +64,7 @@ public class MemValue extends Value { } super.write(out); } - + @Override public void set(final byte[] b) { super.set(b); @@ -76,16 +76,16 @@ public class MemValue extends Value { super.copy(b); merged = false; } - + /** * Takes a Value and will take out the embedded kvCount, and then return that value while replacing the Value with the original unembedded version - * + * * @return The kvCount embedded in v. */ public static int splitKVCount(Value v) { if (v instanceof MemValue) return ((MemValue) v).kvCount; - + byte[] originalBytes = new byte[v.getSize() - 4]; byte[] combined = v.get(); System.arraycopy(combined, 4, originalBytes, 0, originalBytes.length); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java index 82c791c..62c6ec6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/MinorCompactionReason.java @@ -18,4 +18,4 @@ package org.apache.accumulo.tserver; public enum MinorCompactionReason { USER, SYSTEM, CLOSE, RECOVERY -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java index 5ee1952..76061e6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java @@ -18,8 +18,8 @@ package org.apache.accumulo.tserver; import java.util.List; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.client.Durability; +import org.apache.accumulo.core.data.Mutation; public class Mutations { private final Durability durability; @@ -29,10 +29,12 @@ public class Mutations { this.durability = durability; this.mutations = mutations; } + public Durability getDurability() { return durability; } + public List<Mutation> getMutations() { return mutations; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java index e22a54f..b330d1e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java @@ -48,11 +48,11 @@ import org.apache.log4j.Logger; /** * This class stores data in a C++ map. Doing this allows us to store more in memory and avoid pauses caused by Java GC. - * + * * The strategy for dealing with native memory allocated for the native map is that java code using the native map should call delete() as soon as it is * finished using the native map. When the NativeMap object is garbage collected its native resources will be released if needed. However waiting for java GC * would be a mistake for long lived NativeMaps. Long lived objects are not garbage collected quickly, therefore a process could easily use too much memory. - * + * */ public class NativeMap implements Iterable<Map.Entry<Key,Value>> { @@ -92,7 +92,7 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> { * If native libraries are not loaded, the specified search path will be used to attempt to load them. Directories will be searched by using the * system-specific library naming conventions. A path directly to a file can also be provided. Loading will continue until the search path is exhausted, or * until the native libraries are found and successfully loaded, whichever occurs first. - * + * * @param searchPath * a list of files and directories to search */ @@ -116,7 +116,7 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> { /** * Check if native libraries are loaded. - * + * * @return true if they are loaded; false otherwise */ public static boolean isLoaded() { @@ -360,10 +360,10 @@ public class NativeMap implements Iterable<Map.Entry<Key,Value>> { /** * The strategy for dealing with native memory allocated for iterators is to simply delete that memory when this Java Object is garbage collected. - * + * * These iterators are likely short lived object and therefore will be quickly garbage collected. Even if the objects are long lived and therefore more * slowly garbage collected they only hold a small amount of native memory. - * + * */ private long nmiPointer; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java index 1b22f05..babd629 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/RowLocks.java @@ -30,60 +30,60 @@ import org.apache.accumulo.tserver.ConditionalMutationSet.DeferFilter; import org.apache.accumulo.tserver.data.ServerConditionalMutation; /** - * + * */ class RowLocks { - + private Map<ByteSequence,RowLock> rowLocks = new HashMap<ByteSequence,RowLock>(); - + static class RowLock { ReentrantLock rlock; int count; ByteSequence rowSeq; - + RowLock(ReentrantLock rlock, ByteSequence rowSeq) { this.rlock = rlock; this.count = 0; this.rowSeq = rowSeq; } - + public boolean tryLock() { return rlock.tryLock(); } - + public void lock() { rlock.lock(); } - + public void unlock() { rlock.unlock(); } } - + private RowLock getRowLock(ArrayByteSequence rowSeq) { - RowLock lock = rowLocks.get(rowSeq); - if (lock == null) { - lock = new RowLock(new ReentrantLock(), rowSeq); - rowLocks.put(rowSeq, lock); - } - - lock.count++; - return lock; + RowLock lock = rowLocks.get(rowSeq); + if (lock == null) { + lock = new RowLock(new ReentrantLock(), rowSeq); + rowLocks.put(rowSeq, lock); + } + + lock.count++; + return lock; } - + private void returnRowLock(RowLock lock) { - if (lock.count == 0) - throw new IllegalStateException(); - lock.count--; - - if (lock.count == 0) { - rowLocks.remove(lock.rowSeq); - } + if (lock.count == 0) + throw new IllegalStateException(); + lock.count--; + + if (lock.count == 0) { + rowLocks.remove(lock.rowSeq); + } } - + List<RowLock> acquireRowlocks(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) { ArrayList<RowLock> locks = new ArrayList<RowLock>(); - + // assume that mutations are in sorted order to avoid deadlock synchronized (rowLocks) { for (List<ServerConditionalMutation> scml : updates.values()) { @@ -92,7 +92,7 @@ class RowLocks { } } } - + HashSet<ByteSequence> rowsNotLocked = null; // acquire as many locks as possible, not blocking on rows that are already locked @@ -108,9 +108,9 @@ class RowLocks { // if there is only one lock, then wait for it locks.get(0).lock(); } - + if (rowsNotLocked != null) { - + final HashSet<ByteSequence> rnlf = rowsNotLocked; // assume will get locks needed, do something expensive otherwise ConditionalMutationSet.defer(updates, deferred, new DeferFilter() { @@ -121,11 +121,11 @@ class RowLocks { deferred.add(scm); else okMutations.add(scm); - + } } }); - + ArrayList<RowLock> filteredLocks = new ArrayList<RowLock>(); ArrayList<RowLock> locksToReturn = new ArrayList<RowLock>(); for (RowLock rowLock : locks) { @@ -135,7 +135,7 @@ class RowLocks { filteredLocks.add(rowLock); } } - + synchronized (rowLocks) { for (RowLock rowLock : locksToReturn) { returnRowLock(rowLock); @@ -146,12 +146,12 @@ class RowLocks { } return locks; } - + void releaseRowLocks(List<RowLock> locks) { for (RowLock rowLock : locks) { rowLock.unlock(); } - + synchronized (rowLocks) { for (RowLock rowLock : locks) { returnRowLock(rowLock); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java index 83fc43e..5f121cc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TConstraintViolationException.java @@ -51,4 +51,4 @@ public class TConstraintViolationException extends Exception { CommitSession getCommitSession() { return commitSession; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java index 8bdf08b..5705c9e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TLevel.java @@ -19,18 +19,18 @@ package org.apache.accumulo.tserver; import org.apache.log4j.Level; public class TLevel extends Level { - + private static final long serialVersionUID = 1L; public final static Level TABLET_HIST = new TLevel(); - + protected TLevel() { super(Level.DEBUG_INT + 100, "TABLET_HIST", Level.DEBUG_INT + 100); } - + static public Level toLevel(int val) { if (val == Level.DEBUG_INT + 100) return Level.DEBUG; return Level.toLevel(val); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java index d1fece5..e7477b9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java @@ -34,73 +34,73 @@ import org.apache.accumulo.tserver.FileManager.ScanFileManager; import org.apache.hadoop.fs.Path; public class TabletIteratorEnvironment implements IteratorEnvironment { - + private final ScanFileManager trm; private final IteratorScope scope; private final boolean fullMajorCompaction; private final AccumuloConfiguration config; private final ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>(); private Map<FileRef,DataFileValue> files; - + public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config) { if (scope == IteratorScope.majc) throw new IllegalArgumentException("must set if compaction is full"); - + this.scope = scope; this.trm = null; this.config = config; this.fullMajorCompaction = false; } - + public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map<FileRef,DataFileValue> files) { if (scope == IteratorScope.majc) throw new IllegalArgumentException("must set if compaction is full"); - + this.scope = scope; this.trm = trm; this.config = config; this.fullMajorCompaction = false; this.files = files; } - + public TabletIteratorEnvironment(IteratorScope scope, boolean fullMajC, AccumuloConfiguration config) { if (scope != IteratorScope.majc) throw new IllegalArgumentException("Tried to set maj compaction type when scope was " + scope); - + this.scope = scope; this.trm = null; this.config = config; this.fullMajorCompaction = fullMajC; } - + @Override public AccumuloConfiguration getConfig() { return config; } - + @Override public IteratorScope getIteratorScope() { return scope; } - + @Override public boolean isFullMajorCompaction() { if (scope != IteratorScope.majc) throw new IllegalStateException("Asked about major compaction type when scope is " + scope); return fullMajorCompaction; } - + @Override public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException { FileRef ref = new FileRef(mapFileName, new Path(mapFileName)); return trm.openFiles(Collections.singletonMap(ref, files.get(ref)), false).get(0); } - + @Override public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) { topLevelIterators.add(iter); } - + public SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) { if (topLevelIterators.isEmpty()) return iter; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java index 9cc07dc..21734f9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java @@ -18,8 +18,8 @@ package org.apache.accumulo.tserver; import java.util.List; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.client.Durability; +import org.apache.accumulo.core.data.Mutation; public class TabletMutations { private final int tid; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index d5c1d2f..2bfa5a0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -349,7 +349,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable { private final SessionManager sessionManager; - private final WriteTracker writeTracker = new WriteTracker(); private final RowLocks rowLocks = new RowLocks(); @@ -2122,16 +2121,17 @@ public class TabletServer extends AccumuloServerContext implements Runnable { locationToOpen = VolumeUtil.switchRootTabletVolume(extent, locationToOpen); tablet = new Tablet(TabletServer.this, extent, locationToOpen, trm, tabletsKeyValues); - /* @formatter:off - * If a minor compaction starts after a tablet opens, this indicates a log recovery occurred. This recovered data must be minor compacted. + /* + * @formatter:off If a minor compaction starts after a tablet opens, this indicates a log recovery occurred. This recovered data must be minor + * compacted. * * There are three reasons to wait for this minor compaction to finish before placing the tablet in online tablets. * - * 1) The log recovery code does not handle data written to the tablet on multiple tablet servers. - * 2) The log recovery code does not block if memory is full. Therefore recovering lots of tablets that use a lot of memory could run out of memory. - * 3) The minor compaction finish event did not make it to the logs (the file will be in metadata, preventing replay of compacted data)... - * but do not want a majc to wipe the file out from metadata and then have another process failure... - * this could cause duplicate data to replay. + * 1) The log recovery code does not handle data written to the tablet on multiple tablet servers. 2) The log recovery code does not block if memory is + * full. Therefore recovering lots of tablets that use a lot of memory could run out of memory. 3) The minor compaction finish event did not make it to + * the logs (the file will be in metadata, preventing replay of compacted data)... but do not want a majc to wipe the file out from metadata and then + * have another process failure... this could cause duplicate data to replay. + * * @formatter:on */ if (tablet.getNumEntriesInMemory() > 0 && !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index bb9d427..351d526 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -420,7 +420,7 @@ public class TabletServerResourceManager { if (!tablet.initiateMinorCompaction(MinorCompactionReason.SYSTEM)) { if (tablet.isClosed()) { // attempt to remove it from the current reports if still there - synchronized(tabletReports) { + synchronized (tabletReports) { TabletStateImpl latestReport = tabletReports.remove(keyExtent); if (latestReport != null) { if (latestReport.getTablet() != tablet) { @@ -644,8 +644,6 @@ public class TabletServerResourceManager { } } - - // END methods that Tablets call to make decisions about major compaction // tablets call this method to run minor compactions, http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java index 40906df..1e2cdf4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java @@ -21,18 +21,18 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.server.util.ActionStatsUpdator; public class TabletStatsKeeper { - + // suspect we need more synchronization in this class private ActionStats major = new ActionStats(); private ActionStats minor = new ActionStats(); private ActionStats split = new ActionStats(); - + public enum Operation { MAJOR, SPLIT, MINOR } - + private ActionStats[] map = new ActionStats[] {major, split, minor}; - + public void updateTime(Operation operation, long queued, long start, long count, boolean failed) { try { ActionStats data = map[operation.ordinal()]; @@ -42,7 +42,7 @@ public class TabletStatsKeeper { } else { double t = (System.currentTimeMillis() - start) / 1000.0; double q = (start - queued) / 1000.0; - + data.status--; data.count += count; data.num++; @@ -56,9 +56,9 @@ public class TabletStatsKeeper { } catch (Exception E) { resetTimes(); } - + } - + public void updateTime(Operation operation, long start, long count, boolean failed) { try { ActionStats data = map[operation.ordinal()]; @@ -67,52 +67,52 @@ public class TabletStatsKeeper { data.status--; } else { double t = (System.currentTimeMillis() - start) / 1000.0; - + data.status--; data.num++; data.elapsed += t; data.sumDev += t * t; - + if (data.elapsed < 0 || data.sumDev < 0 || data.queueSumDev < 0 || data.queueTime < 0) resetTimes(); } } catch (Exception E) { resetTimes(); } - + } - + public void saveMajorMinorTimes(TabletStats t) { ActionStatsUpdator.update(minor, t.minors); ActionStatsUpdator.update(major, t.majors); } - + public void saveMinorTimes(TabletStatsKeeper t) { ActionStatsUpdator.update(minor, t.minor); } - + public void saveMajorTimes(TabletStatsKeeper t) { ActionStatsUpdator.update(major, t.major); } - + public void resetTimes() { major = new ActionStats(); split = new ActionStats(); minor = new ActionStats(); } - + public void incrementStatusMinor() { minor.status++; } - + public void incrementStatusMajor() { major.status++; } - + public void incrementStatusSplit() { split.status++; } - + public TabletStats getTabletStats() { return new TabletStats(null, major, minor, split, 0, 0, 0, 0); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java index 98c7a02..026f7e2 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java @@ -19,9 +19,9 @@ package org.apache.accumulo.tserver; import java.io.IOException; public class TooManyFilesException extends IOException { - + private static final long serialVersionUID = 1L; - + public TooManyFilesException(String msg) { super(msg); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/TservConstraintEnv.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TservConstraintEnv.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TservConstraintEnv.java index dbb67a9..edaca31 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TservConstraintEnv.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TservConstraintEnv.java @@ -80,4 +80,4 @@ public class TservConstraintEnv implements Environment { } }; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java index 84b5cd0..9bf80b4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/WriteTracker.java @@ -30,13 +30,13 @@ import org.apache.accumulo.tserver.tablet.Tablet; import org.apache.log4j.Logger; /** - * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids - * are monotonically increasing. + * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids are + * monotonically increasing. * */ class WriteTracker { private static final Logger log = Logger.getLogger(WriteTracker.class); - + private static final AtomicLong operationCounter = new AtomicLong(1); private final Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class); @@ -93,4 +93,4 @@ class WriteTracker { return startWrite(TabletType.type(extents)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java index 75c6bd8..8f98761 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java @@ -22,9 +22,10 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import com.google.common.collect.Sets; import org.apache.accumulo.server.fs.FileRef; +import com.google.common.collect.Sets; + /** * A plan for a compaction: the input files, the files that are *not* inputs to a compaction that should simply be deleted, and the optional parameters used to * create the resulting output file. @@ -59,7 +60,7 @@ public class CompactionPlan { /** * Validate compaction plan. - * + * * @param allFiles * All possible files * @throws IllegalStateException http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java index 2d94884..40cb604 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java @@ -42,10 +42,10 @@ public abstract class CompactionStrategy { * Determine if this tablet is eligible for a major compaction. It's ok if it later determines (through {@link #gatherInformation(MajorCompactionRequest)} and * {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state stored during shouldCompact will no longer exist when * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)} are called. - * + * * <P> * Called while holding the tablet lock, so it should not be doing any blocking. - * + * * <P> * Since no blocking should be done in this method, then its unexpected that this method will throw IOException. However since its in the API, it can not be * easily removed. @@ -55,7 +55,7 @@ public abstract class CompactionStrategy { /** * Called prior to obtaining the tablet lock, useful for examining metadata or indexes. State collected during this method will be available during the call * the {@link #getCompactionPlan(MajorCompactionRequest)}. - * + * * @param request * basic details about the tablet */ @@ -63,11 +63,11 @@ public abstract class CompactionStrategy { /** * Get the plan for compacting a tablets files. Called while holding the tablet lock, so it should not be doing any blocking. - * + * * <P> * Since no blocking should be done in this method, then its unexpected that this method will throw IOException. However since its in the API, it can not be * easily removed. - * + * * @param request * basic details about the tablet * @return the plan for a major compaction, or null to cancel the compaction. http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java index 6d4dc79..6cc9025 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java @@ -26,7 +26,7 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.server.fs.FileRef; /** - * + * */ public class SizeLimitCompactionStrategy extends DefaultCompactionStrategy { public static final String SIZE_LIMIT_OPT = "sizeLimit"; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java index 7ea1388..fd9e521 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java @@ -36,10 +36,10 @@ import org.apache.log4j.Logger; import com.google.common.annotations.VisibleForTesting; public class ConstraintChecker { - + private ArrayList<Constraint> constrains; private static final Logger log = Logger.getLogger(ConstraintChecker.class); - + private ClassLoader loader; private TableConfiguration conf; @@ -47,7 +47,7 @@ public class ConstraintChecker { public ConstraintChecker(TableConfiguration conf) { constrains = new ArrayList<Constraint>(); - + this.conf = conf; try { @@ -58,7 +58,7 @@ public class ConstraintChecker { } else { loader = AccumuloVFSClassLoader.getClassLoader(); } - + for (Entry<String,String> entry : conf) { if (entry.getKey().startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey())) { String className = entry.getValue(); @@ -67,7 +67,7 @@ public class ConstraintChecker { constrains.add(clazz.newInstance()); } } - + lastCheck.set(System.currentTimeMillis()); } catch (Throwable e) { @@ -84,21 +84,21 @@ public class ConstraintChecker { } public boolean classLoaderChanged() { - + if (constrains.size() == 0) return false; try { String context = conf.get(Property.TABLE_CLASSPATH); - + ClassLoader currentLoader; - + if (context != null && !context.equals("")) { currentLoader = AccumuloVFSClassLoader.getContextManager().getClassLoader(context); } else { currentLoader = AccumuloVFSClassLoader.getClassLoader(); } - + return currentLoader != loader; } catch (Exception e) { log.debug("Failed to check " + e.getMessage()); @@ -117,10 +117,10 @@ public class ConstraintChecker { public Violations check(Environment env, Mutation m) { if (!env.getExtent().contains(new ComparableBytes(m.getRow()))) { Violations violations = new Violations(); - + ConstraintViolationSummary cvs = new ConstraintViolationSummary(SystemConstraint.class.getName(), (short) -1, "Mutation outside of tablet extent", 1); violations.add(cvs); - + // do not bother with further checks since this mutation does not go with this tablet return violations; } @@ -133,8 +133,7 @@ public class ConstraintChecker { if (violationCodes != null) { String className = constraint.getClass().getName(); for (Short vcode : violationCodes) { - violations = addViolation(violations, new ConstraintViolationSummary( - className, vcode, constraint.getViolationDescription(vcode), 1)); + violations = addViolation(violations, new ConstraintViolationSummary(className, vcode, constraint.getViolationDescription(vcode), 1)); } } } catch (Throwable throwable) { @@ -161,8 +160,7 @@ public class ConstraintChecker { msg = "threw some Exception"; } - violations = addViolation(violations, new ConstraintViolationSummary( - constraint.getClass().getName(), vcode, "CONSTRAINT FAILED : " + msg, 1)); + violations = addViolation(violations, new ConstraintViolationSummary(constraint.getClass().getName(), vcode, "CONSTRAINT FAILED : " + msg, 1)); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/UnsatisfiableConstraint.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/UnsatisfiableConstraint.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/UnsatisfiableConstraint.java index cf0c176..64bc2cd 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/UnsatisfiableConstraint.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/UnsatisfiableConstraint.java @@ -23,21 +23,21 @@ import org.apache.accumulo.core.constraints.Constraint; import org.apache.accumulo.core.data.Mutation; public class UnsatisfiableConstraint implements Constraint { - + private List<Short> violations; private String vDesc; - + public UnsatisfiableConstraint(short vcode, String violationDescription) { this.violations = Collections.unmodifiableList(Collections.singletonList(vcode)); this.vDesc = violationDescription; } - + public List<Short> check(Environment env, Mutation mutation) { return violations; } - + public String getViolationDescription(short violationCode) { return vDesc; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/data/ServerConditionalMutation.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/data/ServerConditionalMutation.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/data/ServerConditionalMutation.java index 975300b..84137cc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/data/ServerConditionalMutation.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/data/ServerConditionalMutation.java @@ -24,22 +24,22 @@ import org.apache.accumulo.core.data.thrift.TConditionalMutation; import org.apache.accumulo.server.data.ServerMutation; /** - * + * */ public class ServerConditionalMutation extends ServerMutation { - + public static class TCMTranslator extends Translator<TConditionalMutation,ServerConditionalMutation> { @Override public ServerConditionalMutation translate(TConditionalMutation input) { return new ServerConditionalMutation(input); } } - + public static final TCMTranslator TCMT = new TCMTranslator(); private long cmid; private List<TCondition> conditions; - + public ServerConditionalMutation(TConditionalMutation input) { super(input.mutation); @@ -50,10 +50,9 @@ public class ServerConditionalMutation extends ServerMutation { public long getID() { return cmid; } - + public List<TCondition> getConditions() { return conditions; } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 18aa192..6f9be7d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -152,7 +152,7 @@ public class DfsLogger { private static final LogFileValue EMPTY = new LogFileValue(); private boolean closed = false; - + private class LogSyncingTask implements Runnable { @Override @@ -170,8 +170,7 @@ public class DfsLogger { workQueue.drainTo(work); Method durabilityMethod = null; - loop: - for (LogWork logWork : work) { + loop: for (LogWork logWork : work) { switch (logWork.durability) { case DEFAULT: case NONE: @@ -287,7 +286,9 @@ public class DfsLogger { /** * Reference a pre-existing log file. - * @param meta the cq for the "log" entry in +r/!0 + * + * @param meta + * the cq for the "log" entry in +r/!0 */ public DfsLogger(ServerResources conf, String filename, String meta) throws IOException { this.conf = conf; @@ -387,7 +388,8 @@ public class DfsLogger { log.debug("DfsLogger.open() begin"); VolumeManager fs = conf.getFileSystem(); - logPath = fs.choose(Optional.<String> absent(), ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.WAL_DIR + Path.SEPARATOR + logger + Path.SEPARATOR + filename; + logPath = fs.choose(Optional.<String> absent(), ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.WAL_DIR + Path.SEPARATOR + logger + + Path.SEPARATOR + filename; metaReference = toString(); try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java index 10bc903..2658c1f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java @@ -35,8 +35,8 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.WritableName; import org.apache.hadoop.io.SequenceFile.Reader; +import org.apache.hadoop.io.WritableName; import org.apache.log4j.Logger; import com.beust.jcommander.JCommander; @@ -50,10 +50,10 @@ import com.google.common.annotations.VisibleForTesting; @SuppressWarnings("deprecation") public class LocalWALRecovery implements Runnable { private static final Logger log = Logger.getLogger(LocalWALRecovery.class); - - static { - WritableName.addName(LogFileKey.class, org.apache.accumulo.server.logger.LogFileKey.class.getName()); - WritableName.addName(LogFileValue.class, org.apache.accumulo.server.logger.LogFileValue.class.getName()); + + static { + WritableName.addName(LogFileKey.class, org.apache.accumulo.server.logger.LogFileKey.class.getName()); + WritableName.addName(LogFileValue.class, org.apache.accumulo.server.logger.LogFileValue.class.getName()); } public static void main(String[] args) throws IOException { @@ -150,7 +150,7 @@ public class LocalWALRecovery implements Runnable { Path localWal = new Path(file.toURI()); FileSystem localFs = FileSystem.getLocal(fs.getConf()); - + Reader reader = new SequenceFile.Reader(localFs, localWal, localFs.getConf()); // Reader reader = new SequenceFile.Reader(localFs.getConf(), SequenceFile.Reader.file(localWal)); Path tmp = new Path(options.destination + "/" + name + ".copy"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java index 2c6d415..405ec70 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java @@ -39,21 +39,25 @@ import org.apache.log4j.Logger; /** * Extract Mutations for a tablet from a set of logs that have been sorted by operation and tablet. - * + * */ public class SortedLogRecovery { private static final Logger log = Logger.getLogger(SortedLogRecovery.class); - + static class EmptyMapFileException extends Exception { private static final long serialVersionUID = 1L; - public EmptyMapFileException() { super(); } + public EmptyMapFileException() { + super(); + } } static class UnusedException extends Exception { private static final long serialVersionUID = 1L; - public UnusedException() { super(); } + public UnusedException() { + super(); + } } private VolumeManager fs; @@ -61,28 +65,28 @@ public class SortedLogRecovery { public SortedLogRecovery(VolumeManager fs) { this.fs = fs; } - + private enum Status { INITIAL, LOOKING_FOR_FINISH, COMPLETE }; - + private static class LastStartToFinish { long lastStart = -1; long seq = -1; long lastFinish = -1; Status compactionStatus = Status.INITIAL; String tserverSession = ""; - + private void update(long newFinish) { this.seq = this.lastStart; if (newFinish != -1) lastFinish = newFinish; } - + private void update(int newStartFile, long newStart) { this.lastStart = newStart; } - + private void update(String newSession) { this.lastStart = -1; this.lastFinish = -1; @@ -90,7 +94,7 @@ public class SortedLogRecovery { this.tserverSession = newSession; } } - + public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException { int[] tids = new int[recoveryLogs.size()]; LastStartToFinish lastStartToFinish = new LastStartToFinish(); @@ -115,12 +119,12 @@ public class SortedLogRecovery { log.warn("Ignoring error closing file"); } } - + } - + if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH) throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) not followed by successful minor compaction"); - + for (int i = 0; i < recoveryLogs.size(); i++) { Path logfile = recoveryLogs.get(i); MultiReader reader = new MultiReader(fs, logfile); @@ -136,7 +140,7 @@ public class SortedLogRecovery { log.info("Recovery complete for " + extent + " using " + logfile); } } - + private String getPathSuffix(String pathString) { Path path = new Path(pathString); if (path.depth() < 2) @@ -144,7 +148,8 @@ public class SortedLogRecovery { return path.getParent().getName() + "/" + path.getName(); } - int findLastStartToFinish(MultiReader reader, int fileno, KeyExtent extent, Set<String> tabletFiles, LastStartToFinish lastStartToFinish) throws IOException, EmptyMapFileException, UnusedException { + int findLastStartToFinish(MultiReader reader, int fileno, KeyExtent extent, Set<String> tabletFiles, LastStartToFinish lastStartToFinish) throws IOException, + EmptyMapFileException, UnusedException { HashSet<String> suffixes = new HashSet<String>(); for (String path : tabletFiles) @@ -158,7 +163,7 @@ public class SortedLogRecovery { throw new EmptyMapFileException(); if (key.event != OPEN) throw new RuntimeException("First log entry value is not OPEN"); - + if (key.tserverSession.compareTo(lastStartToFinish.tserverSession) != 0) { if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH) throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) is not followed by a successful minor compaction."); @@ -168,9 +173,9 @@ public class SortedLogRecovery { if (extent.isRootTablet()) { alternative = RootTable.OLD_EXTENT; } - + LogFileKey defineKey = null; - + // find the maximum tablet id... because a tablet may leave a tserver and then come back, in which case it would have a different tablet id // for the maximum tablet id, find the minimum sequence #... may be ok to find the max seq, but just want to make the code behave like it used to while (reader.next(key, value)) { @@ -188,9 +193,9 @@ public class SortedLogRecovery { if (tid < 0) { throw new UnusedException(); } - + log.debug("Found tid, seq " + tid + " " + defineKey.seq); - + // Scan start/stop events for this tablet key = defineKey; key.event = COMPACTION_START; @@ -205,7 +210,7 @@ public class SortedLogRecovery { if (key.seq <= lastStartToFinish.lastStart) throw new RuntimeException("Sequence numbers are not increasing for start/stop events."); lastStartToFinish.update(fileno, key.seq); - + // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table. log.debug("minor compaction into " + key.filename + " finished, but was still in the METADATA"); if (suffixes.contains(getPathSuffix(key.filename))) @@ -225,11 +230,11 @@ public class SortedLogRecovery { } return tid; } - + private void playbackMutations(MultiReader reader, int tid, LastStartToFinish lastStartToFinish, MutationReceiver mr) throws IOException { LogFileKey key = new LogFileKey(); LogFileValue value = new LogFileValue(); - + // Playback mutations after the last stop to finish log.info("Scanning for mutations starting at sequence number " + lastStartToFinish.seq + " for tid " + tid); key.event = MUTATION; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index ceb76da..5c3fc2d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -90,7 +90,7 @@ public class TabletServerLogger { private final AtomicLong syncCounter; private final AtomicLong flushCounter; - + private final static int HALT_AFTER_ERROR_COUNT = 5; // Die if we get 5 WAL creation errors in 10 seconds private final Cache<Long,Object> walErrors = CacheBuilder.newBuilder().maximumSize(HALT_AFTER_ERROR_COUNT).expireAfterWrite(10, TimeUnit.SECONDS).build(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java index 3a20e8d..829cf2f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java @@ -29,7 +29,7 @@ import org.apache.accumulo.core.data.KeyExtent; import org.apache.hadoop.io.WritableComparable; public class LogFileKey implements WritableComparable<LogFileKey> { - + public LogEvents event; public String filename = null; public KeyExtent tablet = null; @@ -37,7 +37,7 @@ public class LogFileKey implements WritableComparable<LogFileKey> { public int tid = -1; public static final int VERSION = 2; public String tserverSession; - + @Override public void readFields(DataInput in) throws IOException { int value = in.readByte(); @@ -79,9 +79,9 @@ public class LogFileKey implements WritableComparable<LogFileKey> { default: throw new RuntimeException("Unknown log event type: " + event); } - + } - + @Override public void write(DataOutput out) throws IOException { out.writeByte(event.ordinal()); @@ -119,7 +119,7 @@ public class LogFileKey implements WritableComparable<LogFileKey> { throw new IllegalArgumentException("Bad value for LogFileEntry type"); } } - + static int eventType(LogEvents event) { // Order logs by START, TABLET_DEFINITIONS, COMPACTIONS and then MUTATIONS if (event == MUTATION || event == MANY_MUTATIONS) { @@ -133,7 +133,7 @@ public class LogFileKey implements WritableComparable<LogFileKey> { } return 2; } - + private static int sign(long l) { if (l < 0) return -1; @@ -141,7 +141,7 @@ public class LogFileKey implements WritableComparable<LogFileKey> { return 1; return 0; } - + @Override public int compareTo(LogFileKey o) { if (eventType(this.event) != eventType(o.event)) { @@ -154,7 +154,7 @@ public class LogFileKey implements WritableComparable<LogFileKey> { } return sign(this.seq - o.seq); } - + @Override public boolean equals(Object obj) { if (obj instanceof LogFileKey) { @@ -162,16 +162,16 @@ public class LogFileKey implements WritableComparable<LogFileKey> { } return false; } - + @Override public int hashCode() { return (int) seq; } - + public static void printEntry(LogFileKey entry) { System.out.println(entry.toString()); } - + @Override public String toString() { switch (event) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java index 81cd593..9ca0f38 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java @@ -31,11 +31,11 @@ import org.apache.accumulo.server.data.ServerMutation; import org.apache.hadoop.io.Writable; public class LogFileValue implements Writable { - + private static final List<Mutation> empty = Collections.emptyList(); - + public List<Mutation> mutations = empty; - + @Override public void readFields(DataInput in) throws IOException { int count = in.readInt(); @@ -46,7 +46,7 @@ public class LogFileValue implements Writable { mutations.add(mutation); } } - + @Override public void write(DataOutput out) throws IOException { out.writeInt(mutations.size()); @@ -54,18 +54,18 @@ public class LogFileValue implements Writable { m.write(out); } } - + public static void print(LogFileValue value) { System.out.println(value.toString()); } - + private static String displayLabels(byte[] labels) { String s = new String(labels, UTF_8); s = s.replace("&", " & "); s = s.replace("|", " | "); return s; } - + public static String format(LogFileValue lfv, int maxMutations) { if (lfv.mutations.size() == 0) return ""; @@ -80,18 +80,17 @@ public class LogFileValue implements Writable { builder.append(" ").append(new String(m.getRow(), UTF_8)).append("\n"); for (ColumnUpdate update : m.getUpdates()) { String value = new String(update.getValue()); - builder.append(" ").append(new String(update.getColumnFamily(), UTF_8)).append(":") - .append(new String(update.getColumnQualifier(), UTF_8)).append(" ").append(update.hasTimestamp() ? "[user]:" : "[system]:") - .append(update.getTimestamp()).append(" [").append(displayLabels(update.getColumnVisibility())).append("] ") - .append(update.isDeleted() ? "<deleted>" : value).append("\n"); + builder.append(" ").append(new String(update.getColumnFamily(), UTF_8)).append(":").append(new String(update.getColumnQualifier(), UTF_8)) + .append(" ").append(update.hasTimestamp() ? "[user]:" : "[system]:").append(update.getTimestamp()).append(" [") + .append(displayLabels(update.getColumnVisibility())).append("] ").append(update.isDeleted() ? "<deleted>" : value).append("\n"); } } return builder.toString(); } - + @Override public String toString() { return format(this, 5); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/MasterMessage.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/MasterMessage.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/MasterMessage.java index ecca3ce..ab71794 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/MasterMessage.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/MasterMessage.java @@ -22,7 +22,7 @@ import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.thrift.TException; public interface MasterMessage { - + void send(TCredentials info, String serverName, MasterClientService.Iface client) throws TException, ThriftSecurityException; - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/SplitReportMessage.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/SplitReportMessage.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/SplitReportMessage.java index 5d61c9c..0c93a86 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/SplitReportMessage.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/SplitReportMessage.java @@ -20,8 +20,8 @@ import java.util.Map; import java.util.TreeMap; import org.apache.accumulo.core.client.impl.Translator; -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.client.impl.Translators; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.master.thrift.TabletSplit; @@ -33,24 +33,24 @@ import org.apache.thrift.TException; public class SplitReportMessage implements MasterMessage { Map<KeyExtent,Text> extents; KeyExtent old_extent; - + public SplitReportMessage(KeyExtent old_extent, Map<KeyExtent,Text> newExtents) { this.old_extent = old_extent; extents = new TreeMap<KeyExtent,Text>(newExtents); } - + public SplitReportMessage(KeyExtent old_extent, KeyExtent ne1, Text np1, KeyExtent ne2, Text np2) { this.old_extent = old_extent; extents = new TreeMap<KeyExtent,Text>(); extents.put(ne1, np1); extents.put(ne2, np2); } - + public void send(TCredentials credentials, String serverName, MasterClientService.Iface client) throws TException, ThriftSecurityException { TabletSplit split = new TabletSplit(); split.oldTablet = old_extent.toThrift(); split.newTablets = Translator.translate(extents.keySet(), Translators.KET); client.reportSplitExtent(Tracer.traceInfo(), credentials, serverName, split); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/TabletStatusMessage.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/TabletStatusMessage.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/TabletStatusMessage.java index 655414d..25db744 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/TabletStatusMessage.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/mastermessage/TabletStatusMessage.java @@ -18,22 +18,22 @@ package org.apache.accumulo.tserver.mastermessage; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.master.thrift.TabletLoadState; import org.apache.accumulo.core.master.thrift.MasterClientService.Iface; +import org.apache.accumulo.core.master.thrift.TabletLoadState; import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.trace.Tracer; import org.apache.thrift.TException; public class TabletStatusMessage implements MasterMessage { - + private KeyExtent extent; private TabletLoadState status; - + public TabletStatusMessage(TabletLoadState status, KeyExtent extent) { this.extent = extent; this.status = status; } - + public void send(TCredentials auth, String serverName, Iface client) throws TException, ThriftSecurityException { client.reportTabletStatus(Tracer.traceInfo(), auth, serverName, status, extent.toThrift()); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBean.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBean.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBean.java index 3b7a637..dc35c28 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBean.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBean.java @@ -17,34 +17,34 @@ package org.apache.accumulo.tserver.metrics; public interface TabletServerMBean { - + int getOnlineCount(); - + int getOpeningCount(); - + int getUnopenedCount(); - + int getMajorCompactions(); - + int getMajorCompactionsQueued(); - + int getMinorCompactions(); - + int getMinorCompactionsQueued(); - + long getEntries(); - + long getEntriesInMemory(); - + long getQueries(); - + long getIngest(); - + long getTotalMinorCompactions(); - + double getHoldTime(); - + String getName(); - + double getAverageFilesPerTablet(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java index 7d6c59e..a07f354 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java @@ -234,9 +234,9 @@ public class AccumuloReplicaSystem implements ReplicaSystem { } } - protected Status replicateRFiles(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, - final Path p, final Status status, final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) - throws TTransportException, AccumuloException, AccumuloSecurityException { + protected Status replicateRFiles(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status, + final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException, + AccumuloException, AccumuloSecurityException { DataInputStream input; try { input = getRFileInputStream(p); @@ -280,9 +280,9 @@ public class AccumuloReplicaSystem implements ReplicaSystem { } } - protected Status replicateLogs(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, - final Path p, final Status status, final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) - throws TTransportException, AccumuloException, AccumuloSecurityException { + protected Status replicateLogs(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status, + final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException, + AccumuloException, AccumuloSecurityException { final Set<Integer> tids; final DataInputStream input; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java index e2af4df..cc79f31 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java @@ -94,8 +94,7 @@ public class ReplicationServicerHandler implements Iface { replayer = clz.newInstance(); } catch (InstantiationException | IllegalAccessException e1) { log.error("Could not instantiate replayer class {}", clz.getName()); - throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER, "Could not instantiate replayer class" - + clz.getName()); + throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER, "Could not instantiate replayer class" + clz.getName()); } long entriesReplicated; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java index 1d20e2b..bd6bcd3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java @@ -32,7 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * */ public class ReplicationWorker implements Runnable { private static final Logger log = LoggerFactory.getLogger(ReplicationWorker.class);