Merge branch '1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT

Conflicts:
    server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3d3d301f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3d3d301f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3d3d301f

Branch: refs/heads/master
Commit: 3d3d301f4ab39efd27d1deb1df438a1df0b9c1e8
Parents: 23c74cd 30a0ca3
Author: Keith Turner <ktur...@apache.org>
Authored: Mon Aug 25 11:51:38 2014 -0400
Committer: Keith Turner <ktur...@apache.org>
Committed: Mon Aug 25 11:51:38 2014 -0400

----------------------------------------------------------------------
 .../system/SourceSwitchingIterator.java | 2 +-
 .../apache/accumulo/tserver/InMemoryMap.java | 61 +++++++++++++-------
 .../accumulo/tserver/InMemoryMapTest.java | 44 ++++++++++++++
 3 files changed, 85 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3d3d301f/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index dc36718,0000000..5f6d9ce
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@@ -1,753 -1,0 +1,772 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.accumulo.tserver; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.file.rfile.RFileOperations; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SkippingIterator; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.SortedMapIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.iterators.system.InterruptibleIterator; +import org.apache.accumulo.core.iterators.system.LocalityGroupIterator; +import org.apache.accumulo.core.iterators.system.LocalityGroupIterator.LocalityGroup; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; +import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.trace.TraceFileSystem; +import org.apache.commons.lang.mutable.MutableLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; + +class MemKeyComparator implements Comparator<Key>, Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public int compare(Key k1, Key k2) { + int cmp = k1.compareTo(k2); + + if (cmp == 0) { + if (k1 instanceof MemKey) + if (k2 instanceof MemKey) + cmp = ((MemKey) k2).kvCount - ((MemKey) k1).kvCount; + else + cmp = 1; + else if (k2 instanceof MemKey) + cmp = -1; + } + + return cmp; + } +} + +class PartialMutationSkippingIterator extends SkippingIterator implements InterruptibleIterator { + + int kvCount; + + public PartialMutationSkippingIterator(SortedKeyValueIterator<Key,Value> source, int maxKVCount) { + setSource(source); + this.kvCount = maxKVCount; + } + + @Override + protected void consume() throws IOException { + while (getSource().hasTop() && ((MemKey) getSource().getTopKey()).kvCount > kvCount) + getSource().next(); + } + + @Override + public 
SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + return new PartialMutationSkippingIterator(getSource().deepCopy(env), kvCount); + } + + @Override + public void setInterruptFlag(AtomicBoolean flag) { + ((InterruptibleIterator) getSource()).setInterruptFlag(flag); + } + +} + +class MemKeyConversionIterator extends WrappingIterator implements InterruptibleIterator { + MemKey currKey = null; + Value currVal = null; + + public MemKeyConversionIterator(SortedKeyValueIterator<Key,Value> source) { + super(); + setSource(source); + } + + @Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + return new MemKeyConversionIterator(getSource().deepCopy(env)); + } + + @Override + public Key getTopKey() { + return currKey; + } + + @Override + public Value getTopValue() { + return currVal; + } + + private void getTopKeyVal() { + Key k = super.getTopKey(); + Value v = super.getTopValue(); + if (k instanceof MemKey || k == null) { + currKey = (MemKey) k; + currVal = v; + return; + } + currVal = new Value(v); + int mc = MemValue.splitKVCount(currVal); + currKey = new MemKey(k, mc); + + } + + public void next() throws IOException { + super.next(); + if (hasTop()) + getTopKeyVal(); + } + + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + super.seek(range, columnFamilies, inclusive); + + if (hasTop()) + getTopKeyVal(); + + Key k = range.getStartKey(); + if (k instanceof MemKey && hasTop()) { + while (hasTop() && currKey.compareTo(k) < 0) + next(); + } + } + + @Override + public void setInterruptFlag(AtomicBoolean flag) { + ((InterruptibleIterator) getSource()).setInterruptFlag(flag); + } + +} + +public class InMemoryMap { + private SimpleMap map = null; + + private static final Logger log = Logger.getLogger(InMemoryMap.class); + + private volatile String memDumpFile = null; + private final String memDumpDir; + + private Map<String,Set<ByteSequence>> lggroups; + + public InMemoryMap(boolean useNativeMap, String memDumpDir) { + this(new HashMap<String,Set<ByteSequence>>(), useNativeMap, memDumpDir); + } + + public InMemoryMap(Map<String,Set<ByteSequence>> lggroups, boolean useNativeMap, String memDumpDir) { + this.memDumpDir = memDumpDir; + this.lggroups = lggroups; + + if (lggroups.size() == 0) + map = newMap(useNativeMap); + else + map = new LocalityGroupMap(lggroups, useNativeMap); + } + + public InMemoryMap(AccumuloConfiguration config) throws LocalityGroupConfigurationError { + this(LocalityGroupUtil.getLocalityGroups(config), config.getBoolean(Property.TSERV_NATIVEMAP_ENABLED), config.get(Property.TSERV_MEMDUMP_DIR)); + } + + private static SimpleMap newMap(boolean useNativeMap) { + if (useNativeMap && NativeMap.isLoaded()) { + try { + return new NativeMapWrapper(); + } catch (Throwable t) { + log.error("Failed to create native map", t); + } + } + + return new DefaultMap(); + } + + private interface SimpleMap { + Value get(Key key); + + Iterator<Entry<Key,Value>> iterator(Key startKey); + + int size(); + + InterruptibleIterator skvIterator(); + + void delete(); + + long getMemoryUsed(); + + void mutate(List<Mutation> mutations, int kvCount); + } + + private static class LocalityGroupMap implements SimpleMap { + + private Map<ByteSequence,MutableLong> groupFams[]; + + // the last map in the array is the default locality group + private SimpleMap maps[]; + private Partitioner partitioner; + private List<Mutation>[] partitioned; + private Set<ByteSequence> nonDefaultColumnFamilies; + + 
@SuppressWarnings("unchecked") + LocalityGroupMap(Map<String,Set<ByteSequence>> groups, boolean useNativeMap) { + this.groupFams = new Map[groups.size()]; + this.maps = new SimpleMap[groups.size() + 1]; + this.partitioned = new List[groups.size() + 1]; + this.nonDefaultColumnFamilies = new HashSet<ByteSequence>(); + + for (int i = 0; i < maps.length; i++) { + maps[i] = newMap(useNativeMap); + } + + int count = 0; + for (Set<ByteSequence> cfset : groups.values()) { + HashMap<ByteSequence,MutableLong> map = new HashMap<ByteSequence,MutableLong>(); + for (ByteSequence bs : cfset) + map.put(bs, new MutableLong(1)); + this.groupFams[count++] = map; + nonDefaultColumnFamilies.addAll(cfset); + } + + partitioner = new LocalityGroupUtil.Partitioner(this.groupFams); + + for (int i = 0; i < partitioned.length; i++) { + partitioned[i] = new ArrayList<Mutation>(); + } + } + + @Override + public Value get(Key key) { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator<Entry<Key,Value>> iterator(Key startKey) { + throw new UnsupportedOperationException(); + } + + @Override + public int size() { + int sum = 0; + for (SimpleMap map : maps) + sum += map.size(); + return sum; + } + + @Override + public InterruptibleIterator skvIterator() { + LocalityGroup groups[] = new LocalityGroup[maps.length]; + for (int i = 0; i < groups.length; i++) { + if (i < groupFams.length) + groups[i] = new LocalityGroup(maps[i].skvIterator(), groupFams[i], false); + else + groups[i] = new LocalityGroup(maps[i].skvIterator(), null, true); + } + + + return new LocalityGroupIterator(groups, nonDefaultColumnFamilies); + } + + @Override + public void delete() { + for (SimpleMap map : maps) + map.delete(); + } + + @Override + public long getMemoryUsed() { + long sum = 0; + for (SimpleMap map : maps) + sum += map.getMemoryUsed(); + return sum; + } + + @Override + public synchronized void mutate(List<Mutation> mutations, int kvCount) { + // this method is synchronized because it reuses objects to avoid allocation, + // currently, the method that calls this is synchronized so there is no + // loss in parallelism.... 
synchronization was added here for future proofing + + try{ + partitioner.partition(mutations, partitioned); + + for (int i = 0; i < partitioned.length; i++) { + if (partitioned[i].size() > 0) { + maps[i].mutate(partitioned[i], kvCount); + for (Mutation m : partitioned[i]) + kvCount += m.getUpdates().size(); + } + } + } finally { + // clear immediately so mutations can be garbage collected + for (List<Mutation> list : partitioned) { + list.clear(); + } + } + } + + } + + private static class DefaultMap implements SimpleMap { + private ConcurrentSkipListMap<Key,Value> map = new ConcurrentSkipListMap<Key,Value>(new MemKeyComparator()); + private AtomicLong bytesInMemory = new AtomicLong(); + private AtomicInteger size = new AtomicInteger(); + + public void put(Key key, Value value) { + // Always a MemKey, so account for the kvCount int + bytesInMemory.addAndGet(key.getLength() + 4); + bytesInMemory.addAndGet(value.getSize()); + if (map.put(key, value) == null) + size.incrementAndGet(); + } + + public Value get(Key key) { + return map.get(key); + } + + public Iterator<Entry<Key,Value>> iterator(Key startKey) { + Key lk = new Key(startKey); + SortedMap<Key,Value> tm = map.tailMap(lk); + return tm.entrySet().iterator(); + } + + public int size() { + return size.get(); + } + + public synchronized InterruptibleIterator skvIterator() { + if (map == null) + throw new IllegalStateException(); + + return new SortedMapIterator(map); + } + + public synchronized void delete() { + map = null; + } + + public long getOverheadPerEntry() { + // all of the java objects that are used to hold the + // data and make it searchable have overhead... this + // overhead is estimated using test.EstimateInMemMapOverhead + // and is in bytes.. the estimates were obtained by running + // java 6_16 in 64 bit server mode + + return 200; + } + + @Override + public void mutate(List<Mutation> mutations, int kvCount) { + for (Mutation m : mutations) { + for (ColumnUpdate cvp : m.getUpdates()) { + Key newKey = new MemKey(m.getRow(), cvp.getColumnFamily(), cvp.getColumnQualifier(), cvp.getColumnVisibility(), cvp.getTimestamp(), cvp.isDeleted(), + false, kvCount++); + Value value = new Value(cvp.getValue()); + put(newKey, value); + } + } + } + + @Override + public long getMemoryUsed() { + return bytesInMemory.get() + (size() * getOverheadPerEntry()); + } + } + + private static class NativeMapWrapper implements SimpleMap { + private NativeMap nativeMap; + + NativeMapWrapper() { + nativeMap = new NativeMap(); + } + + public Value get(Key key) { + return nativeMap.get(key); + } + + public Iterator<Entry<Key,Value>> iterator(Key startKey) { + return nativeMap.iterator(startKey); + } + + public int size() { + return nativeMap.size(); + } + + public InterruptibleIterator skvIterator() { + return (InterruptibleIterator) nativeMap.skvIterator(); + } + + public void delete() { + nativeMap.delete(); + } + + public long getMemoryUsed() { + return nativeMap.getMemoryUsed(); + } + + @Override + public void mutate(List<Mutation> mutations, int kvCount) { + nativeMap.mutate(mutations, kvCount); + } + } + + private AtomicInteger nextKVCount = new AtomicInteger(1); + private AtomicInteger kvCount = new AtomicInteger(0); + + private Object writeSerializer = new Object(); + + /** + * Applies changes to a row in the InMemoryMap + * + */ + public void mutate(List<Mutation> mutations) { + int numKVs = 0; + for (int i = 0; i < mutations.size(); i++) + numKVs += mutations.get(i).size(); + + // Can not update mutationCount while writes that started before 
+  // are in progress, this would cause partial mutations to be seen.
+  // Also, can not continue until mutation count is updated, because
+  // a read may not see a successful write. Therefore writes must
+  // wait for writes that started before to finish.
+  //
+  // using separate lock from this map, to allow read/write in parallel
+  synchronized (writeSerializer ) {
+    int kv = nextKVCount.getAndAdd(numKVs);
+    try {
+      map.mutate(mutations, kv);
+    } finally {
+      kvCount.set(kv + numKVs - 1);
+    }
+  }
+ }
+
+ /**
+  * Returns a long representing the size of the InMemoryMap
+  *
+  * @return bytesInMemory
+  */
+ public synchronized long estimatedSizeInBytes() {
+   if (map == null)
+     return 0;
+
+   return map.getMemoryUsed();
+ }
+
+ Iterator<Map.Entry<Key,Value>> iterator(Key startKey) {
+   return map.iterator(startKey);
+ }
+
+ public long getNumEntries() {
+   return map.size();
+ }
+
+ private final Set<MemoryIterator> activeIters = Collections.synchronizedSet(new HashSet<MemoryIterator>());
+
+ class MemoryDataSource implements DataSource {
+
+   boolean switched = false;
+   private InterruptibleIterator iter;
-   private List<FileSKVIterator> readers;
++  private FileSKVIterator reader;
++  private MemoryDataSource parent;
++  private IteratorEnvironment env;
+
+   MemoryDataSource() {
-     this(new ArrayList<FileSKVIterator>());
++    this(null, false, null);
+   }
+
-   public MemoryDataSource(List<FileSKVIterator> readers) {
-     this.readers = readers;
++  public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env) {
++    this.parent = parent;
++    this.switched = switched;
++    this.env = env;
+   }
+
+   @Override
+   public boolean isCurrent() {
+     if (switched)
+       return true;
+     else
+       return memDumpFile == null;
+   }
+
+   @Override
+   public DataSource getNewDataSource() {
+     if (switched)
+       throw new IllegalStateException();
+
+     if (!isCurrent()) {
+       switched = true;
+       iter = null;
++      try {
++        // ensure files are referenced even if iterator was never seeked before
++        iterator();
++      } catch (IOException e) {
++        throw new RuntimeException();
++      }
+     }
+
+     return this;
+   }
+
++  private synchronized FileSKVIterator getReader() throws IOException {
++    if (reader == null) {
++      Configuration conf = CachedConfiguration.getInstance();
++      FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
++
++      reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
++    }
++
++    return reader;
++  }
++
+   @Override
+   public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
+     if (iter == null)
+       if (!switched)
+         iter = map.skvIterator();
+       else {
-
-         Configuration conf = CachedConfiguration.getInstance();
-         FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
-
-         FileSKVIterator reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
-
-         readers.add(reader);
-
-         iter = new MemKeyConversionIterator(reader);
++        if (parent == null)
++          iter = new MemKeyConversionIterator(getReader());
++        else
++          synchronized (parent) {
++            // synchronize deep copy operation on parent, this prevents multiple threads from deep copying the rfile shared from parent its possible that the
++            // thread deleting an InMemoryMap and scan thread could be switching different deep copies
++            iter = new MemKeyConversionIterator(parent.getReader().deepCopy(env));
++          }
+       }
+
+     return iter;
+   }
+
+   @Override
+   public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
-     return new MemoryDataSource(readers);
++    return new MemoryDataSource(parent == null ? this : parent, switched, env);
+   }
+
+ }
+
+ class MemoryIterator extends WrappingIterator implements InterruptibleIterator {
+
+   private AtomicBoolean closed;
+   private SourceSwitchingIterator ssi;
+   private MemoryDataSource mds;
+
+   protected SortedKeyValueIterator<Key,Value> getSource() {
+     if (closed.get())
+       throw new IllegalStateException("Memory iterator is closed");
+     return super.getSource();
+   }
+
+   private MemoryIterator(InterruptibleIterator source) {
+     this(source, new AtomicBoolean(false));
+   }
+
+   private MemoryIterator(SortedKeyValueIterator<Key,Value> source, AtomicBoolean closed) {
+     setSource(source);
+     this.closed = closed;
+   }
+
+   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+     return new MemoryIterator(getSource().deepCopy(env), closed);
+   }
+
+   public void close() {
+
+     synchronized (this) {
+       if (closed.compareAndSet(false, true)) {
-
-         for (FileSKVIterator reader : mds.readers)
-           try {
-             reader.close();
-           } catch (IOException e) {
-             log.warn(e, e);
-           }
++        try {
++          if (mds.reader != null)
++            mds.reader.close();
++        } catch (IOException e) {
++          log.warn(e, e);
++        }
+       }
+     }
+
+     // remove outside of sync to avoid deadlock
+     activeIters.remove(this);
+   }
+
+   private synchronized boolean switchNow() throws IOException {
+     if (closed.get())
+       return false;
+
+     ssi.switchNow();
+     return true;
+   }
+
+   @Override
+   public void setInterruptFlag(AtomicBoolean flag) {
+     ((InterruptibleIterator) getSource()).setInterruptFlag(flag);
+   }
+
+   private void setSSI(SourceSwitchingIterator ssi) {
+     this.ssi = ssi;
+   }
+
+   public void setMDS(MemoryDataSource mds) {
+     this.mds = mds;
+   }
+
+ }
+
+ public synchronized MemoryIterator skvIterator() {
+   if (map == null)
+     throw new NullPointerException();
+
+   if (deleted)
+     throw new IllegalStateException("Can not obtain iterator after map deleted");
+
+   int mc = kvCount.get();
+   MemoryDataSource mds = new MemoryDataSource();
+   SourceSwitchingIterator ssi = new SourceSwitchingIterator(new MemoryDataSource());
+   MemoryIterator mi = new MemoryIterator(new PartialMutationSkippingIterator(ssi, mc));
+   mi.setSSI(ssi);
+   mi.setMDS(mds);
+   activeIters.add(mi);
+   return mi;
+ }
+
+ public SortedKeyValueIterator<Key,Value> compactionIterator() {
+
+   if (nextKVCount.get() - 1 != kvCount.get())
+     throw new IllegalStateException("Memory map in unexpected state : nextKVCount = " + nextKVCount.get() + " kvCount = "
+       + kvCount.get());
+
+   return map.skvIterator();
+ }
+
+ private boolean deleted = false;
+
+ public void delete(long waitTime) {
+
+   synchronized (this) {
+     if (deleted)
+       throw new IllegalStateException("Double delete");
+
+     deleted = true;
+   }
+
+   long t1 = System.currentTimeMillis();
+
+   while (activeIters.size() > 0 && System.currentTimeMillis() - t1 < waitTime) {
+     UtilWaitThread.sleep(50);
+   }
+
+   if (activeIters.size() > 0) {
+     // dump memmap exactly as is to a tmp file on disk, and switch scans to that temp file
+     try {
+       Configuration conf = CachedConfiguration.getInstance();
+       FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
+
+       String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "."
+ RFile.EXTENSION; + + Configuration newConf = new Configuration(conf); + newConf.setInt("io.seqfile.compress.blocksize", 100000); + + FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, ServerConfiguration.getSiteConfiguration()); + + InterruptibleIterator iter = map.skvIterator(); + + HashSet<ByteSequence> allfams= new HashSet<ByteSequence>(); + + for(Entry<String, Set<ByteSequence>> entry : lggroups.entrySet()){ + allfams.addAll(entry.getValue()); + out.startNewLocalityGroup(entry.getKey(), entry.getValue()); + iter.seek(new Range(), entry.getValue(), true); + dumpLocalityGroup(out, iter); + } + + out.startDefaultLocalityGroup(); + iter.seek(new Range(), allfams, false); + + dumpLocalityGroup(out, iter); + + out.close(); + + log.debug("Created mem dump file " + tmpFile); + + memDumpFile = tmpFile; + + synchronized (activeIters) { + for (MemoryIterator mi : activeIters) { + mi.switchNow(); + } + } + + // rely on unix behavior that file will be deleted when last + // reader closes it + fs.delete(new Path(memDumpFile), true); + + } catch (IOException ioe) { + log.error("Failed to create mem dump file ", ioe); + + while (activeIters.size() > 0) { + UtilWaitThread.sleep(100); + } + } + + } + + SimpleMap tmpMap = map; + + synchronized (this) { + map = null; + } + + tmpMap.delete(); + } + + private void dumpLocalityGroup(FileSKVWriter out, InterruptibleIterator iter) throws IOException { + while (iter.hasTop() && activeIters.size() > 0) { + // RFile does not support MemKey, so we move the kv count into the value only for the RFile. + // There is no need to change the MemKey to a normal key because the kvCount info gets lost when it is written + Value newValue = new MemValue(iter.getTopValue(), ((MemKey) iter.getTopKey()).kvCount); + out.append(iter.getTopKey(), newValue); + iter.next(); + + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/3d3d301f/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java ---------------------------------------------------------------------- diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java index dc7ee99,0000000..3932552 mode 100644,000000..100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/InMemoryMapTest.java @@@ -1,513 -1,0 +1,557 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; +import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ZooConfiguration; +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class InMemoryMapTest { + + @BeforeClass + public static void setUp() throws Exception { + // suppress log messages having to do with not having an instance + Logger.getLogger(ZooConfiguration.class).setLevel(Level.OFF); + Logger.getLogger(HdfsZooInstance.class).setLevel(Level.OFF); + } + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target")); + + public void mutate(InMemoryMap imm, String row, String column, long ts) { + Mutation m = new Mutation(new Text(row)); + String[] sa = column.split(":"); + m.putDelete(new Text(sa[0]), new Text(sa[1]), ts); + + imm.mutate(Collections.singletonList(m)); + } + + public void mutate(InMemoryMap imm, String row, String column, long ts, String value) { + Mutation m = new Mutation(new Text(row)); + String[] sa = column.split(":"); + m.put(new Text(sa[0]), new Text(sa[1]), ts, new Value(value.getBytes())); + + imm.mutate(Collections.singletonList(m)); + } + + static Key nk(String row, String column, long ts) { + String[] sa = column.split(":"); + Key k = new Key(new Text(row), new Text(sa[0]), new Text(sa[1]), ts); + return k; + } + + static void ae(SortedKeyValueIterator<Key,Value> dc, String row, String column, int ts, String val) throws IOException { + assertTrue(dc.hasTop()); + assertEquals(nk(row, column, ts), dc.getTopKey()); + assertEquals(new Value(val.getBytes()), dc.getTopValue()); + dc.next(); + + } + + static Set<ByteSequence> newCFSet(String... 
cfs) { + HashSet<ByteSequence> cfSet = new HashSet<ByteSequence>(); + for (String cf : cfs) { + cfSet.add(new ArrayByteSequence(cf)); + } + return cfSet; + } + + @Test + public void test2() throws Exception { + InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + + MemoryIterator ski1 = imm.skvIterator(); + mutate(imm, "r1", "foo:cq1", 3, "bar1"); + MemoryIterator ski2 = imm.skvIterator(); + + ski1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); + assertFalse(ski1.hasTop()); + + ski2.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); + assertTrue(ski2.hasTop()); + ae(ski2, "r1", "foo:cq1", 3, "bar1"); + assertFalse(ski2.hasTop()); + + } + + @Test + public void test3() throws Exception { + InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + + mutate(imm, "r1", "foo:cq1", 3, "bar1"); + mutate(imm, "r1", "foo:cq1", 3, "bar2"); + MemoryIterator ski1 = imm.skvIterator(); + mutate(imm, "r1", "foo:cq1", 3, "bar3"); + + mutate(imm, "r3", "foo:cq1", 3, "bar9"); + mutate(imm, "r3", "foo:cq1", 3, "bara"); + + MemoryIterator ski2 = imm.skvIterator(); + + ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(ski1, "r1", "foo:cq1", 3, "bar2"); + ae(ski1, "r1", "foo:cq1", 3, "bar1"); + assertFalse(ski1.hasTop()); + + ski2.seek(new Range(new Text("r3")), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(ski2, "r3", "foo:cq1", 3, "bara"); + ae(ski2, "r3", "foo:cq1", 3, "bar9"); + assertFalse(ski1.hasTop()); + + } + + @Test + public void test4() throws Exception { + InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + + mutate(imm, "r1", "foo:cq1", 3, "bar1"); + mutate(imm, "r1", "foo:cq1", 3, "bar2"); + MemoryIterator ski1 = imm.skvIterator(); + mutate(imm, "r1", "foo:cq1", 3, "bar3"); + + imm.delete(0); + + ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(ski1, "r1", "foo:cq1", 3, "bar2"); + ae(ski1, "r1", "foo:cq1", 3, "bar1"); + assertFalse(ski1.hasTop()); + + ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(ski1, "r1", "foo:cq1", 3, "bar2"); + ae(ski1, "r1", "foo:cq1", 3, "bar1"); + assertFalse(ski1.hasTop()); + + ski1.seek(new Range(new Text("r2")), LocalityGroupUtil.EMPTY_CF_SET, false); + assertFalse(ski1.hasTop()); + + ski1.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(ski1, "r1", "foo:cq1", 3, "bar2"); + ae(ski1, "r1", "foo:cq1", 3, "bar1"); + assertFalse(ski1.hasTop()); + + ski1.close(); + } + + @Test + public void test5() throws Exception { + InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + + mutate(imm, "r1", "foo:cq1", 3, "bar1"); + mutate(imm, "r1", "foo:cq1", 3, "bar2"); + mutate(imm, "r1", "foo:cq1", 3, "bar3"); + + MemoryIterator ski1 = imm.skvIterator(); + ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(ski1, "r1", "foo:cq1", 3, "bar3"); + + imm.delete(0); + + ae(ski1, "r1", "foo:cq1", 3, "bar2"); + ae(ski1, "r1", "foo:cq1", 3, "bar1"); + assertFalse(ski1.hasTop()); + + ski1.close(); + + imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + + mutate(imm, "r1", "foo:cq1", 3, "bar1"); + mutate(imm, "r1", "foo:cq2", 3, "bar2"); + mutate(imm, "r1", "foo:cq3", 3, "bar3"); + + ski1 = imm.skvIterator(); + ski1.seek(new Range(new Text("r1")), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(ski1, "r1", "foo:cq1", 3, "bar1"); + + imm.delete(0); + + ae(ski1, "r1", 
"foo:cq2", 3, "bar2"); + ae(ski1, "r1", "foo:cq3", 3, "bar3"); + assertFalse(ski1.hasTop()); + + ski1.close(); + } + + @Test + public void test6() throws Exception { + InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + + mutate(imm, "r1", "foo:cq1", 3, "bar1"); + mutate(imm, "r1", "foo:cq2", 3, "bar2"); + mutate(imm, "r1", "foo:cq3", 3, "bar3"); + mutate(imm, "r1", "foo:cq4", 3, "bar4"); + + MemoryIterator ski1 = imm.skvIterator(); + + mutate(imm, "r1", "foo:cq5", 3, "bar5"); + + SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(null); + + ski1.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(ski1, "r1", "foo:cq1", 3, "bar1"); + + dc.seek(new Range(nk("r1", "foo:cq2", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(dc, "r1", "foo:cq2", 3, "bar2"); + + imm.delete(0); + + ae(ski1, "r1", "foo:cq2", 3, "bar2"); + ae(dc, "r1", "foo:cq3", 3, "bar3"); + ae(ski1, "r1", "foo:cq3", 3, "bar3"); + ae(dc, "r1", "foo:cq4", 3, "bar4"); + ae(ski1, "r1", "foo:cq4", 3, "bar4"); + assertFalse(ski1.hasTop()); + assertFalse(dc.hasTop()); + + ski1.seek(new Range(nk("r1", "foo:cq3", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false); + + dc.seek(new Range(nk("r1", "foo:cq4", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(dc, "r1", "foo:cq4", 3, "bar4"); + assertFalse(dc.hasTop()); + + ae(ski1, "r1", "foo:cq3", 3, "bar3"); + ae(ski1, "r1", "foo:cq4", 3, "bar4"); + assertFalse(ski1.hasTop()); + assertFalse(dc.hasTop()); + + ski1.close(); + } + ++ private void deepCopyAndDelete(int interleaving) throws Exception { ++ // interleaving == 0 intentionally omitted, this runs the test w/o deleting in mem map ++ ++ InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); ++ ++ mutate(imm, "r1", "foo:cq1", 3, "bar1"); ++ mutate(imm, "r1", "foo:cq2", 3, "bar2"); ++ ++ MemoryIterator ski1 = imm.skvIterator(); ++ ++ if (interleaving == 1) ++ imm.delete(0); ++ ++ SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(null); ++ ++ if (interleaving == 2) ++ imm.delete(0); ++ ++ dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); ++ ski1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); ++ ++ if (interleaving == 3) ++ imm.delete(0); ++ ++ ae(dc, "r1", "foo:cq1", 3, "bar1"); ++ ae(ski1, "r1", "foo:cq1", 3, "bar1"); ++ dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); ++ ++ if (interleaving == 4) ++ imm.delete(0); ++ ++ ae(ski1, "r1", "foo:cq2", 3, "bar2"); ++ ae(dc, "r1", "foo:cq1", 3, "bar1"); ++ ae(dc, "r1", "foo:cq2", 3, "bar2"); ++ assertFalse(dc.hasTop()); ++ assertFalse(ski1.hasTop()); ++ } ++ ++ @Test ++ public void testDeepCopyAndDelete() throws Exception { ++ for (int i = 0; i <= 4; i++) ++ deepCopyAndDelete(i); ++ } ++ + @Test + public void testBug1() throws Exception { + InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + + for (int i = 0; i < 20; i++) { + mutate(imm, "r1", "foo:cq" + i, 3, "bar" + i); + } + + for (int i = 0; i < 20; i++) { + mutate(imm, "r2", "foo:cq" + i, 3, "bar" + i); + } + + MemoryIterator ski1 = imm.skvIterator(); + ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(ski1); + + imm.delete(0); + + ArrayList<ByteSequence> columns = new ArrayList<ByteSequence>(); + columns.add(new ArrayByteSequence("bar")); + + // this seek resulted in an infinite loop before a bug was fixed + cfsi.seek(new Range("r1"), columns, true); + + assertFalse(cfsi.hasTop()); + + ski1.close(); + } + + @Test + public void 
testSeekBackWards() throws Exception { + InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + + mutate(imm, "r1", "foo:cq1", 3, "bar1"); + mutate(imm, "r1", "foo:cq2", 3, "bar2"); + mutate(imm, "r1", "foo:cq3", 3, "bar3"); + mutate(imm, "r1", "foo:cq4", 3, "bar4"); + + MemoryIterator skvi1 = imm.skvIterator(); + + skvi1.seek(new Range(nk("r1", "foo:cq3", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(skvi1, "r1", "foo:cq3", 3, "bar3"); + + skvi1.seek(new Range(nk("r1", "foo:cq1", 3), null), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(skvi1, "r1", "foo:cq1", 3, "bar1"); + + } + + @Test + public void testDuplicateKey() throws Exception { + InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + + Mutation m = new Mutation(new Text("r1")); + m.put(new Text("foo"), new Text("cq"), 3, new Value("v1".getBytes())); + m.put(new Text("foo"), new Text("cq"), 3, new Value("v2".getBytes())); + imm.mutate(Collections.singletonList(m)); + + MemoryIterator skvi1 = imm.skvIterator(); + skvi1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); + ae(skvi1, "r1", "foo:cq", 3, "v2"); + ae(skvi1, "r1", "foo:cq", 3, "v1"); + } + + private static final Logger log = Logger.getLogger(InMemoryMapTest.class); + + static long sum(long[] counts) { + long result = 0; + for (int i = 0; i < counts.length; i++) + result += counts[i]; + return result; + } + + // - hard to get this timing test to run well on apache build machines + @Test + @Ignore + public void parallelWriteSpeed() throws InterruptedException, IOException { + List<Double> timings = new ArrayList<Double>(); + for (int threads : new int[] {1, 2, 16, /* 64, 256 */}) { + final long now = System.currentTimeMillis(); + final long counts[] = new long[threads]; + final InMemoryMap imm = new InMemoryMap(false, tempFolder.newFolder().getAbsolutePath()); + ExecutorService e = Executors.newFixedThreadPool(threads); + for (int j = 0; j < threads; j++) { + final int threadId = j; + e.execute(new Runnable() { + @Override + public void run() { + while (System.currentTimeMillis() - now < 1000) { + for (int k = 0; k < 1000; k++) { + Mutation m = new Mutation("row"); + m.put("cf", "cq", new Value("v".getBytes())); + List<Mutation> mutations = Collections.singletonList(m); + imm.mutate(mutations); + counts[threadId]++; + } + } + } + }); + } + e.shutdown(); + e.awaitTermination(10, TimeUnit.SECONDS); + imm.delete(10000); + double mutationsPerSecond = sum(counts) / ((System.currentTimeMillis() - now) / 1000.); + timings.add(mutationsPerSecond); + log.info(String.format("%.1f mutations per second with %d threads", mutationsPerSecond, threads)); + } + // verify that more threads doesn't go a lot faster, or a lot slower than one thread + for (int i = 0; i < timings.size(); i++) { + double ratioFirst = timings.get(0) / timings.get(i); + assertTrue(ratioFirst < 3); + assertTrue(ratioFirst > 0.3); + } + } + + @Test + public void testLocalityGroups() throws Exception { + + Map<String,Set<ByteSequence>> lggroups1 = new HashMap<String,Set<ByteSequence>>(); + lggroups1.put("lg1", newCFSet("cf1", "cf2")); + lggroups1.put("lg2", newCFSet("cf3", "cf4")); + + InMemoryMap imm = new InMemoryMap(lggroups1, false, tempFolder.newFolder().getAbsolutePath()); + + Mutation m1 = new Mutation("r1"); + m1.put("cf1", "x", 2, "1"); + m1.put("cf1", "y", 2, "2"); + m1.put("cf3", "z", 2, "3"); + m1.put("foo", "b", 2, "9"); + + Mutation m2 = new Mutation("r2"); + m2.put("cf2", "x", 3, "5"); + + Mutation m3 = new Mutation("r3"); + 
m3.put("foo", "b", 4, "6"); + + Mutation m4 = new Mutation("r4"); + m4.put("foo", "b", 5, "7"); + m4.put("cf4", "z", 5, "8"); + + Mutation m5 = new Mutation("r5"); + m5.put("cf3", "z", 6, "A"); + m5.put("cf4", "z", 6, "B"); + + imm.mutate(Arrays.asList(m1, m2, m3, m4, m5)); + + MemoryIterator iter1 = imm.skvIterator(); + + seekLocalityGroups(iter1); + SortedKeyValueIterator<Key,Value> dc1 = iter1.deepCopy(null); + seekLocalityGroups(dc1); + + assertTrue(imm.getNumEntries() == 10); + assertTrue(imm.estimatedSizeInBytes() > 0); + + imm.delete(0); + + seekLocalityGroups(iter1); + seekLocalityGroups(dc1); + // TODO uncomment following when ACCUMULO-1628 is fixed + // seekLocalityGroups(iter1.deepCopy(null)); + } + + private void seekLocalityGroups(SortedKeyValueIterator<Key,Value> iter1) throws IOException { + iter1.seek(new Range(), newCFSet("cf1"), true); + ae(iter1, "r1", "cf1:x", 2, "1"); + ae(iter1, "r1", "cf1:y", 2, "2"); + ae(iter1, "r2", "cf2:x", 3, "5"); + assertFalse(iter1.hasTop()); + + iter1.seek(new Range("r2", "r4"), newCFSet("cf1"), true); + ae(iter1, "r2", "cf2:x", 3, "5"); + assertFalse(iter1.hasTop()); + + iter1.seek(new Range(), newCFSet("cf3"), true); + ae(iter1, "r1", "cf3:z", 2, "3"); + ae(iter1, "r4", "cf4:z", 5, "8"); + ae(iter1, "r5", "cf3:z", 6, "A"); + ae(iter1, "r5", "cf4:z", 6, "B"); + assertFalse(iter1.hasTop()); + + iter1.seek(new Range(), newCFSet("foo"), true); + ae(iter1, "r1", "foo:b", 2, "9"); + ae(iter1, "r3", "foo:b", 4, "6"); + ae(iter1, "r4", "foo:b", 5, "7"); + assertFalse(iter1.hasTop()); + + iter1.seek(new Range(), newCFSet("cf1", "cf3"), true); + ae(iter1, "r1", "cf1:x", 2, "1"); + ae(iter1, "r1", "cf1:y", 2, "2"); + ae(iter1, "r1", "cf3:z", 2, "3"); + ae(iter1, "r2", "cf2:x", 3, "5"); + ae(iter1, "r4", "cf4:z", 5, "8"); + ae(iter1, "r5", "cf3:z", 6, "A"); + ae(iter1, "r5", "cf4:z", 6, "B"); + assertFalse(iter1.hasTop()); + + iter1.seek(new Range("r2", "r4"), newCFSet("cf1", "cf3"), true); + ae(iter1, "r2", "cf2:x", 3, "5"); + ae(iter1, "r4", "cf4:z", 5, "8"); + assertFalse(iter1.hasTop()); + + iter1.seek(new Range(), newCFSet("cf1", "cf3", "foo"), true); + assertAll(iter1); + + iter1.seek(new Range("r1", "r2"), newCFSet("cf1", "cf3", "foo"), true); + ae(iter1, "r1", "cf1:x", 2, "1"); + ae(iter1, "r1", "cf1:y", 2, "2"); + ae(iter1, "r1", "cf3:z", 2, "3"); + ae(iter1, "r1", "foo:b", 2, "9"); + ae(iter1, "r2", "cf2:x", 3, "5"); + assertFalse(iter1.hasTop()); + + iter1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false); + assertAll(iter1); + + iter1.seek(new Range(), newCFSet("cf1"), false); + assertAll(iter1); + + iter1.seek(new Range(), newCFSet("cf1", "cf2"), false); + ae(iter1, "r1", "cf3:z", 2, "3"); + ae(iter1, "r1", "foo:b", 2, "9"); + ae(iter1, "r3", "foo:b", 4, "6"); + ae(iter1, "r4", "cf4:z", 5, "8"); + ae(iter1, "r4", "foo:b", 5, "7"); + ae(iter1, "r5", "cf3:z", 6, "A"); + ae(iter1, "r5", "cf4:z", 6, "B"); + assertFalse(iter1.hasTop()); + + iter1.seek(new Range("r2"), newCFSet("cf1", "cf3", "foo"), true); + ae(iter1, "r2", "cf2:x", 3, "5"); + assertFalse(iter1.hasTop()); + } + + private void assertAll(SortedKeyValueIterator<Key,Value> iter1) throws IOException { + ae(iter1, "r1", "cf1:x", 2, "1"); + ae(iter1, "r1", "cf1:y", 2, "2"); + ae(iter1, "r1", "cf3:z", 2, "3"); + ae(iter1, "r1", "foo:b", 2, "9"); + ae(iter1, "r2", "cf2:x", 3, "5"); + ae(iter1, "r3", "foo:b", 4, "6"); + ae(iter1, "r4", "cf4:z", 5, "8"); + ae(iter1, "r4", "foo:b", 5, "7"); + ae(iter1, "r5", "cf3:z", 6, "A"); + ae(iter1, "r5", "cf4:z", 6, "B"); + 
assertFalse(iter1.hasTop()); + } +}
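
----------------------------------------------------------------------
The change above makes deep copies of a MemoryDataSource share the parent's RFile reader (the copy calls parent.getReader().deepCopy(env)) instead of each copy opening and tracking its own reader, and MemoryIterator.close() now closes that single shared reader. The following is a minimal sketch of the flow the new deepCopyAndDelete test exercises; it reuses the test's own mutate/ae helpers, "dumpDir" is a placeholder path, and it is only an illustration, not code from this commit.

  // Sketch only: mirrors the new deepCopyAndDelete test; dumpDir is a hypothetical temp directory.
  InMemoryMap imm = new InMemoryMap(false, dumpDir);

  mutate(imm, "r1", "foo:cq1", 3, "bar1");
  mutate(imm, "r1", "foo:cq2", 3, "bar2");

  MemoryIterator ski1 = imm.skvIterator();
  SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(null);

  // delete() dumps the map to an RFile and switches active iterators to it;
  // the deep copy reads through the parent's shared reader rather than opening its own.
  imm.delete(0);

  ski1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
  dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);

  ae(ski1, "r1", "foo:cq1", 3, "bar1");
  ae(dc, "r1", "foo:cq1", 3, "bar1");

  ski1.close(); // closes the one shared reader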