Updated Branches:
  refs/heads/ACCUMULO-1833-caching [created] 3b6eade61

ACCUMULO-1833 Rework the getBatchWriter method on MTBW to remove
zookeeper lock contention and get better concurrent throughput.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cba87980
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cba87980
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cba87980

Branch: refs/heads/ACCUMULO-1833-caching
Commit: cba87980cbd731338c58f05734ebb3d3e683b440
Parents: 060188a
Author: Josh Elser <josh.el...@gmail.com>
Authored: Thu Nov 7 16:49:41 2013 -0500
Committer: Josh Elser <josh.el...@gmail.com>
Committed: Thu Nov 7 16:49:41 2013 -0500

----------------------------------------------------------------------
 core/pom.xml                                    |   4 +
 .../apache/accumulo/core/client/Connector.java  |  44 ++++++-
 .../core/client/impl/ConnectorImpl.java         |  12 ++
 .../client/impl/MultiTableBatchWriterImpl.java  | 116 ++++++++++++++-----
 .../core/client/mock/MockConnector.java         |  11 ++
 5 files changed, 159 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index f7539f5..d02a3cd 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -30,6 +30,10 @@
       <artifactId>jcommander</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
       <groupId>jline</groupId>
       <artifactId>jline</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/Connector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java 
b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
index d2e7321..68dc881 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.core.client;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.accumulo.core.client.admin.InstanceOperations;
 import org.apache.accumulo.core.client.admin.SecurityOperations;
 import org.apache.accumulo.core.client.admin.TableOperations;
