Repository: accumulo
Updated Branches:
  refs/heads/1.6 cb262120e -> c85bf4f7d
ACCUMULO-3297 allow metadata table reads even if all files have been allocated


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c85bf4f7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c85bf4f7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c85bf4f7

Branch: refs/heads/1.6
Commit: c85bf4f7d99c9edac5d64db26ca1f7a89d2b80e8
Parents: cb26212
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Thu Nov 6 13:48:30 2014 -0500
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Thu Nov 6 13:48:47 2014 -0500

----------------------------------------------------------------------
 .../apache/accumulo/tserver/FileManager.java    |  35 +++---
 .../apache/accumulo/test/MetaGetsReadersIT.java | 114 +++++++++++++++++++
 2 files changed, 135 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c85bf4f7/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index b82b9cc..1735aec 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -53,7 +53,6 @@ import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
 public class FileManager {
@@ -204,7 +203,7 @@ public class FileManager {
 
     ArrayList<FileSKVIterator> ret = new ArrayList<FileSKVIterator>();
 
-    for (int i = 0; i < numToTake; i++) {
+    for (int i = 0; i < numToTake && i < openReaders.size(); i++) {
       OpenReader or = openReaders.get(i);
 
       List<OpenReader> ofl = openFiles.get(or.fileName);
@@ -266,9 +265,9 @@ public class FileManager {
     return reservedReaders.get(reader);
   }
 
-  private List<FileSKVIterator> reserveReaders(Text table, Collection<String> files, boolean continueOnFailure) throws IOException {
+  private List<FileSKVIterator> reserveReaders(KeyExtent tablet, Collection<String> files, boolean continueOnFailure) throws IOException {
 
-    if (files.size() >= maxOpen) {
+    if (!tablet.isMeta() && files.size() >= maxOpen) {
       throw new IllegalArgumentException("requested files exceeds max open");
     }
 
@@ -281,7 +280,9 @@ public class FileManager {
     List<FileSKVIterator> reservedFiles = new ArrayList<FileSKVIterator>();
     Map<FileSKVIterator,String> readersReserved = new HashMap<FileSKVIterator,String>();
 
-    filePermits.acquireUninterruptibly(files.size());
+    if (!tablet.isMeta()) {
+      filePermits.acquireUninterruptibly(files.size());
+    }
 
     // now that the we are past the semaphore, we have the authority
     // to open files.size() files
@@ -312,23 +313,27 @@ public class FileManager {
         Path path = new Path(file);
         FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
         //log.debug("Opening "+file + " path " + path);
-        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
+        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(tablet),
             dataCache, indexCache);
         reservedFiles.add(reader);
         readersReserved.put(reader, file);
       } catch (Exception e) {
 
-        ProblemReports.getInstance().report(new ProblemReport(table.toString(), ProblemType.FILE_READ, file, e));
+        ProblemReports.getInstance().report(new ProblemReport(tablet.toString(), ProblemType.FILE_READ, file, e));
 
         if (continueOnFailure) {
           // release the permit for the file that failed to open
-          filePermits.release(1);
+          if (!tablet.isMeta()) {
+            filePermits.release(1);
+          }
           log.warn("Failed to open file " + file + " " + e.getMessage() + " continuing...");
         } else {
           // close whatever files were opened
           closeReaders(reservedFiles);
-          filePermits.release(files.size());
+          if (!tablet.isMeta()) {
+            filePermits.release(files.size());
+          }
 
           log.error("Failed to open file " + file + " " + e.getMessage());
           throw new IOException("Failed to open " + file, e);
@@ -344,7 +349,7 @@ public class FileManager {
     return reservedFiles;
   }
 
-  private void releaseReaders(List<FileSKVIterator> readers, boolean sawIOException) {
+  private void releaseReaders(KeyExtent tablet, List<FileSKVIterator> readers, boolean sawIOException) {
     // put files in openFiles
 
     synchronized (this) {
@@ -375,7 +380,9 @@ public class FileManager {
     closeReaders(readers);
 
     // decrement the semaphore
-    filePermits.release(readers.size());
+    if (!tablet.isMeta()) {
+      filePermits.release(readers.size());
+    }
 
   }
 
@@ -488,7 +495,7 @@ public class FileManager {
             + " files.size()=" + files.size() + " maxOpen=" + maxOpen + " tablet = " + tablet);
       }
 
-      List<FileSKVIterator> newlyReservedReaders = reserveReaders(tablet.getTableId(), files, continueOnFailure);
+      List<FileSKVIterator> newlyReservedReaders = reserveReaders(tablet, files, continueOnFailure);
 
       tabletReservedReaders.addAll(newlyReservedReaders);
       return newlyReservedReaders;
@@ -524,7 +531,7 @@ public class FileManager {
 
     synchronized void detach() {
 
-      releaseReaders(tabletReservedReaders, false);
+      releaseReaders(tablet, tabletReservedReaders, false);
       tabletReservedReaders.clear();
 
       for (FileDataSource fds : dataSources)
@@ -559,7 +566,7 @@ public class FileManager {
     }
 
     synchronized void releaseOpenFiles(boolean sawIOException) {
-      releaseReaders(tabletReservedReaders, sawIOException);
+      releaseReaders(tablet, tabletReservedReaders, sawIOException);
       tabletReservedReaders.clear();
       dataSources.clear();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c85bf4f7/test/src/test/java/org/apache/accumulo/test/MetaGetsReadersIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MetaGetsReadersIT.java b/test/src/test/java/org/apache/accumulo/test/MetaGetsReadersIT.java
new file mode 100644
index 0000000..7bf88bc
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/MetaGetsReadersIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MetaGetsReadersIT extends ConfigurableMacIT {
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+    cfg.setProperty(Property.TSERV_SCAN_MAX_OPENFILES, "2");
+    cfg.setProperty(Property.TABLE_BLOCKCACHE_ENABLED, "false");
+  }
+
+  private static Thread slowScan(final Connector c, final String tableName, final AtomicBoolean stop) {
+    Thread thread = new Thread() {
+      public void run() {
+        try {
+          while (stop.get() == false) {
+            Scanner s = c.createScanner(tableName, Authorizations.EMPTY);
+            IteratorSetting is = new IteratorSetting(50, SlowIterator.class);
+            SlowIterator.setSleepTime(is, 10);
+            s.addScanIterator(is);
+            Iterator<Entry<Key,Value>> iterator = s.iterator();
+            while (iterator.hasNext() && stop.get() == false) {
+              iterator.next();
+            }
+          }
+        } catch (Exception ex) {
+          log.trace(ex, ex);
+          stop.set(true);
+        }
+      }
+    };
+    return thread;
+  }
+
+  @Test(timeout = 2 * 60 * 1000)
+  public void test() throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final Connector c = getConnector();
+    c.tableOperations().create(tableName);
+    Random random = new Random();
+    BatchWriter bw = c.createBatchWriter(tableName, null);
+    for (int i = 0; i < 50000; i++) {
+      byte[] row = new byte[100];
+      random.nextBytes(row);
+      Mutation m = new Mutation(row);
+      m.put("", "", "");
+      bw.addMutation(m);
+    }
+    bw.close();
+    c.tableOperations().flush(tableName, null, null, true);
+    final AtomicBoolean stop = new AtomicBoolean(false);
+    Thread t1 = slowScan(c, tableName, stop);
+    t1.start();
+    Thread t2 = slowScan(c, tableName, stop);
+    t2.start();
+    UtilWaitThread.sleep(500);
+    long now = System.currentTimeMillis();
+    Scanner m = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    for (@SuppressWarnings("unused") Entry<Key,Value> entry : m) {
+    }
+    long delay = System.currentTimeMillis() - now;
+    System.out.println("Delay = " + delay);
+    assertTrue("metadata table scan was slow", delay < 1000);
+    assertFalse(stop.get());
+    stop.set(true);
+    t1.interrupt();
+    t2.interrupt();
+    t1.join();
+    t2.join();
+  }
+
+}
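
For reviewers skimming the diff: the heart of the change is that reserveReaders() and releaseReaders() now take the KeyExtent and consult KeyExtent.isMeta(), so root/metadata tablets bypass the filePermits semaphore instead of queueing behind user scans that already hold all of the file permits. Below is a minimal stand-alone sketch of that pattern; the class and method names (PermitBypassSketch, reserve, release) are made up for illustration and are not the actual FileManager code.

import java.util.concurrent.Semaphore;

// Illustrative sketch only: a shared file-permit semaphore that privileged
// (metadata) callers may skip, mirroring the shape of this patch.
public class PermitBypassSketch {
  private final Semaphore filePermits;

  public PermitBypassSketch(int maxOpen) {
    filePermits = new Semaphore(maxOpen, true);
  }

  // isMeta plays the role of KeyExtent.isMeta(): true for root/metadata tablets.
  public void reserve(int numFiles, boolean isMeta) {
    if (!isMeta) {
      // ordinary tablets block here when every file permit is already in use
      filePermits.acquireUninterruptibly(numFiles);
    }
    // ... open numFiles files ...
  }

  public void release(int numFiles, boolean isMeta) {
    // ... close the files ...
    if (!isMeta) {
      // only return permits that were actually acquired
      filePermits.release(numFiles);
    }
  }
}

MetaGetsReadersIT exercises exactly this behavior: with TSERV_SCAN_MAX_OPENFILES set to 2 and two slow user scans holding the permits, a scan of the metadata table is still expected to complete in under a second.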