Repository: accumulo
Updated Branches:
  refs/heads/1.6 cb262120e -> c85bf4f7d


ACCUMULO-3297 allow metadata table reads even if all files have been allocated


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c85bf4f7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c85bf4f7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c85bf4f7

Branch: refs/heads/1.6
Commit: c85bf4f7d99c9edac5d64db26ca1f7a89d2b80e8
Parents: cb26212
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Thu Nov 6 13:48:30 2014 -0500
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Thu Nov 6 13:48:47 2014 -0500

----------------------------------------------------------------------
 .../apache/accumulo/tserver/FileManager.java    |  35 +++---
 .../apache/accumulo/test/MetaGetsReadersIT.java | 114 +++++++++++++++++++
 2 files changed, 135 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c85bf4f7/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index b82b9cc..1735aec 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -53,7 +53,6 @@ import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
 public class FileManager {
@@ -204,7 +203,7 @@ public class FileManager {
     
     ArrayList<FileSKVIterator> ret = new ArrayList<FileSKVIterator>();
     
-    for (int i = 0; i < numToTake; i++) {
+    for (int i = 0; i < numToTake && i < openReaders.size(); i++) {
       OpenReader or = openReaders.get(i);
       
       List<OpenReader> ofl = openFiles.get(or.fileName);
@@ -266,9 +265,9 @@ public class FileManager {
     return reservedReaders.get(reader);
   }
   
-  private List<FileSKVIterator> reserveReaders(Text table, Collection<String> files, boolean continueOnFailure) throws IOException {
+  private List<FileSKVIterator> reserveReaders(KeyExtent tablet, Collection<String> files, boolean continueOnFailure) throws IOException {
     
-    if (files.size() >= maxOpen) {
+    if (!tablet.isMeta() && files.size() >= maxOpen) {
       throw new IllegalArgumentException("requested files exceeds max open");
     }
     
@@ -281,7 +280,9 @@ public class FileManager {
     List<FileSKVIterator> reservedFiles = new ArrayList<FileSKVIterator>();
     Map<FileSKVIterator,String> readersReserved = new HashMap<FileSKVIterator,String>();
     
-    filePermits.acquireUninterruptibly(files.size());
+    if (!tablet.isMeta()) {
+      filePermits.acquireUninterruptibly(files.size());
+    }
     
     // now that the we are past the semaphore, we have the authority
     // to open files.size() files
@@ -312,23 +313,27 @@ public class FileManager {
         Path path = new Path(file);
         FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
         //log.debug("Opening "+file + " path " + path);
-        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
+        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(tablet),
             dataCache, indexCache);
         reservedFiles.add(reader);
         readersReserved.put(reader, file);
       } catch (Exception e) {
         
-        ProblemReports.getInstance().report(new ProblemReport(table.toString(), ProblemType.FILE_READ, file, e));
+        ProblemReports.getInstance().report(new ProblemReport(tablet.toString(), ProblemType.FILE_READ, file, e));
         
         if (continueOnFailure) {
           // release the permit for the file that failed to open
-          filePermits.release(1);
+          if (!tablet.isMeta()) {
+            filePermits.release(1);
+          }
           log.warn("Failed to open file " + file + " " + e.getMessage() + " continuing...");
         } else {
           // close whatever files were opened
           closeReaders(reservedFiles);
           
-          filePermits.release(files.size());
+          if (!tablet.isMeta()) {
+            filePermits.release(files.size());
+          }
           
           log.error("Failed to open file " + file + " " + e.getMessage());
           throw new IOException("Failed to open " + file, e);
@@ -344,7 +349,7 @@ public class FileManager {
     return reservedFiles;
   }
   
-  private void releaseReaders(List<FileSKVIterator> readers, boolean sawIOException) {
+  private void releaseReaders(KeyExtent tablet, List<FileSKVIterator> readers, boolean sawIOException) {
     // put files in openFiles
     
     synchronized (this) {
@@ -375,7 +380,9 @@ public class FileManager {
       closeReaders(readers);
     
     // decrement the semaphore
-    filePermits.release(readers.size());
+    if (!tablet.isMeta()) {
+      filePermits.release(readers.size());
+    }
     
   }
   
@@ -488,7 +495,7 @@ public class FileManager {
             + " files.size()=" + files.size() + " maxOpen=" + maxOpen + " tablet = " + tablet);
       }
       
-      List<FileSKVIterator> newlyReservedReaders = reserveReaders(tablet.getTableId(), files, continueOnFailure);
+      List<FileSKVIterator> newlyReservedReaders = reserveReaders(tablet, files, continueOnFailure);
       
       tabletReservedReaders.addAll(newlyReservedReaders);
       return newlyReservedReaders;
@@ -524,7 +531,7 @@ public class FileManager {
     
     synchronized void detach() {
       
-      releaseReaders(tabletReservedReaders, false);
+      releaseReaders(tablet, tabletReservedReaders, false);
       tabletReservedReaders.clear();
       
       for (FileDataSource fds : dataSources)
@@ -559,7 +566,7 @@ public class FileManager {
     }
     
     synchronized void releaseOpenFiles(boolean sawIOException) {
-      releaseReaders(tabletReservedReaders, sawIOException);
+      releaseReaders(tablet, tabletReservedReaders, sawIOException);
       tabletReservedReaders.clear();
       dataSources.clear();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c85bf4f7/test/src/test/java/org/apache/accumulo/test/MetaGetsReadersIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MetaGetsReadersIT.java b/test/src/test/java/org/apache/accumulo/test/MetaGetsReadersIT.java
new file mode 100644
index 0000000..7bf88bc
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/MetaGetsReadersIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MetaGetsReadersIT extends ConfigurableMacIT {
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+    cfg.setProperty(Property.TSERV_SCAN_MAX_OPENFILES, "2");
+    cfg.setProperty(Property.TABLE_BLOCKCACHE_ENABLED, "false");
+  }
+  
+  private static Thread slowScan(final Connector c, final String tableName, final AtomicBoolean stop) {
+    Thread thread = new Thread() {
+      public void run() {
+        try {
+          while (stop.get() == false) {
+            Scanner s = c.createScanner(tableName, Authorizations.EMPTY);
+            IteratorSetting is = new IteratorSetting(50, SlowIterator.class);
+            SlowIterator.setSleepTime(is, 10);
+            s.addScanIterator(is);
+            Iterator<Entry<Key,Value>> iterator = s.iterator();
+            while (iterator.hasNext() && stop.get() == false) {
+              iterator.next();
+            }
+          }
+        } catch (Exception ex) {
+          log.trace(ex, ex);
+          stop.set(true);
+        }
+      }
+    };
+    return thread;
+  }
+  
+  @Test(timeout = 2 * 60 * 1000)
+  public void test() throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final Connector c = getConnector();
+    c.tableOperations().create(tableName);
+    Random random = new Random();
+    BatchWriter bw = c.createBatchWriter(tableName, null);
+    for (int i = 0; i < 50000; i++) {
+      byte[] row = new byte[100];
+      random.nextBytes(row);
+      Mutation m = new Mutation(row);
+      m.put("", "", "");
+      bw.addMutation(m);
+    }
+    bw.close();
+    c.tableOperations().flush(tableName, null, null, true);
+    final AtomicBoolean stop = new AtomicBoolean(false);
+    Thread t1 = slowScan(c, tableName, stop);
+    t1.start();
+    Thread t2 = slowScan(c, tableName, stop);
+    t2.start();
+    UtilWaitThread.sleep(500);
+    long now = System.currentTimeMillis();
+    Scanner m = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    for (@SuppressWarnings("unused") Entry<Key,Value> entry : m) {
+    }
+    long delay = System.currentTimeMillis() - now;
+    System.out.println("Delay = " + delay);
+    assertTrue("metadata table scan was slow", delay < 1000);
+    assertFalse(stop.get());
+    stop.set(true);
+    t1.interrupt();
+    t2.interrupt();
+    t1.join();
+    t2.join();
+  }
+
+}

Reply via email to