@@ -146,8 +148,32 @@ public abstract class Connector {
   public abstract MultiTableBatchWriter createMultiTableBatchWriter(long 
maxMemory, long maxLatency, int maxWriteThreads);
   
   /**
+   * Factory method to create a Multi-Table BatchWriter connected to Accumulo. 
Multi-table batch writers can queue data for multiple tables, which is good for
+   * ingesting data into multiple tables from the same source. Caching of 
ZooKeeper table information defaults to {@link 
MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME}
+   * and {@link MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME_UNIT}
+   * 
+   * @param maxMemory
+   *          size in bytes of the maximum memory to batch before writing
+   * @param maxLatency
+   *          size in milliseconds; set to 0 or Long.MAX_VALUE to allow the 
maximum time to hold a batch before writing
+   * @param maxWriteThreads
+   *          the maximum number of threads to use for writing data to the 
tablet servers
+   * @param cacheTime
+   *          Duration of time to cache ZooKeeper table information
+   * @param cacheTimeUnit
+   *          Unit of time to apply to {@link cacheTime}
+   * 
+   * @return MultiTableBatchWriter object for configuring and writing data to
+   * @deprecated since 1.5.0; Use {@link 
#createMultiTableBatchWriter(BatchWriterConfig)} instead.
+   * @since 1.5.1
+   */
+  @Deprecated
+  public abstract MultiTableBatchWriter createMultiTableBatchWriter(long 
maxMemory, long maxLatency, int maxWriteThreads, long cacheTime, TimeUnit 
cacheTimeUnit);
+  
+  /**
    * Factory method to create a Multi-Table BatchWriter connected to Accumulo. 
Multi-table batch writers can queue data for multiple tables. Also data for
-   * multiple tables can be sent to a server in a single batch. Its an 
efficient way to ingest data into multiple tables from a single process.
+   * multiple tables can be sent to a server in a single batch. Its an 
efficient way to ingest data into multiple tables from a single process. Caching
+   * of ZooKeeper table information defaults to {@link 
MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME} and {@link 
MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME_UNIT}
    * 
    * @param config
    *          configuration used to create multi-table batch writer
@@ -158,6 +184,22 @@ public abstract class Connector {
   public abstract MultiTableBatchWriter 
createMultiTableBatchWriter(BatchWriterConfig config);
   
   /**
+   * Factory method to create a Multi-Table BatchWriter connected to Accumulo. 
Multi-table batch writers can queue data for multiple tables. Also data for
+   * multiple tables can be sent to a server in a single batch. Its an 
efficient way to ingest data into multiple tables from a single process. This 
method
+   * also allows the user to provide parameters as to how long table 
information from ZooKeeper is cached.
+   * @param config
+   *          configuration used to create the multi-table batch writer
+   * @param cacheTime
+   *          Duration of time to cache ZooKeeper table information
+   * @param cacheTimeUnit
+   *          Unit of time to apply to {@link cacheTime}
+   * @return
+   * @since 1.5.1
+   */
+  public abstract MultiTableBatchWriter 
createMultiTableBatchWriter(BatchWriterConfig config, long cacheTime, TimeUnit 
cacheTimeUnit);
+  
+  
+  /**
    * Factory method to create a Scanner connected to Accumulo.
    * 
    * @param tableName

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index 1702082..89d2813 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -126,12 +126,24 @@ public class ConnectorImpl extends Connector {
         .setMaxLatency(maxLatency, 
TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
   }
   
+  @Deprecated
+  @Override
+  public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, 
long maxLatency, int maxWriteThreads, long cacheTime, TimeUnit cacheTimeUnit) {
+    return new MultiTableBatchWriterImpl(instance, credentials, new 
BatchWriterConfig().setMaxMemory(maxMemory)
+        .setMaxLatency(maxLatency, 
TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads), cacheTime, 
cacheTimeUnit);
+  }
+  
   @Override
   public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig 
config) {
     return new MultiTableBatchWriterImpl(instance, credentials, config);
   }
   
   @Override
+  public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig 
config, long timeToCache, TimeUnit timeUnit) {
+    return new MultiTableBatchWriterImpl(instance, credentials, config, 
timeToCache, timeUnit);
+  }
+  
+  @Override
   public Scanner createScanner(String tableName, Authorizations 
authorizations) throws TableNotFoundException {
     ArgumentChecker.notNull(tableName, authorizations);
     return new ScannerImpl(instance, credentials, getTableId(tableName), 
authorizations);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
index 4537ae8..06b6f75 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
@@ -16,7 +16,9 @@
  */
 package org.apache.accumulo.core.client.impl;
 
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -33,62 +35,97 @@ import 
org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.log4j.Logger;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
+  public static final long DEFAULT_CACHE_TIME = 60;
+  public static final TimeUnit DEFAULT_CACHE_TIME_UNIT = TimeUnit.SECONDS;
+  
   static final Logger log = Logger.getLogger(MultiTableBatchWriterImpl.class);
   private boolean closed;
-  
+
   private class TableBatchWriter implements BatchWriter {
-    
+
     private String table;
-    
+
     TableBatchWriter(String table) {
       this.table = table;
     }
-    
+
     @Override
     public void addMutation(Mutation m) throws MutationsRejectedException {
       ArgumentChecker.notNull(m);
       bw.addMutation(table, m);
     }
-    
+
     @Override
     public void addMutations(Iterable<Mutation> iterable) throws 
MutationsRejectedException {
       bw.addMutation(table, iterable.iterator());
     }
-    
+
     @Override
     public void close() {
       throw new UnsupportedOperationException("Must close all tables, can not 
close an individual table");
     }
-    
+
     @Override
     public void flush() {
       throw new UnsupportedOperationException("Must flush all tables, can not 
flush an individual table");
     }
-    
+
   }
-  
+
+  /**
+   * CacheLoader which will look up the internal table ID for a given table 
name.
+   */
+  private class TableNameToIdLoader extends CacheLoader<String,String> {
+
+    @Override
+    public String load(String tableName) throws Exception {
+      String tableId = Tables.getNameToIdMap(instance).get(tableName);
+
+      if (tableId == null)
+        throw new TableNotFoundException(tableId, tableName, null);
+
+      if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+        throw new TableOfflineException(instance, tableId);
+      
+      return tableId;
+    }
+
+  }
+
   private TabletServerBatchWriter bw;
-  private HashMap<String,BatchWriter> tableWriters;
+  private ConcurrentHashMap<String,BatchWriter> tableWriters;
   private Instance instance;
-  
+  private final LoadingCache<String,String> nameToIdCache;
+
   public MultiTableBatchWriterImpl(Instance instance, TCredentials 
credentials, BatchWriterConfig config) {
-    ArgumentChecker.notNull(instance, credentials);
+    this(instance, credentials, config, DEFAULT_CACHE_TIME, 
DEFAULT_CACHE_TIME_UNIT);
+  }
+  
+  public MultiTableBatchWriterImpl(Instance instance, TCredentials 
credentials, BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit) {
+    ArgumentChecker.notNull(instance, credentials, config, cacheTimeUnit);
     this.instance = instance;
     this.bw = new TabletServerBatchWriter(instance, credentials, config);
-    tableWriters = new HashMap<String,BatchWriter>();
+    tableWriters = new ConcurrentHashMap<String,BatchWriter>();
     this.closed = false;
+
+    nameToIdCache = CacheBuilder.newBuilder().expireAfterWrite(cacheTime, 
cacheTimeUnit).concurrencyLevel(8).maximumSize(64).initialCapacity(16)
+        .build(new TableNameToIdLoader());
   }
-  
+
   public boolean isClosed() {
     return this.closed;
   }
-  
+
   public void close() throws MutationsRejectedException {
     bw.close();
     this.closed = true;
   }
-  
+
   /**
    * Warning: do not rely upon finalize to close this class. Finalize is not 
guaranteed to be called.
    */
@@ -105,16 +142,41 @@ public class MultiTableBatchWriterImpl implements 
MultiTableBatchWriter {
     }
   }
   
+  /**
+   * Returns the table ID for the given table name.
+   * @param tableName The name of the table which to find the ID for
+   * @return The table ID, or null if the table name doesn't exist
+   */
+  private String getId(String tableName) throws TableNotFoundException {
+    try {
+      return nameToIdCache.get(tableName);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      
+      if (null == cause) {
+        throw new RuntimeException(e);
+      }
+      
+      if (cause instanceof TableNotFoundException) {
+        
+        throw (TableNotFoundException) cause;
+      }
+      
+      if (cause instanceof TableOfflineException) {
+        throw (TableOfflineException) cause;
+      }
+      
+      log.error("Unexpected exception when fetching table id for " + 
tableName);
+      
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
-  public synchronized BatchWriter getBatchWriter(String tableName) throws 
AccumuloException, AccumuloSecurityException, TableNotFoundException {
+  public BatchWriter getBatchWriter(String tableName) throws 
AccumuloException, AccumuloSecurityException, TableNotFoundException {
     ArgumentChecker.notNull(tableName);
-    String tableId = Tables.getNameToIdMap(instance).get(tableName);
-    if (tableId == null)
-      throw new TableNotFoundException(tableId, tableName, null);
-    
-    if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
-      throw new TableOfflineException(instance, tableId);
-    
+    String tableId = getId(tableName);
+
     BatchWriter tbw = tableWriters.get(tableId);
     if (tbw == null) {
       tbw = new TableBatchWriter(tableId);
@@ -122,10 +184,10 @@ public class MultiTableBatchWriterImpl implements 
MultiTableBatchWriter {
     }
     return tbw;
   }
-  
+
   @Override
   public void flush() throws MutationsRejectedException {
     bw.flush();
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java 
b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
index 1179559..2aa6291 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
@@ -90,12 +90,23 @@ public class MockConnector extends Connector {
     return new MockMultiTableBatchWriter(acu);
   }
   
+  @Deprecated
+  @Override
+  public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, 
long maxLatency, int maxWriteThreads, long cacheTime, TimeUnit cacheTimeUnit) {
+    return new MockMultiTableBatchWriter(acu);
+  }
+  
   @Override
   public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig 
config) {
     return createMultiTableBatchWriter(config.getMaxMemory(), 
config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads());
   }
   
   @Override
+  public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig 
config, long cacheTime, TimeUnit cacheTimeUnit) {
+    return createMultiTableBatchWriter(config.getMaxMemory(), 
config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads(), 
cacheTime, cacheTimeUnit);
+  }
+  
+  @Override
   public Scanner createScanner(String tableName, Authorizations 
authorizations) throws TableNotFoundException {
     MockTable table = acu.tables.get(tableName);
     if (table == null)

Reply via email to