Updated Branches: refs/heads/ACCUMULO-1833-caching [created] 3b6eade61
ACCUMULO-1833 Rework the getBatchWriter method on MTBW to remove zookeeper lock contention and get better concurrent throughput. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cba87980 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cba87980 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cba87980 Branch: refs/heads/ACCUMULO-1833-caching Commit: cba87980cbd731338c58f05734ebb3d3e683b440 Parents: 060188a Author: Josh Elser <josh.el...@gmail.com> Authored: Thu Nov 7 16:49:41 2013 -0500 Committer: Josh Elser <josh.el...@gmail.com> Committed: Thu Nov 7 16:49:41 2013 -0500 ---------------------------------------------------------------------- core/pom.xml | 4 + .../apache/accumulo/core/client/Connector.java | 44 ++++++- .../core/client/impl/ConnectorImpl.java | 12 ++ .../client/impl/MultiTableBatchWriterImpl.java | 116 ++++++++++++++----- .../core/client/mock/MockConnector.java | 11 ++ 5 files changed, 159 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index f7539f5..d02a3cd 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -30,6 +30,10 @@ <artifactId>jcommander</artifactId> </dependency> <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> <groupId>jline</groupId> <artifactId>jline</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/Connector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java b/core/src/main/java/org/apache/accumulo/core/client/Connector.java index d2e7321..68dc881 
100644 --- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java +++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java @@ -16,6 +16,8 @@ */ package org.apache.accumulo.core.client; +import java.util.concurrent.TimeUnit; + import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.admin.TableOperations; @@ -146,8 +148,32 @@ public abstract class Connector { public abstract MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads); /** + * Factory method to create a Multi-Table BatchWriter connected to Accumulo. Multi-table batch writers can queue data for multiple tables, which is good for + * ingesting data into multiple tables from the same source. Caching of ZooKeeper table information defaults to {@link MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME} + * and {@link MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME_UNIT}. + * + * @param maxMemory + * size in bytes of the maximum memory to batch before writing + * @param maxLatency + * size in milliseconds; set to 0 or Long.MAX_VALUE to allow the maximum time to hold a batch before writing + * @param maxWriteThreads + * the maximum number of threads to use for writing data to the tablet servers + * @param cacheTime + * Duration of time to cache ZooKeeper table information + * @param cacheTimeUnit + * Unit of time to apply to {@code cacheTime} + * + * @return MultiTableBatchWriter object for configuring and writing data to + * @deprecated since 1.5.0; Use {@link #createMultiTableBatchWriter(BatchWriterConfig, long, TimeUnit)} instead. + * @since 1.5.1 + */ + @Deprecated + public abstract MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads, long cacheTime, TimeUnit cacheTimeUnit); + + /** * Factory method to create a Multi-Table BatchWriter connected to Accumulo. 
Multi-table batch writers can queue data for multiple tables. Also data for - * multiple tables can be sent to a server in a single batch. Its an efficient way to ingest data into multiple tables from a single process. + * multiple tables can be sent to a server in a single batch. It's an efficient way to ingest data into multiple tables from a single process. Caching + * of ZooKeeper table information defaults to {@link MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME} and {@link MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME_UNIT} * * @param config * configuration used to create multi-table batch writer @@ -158,6 +184,22 @@ public abstract class Connector { public abstract MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config); /** + * Factory method to create a Multi-Table BatchWriter connected to Accumulo. Multi-table batch writers can queue data for multiple tables. Also data for + * multiple tables can be sent to a server in a single batch. It's an efficient way to ingest data into multiple tables from a single process. This method + * also allows the user to provide parameters as to how long table information from ZooKeeper is cached. + * @param config + * configuration used to create the multi-table batch writer + * @param cacheTime + * Duration of time to cache ZooKeeper table information + * @param cacheTimeUnit + * Unit of time to apply to {@code cacheTime} + * @return MultiTableBatchWriter object for configuring and writing data to + * @since 1.5.1 + */ + public abstract MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit); + + + /** * Factory method to create a Scanner connected to Accumulo. 
* * @param tableName http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java index 1702082..89d2813 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java @@ -126,12 +126,24 @@ public class ConnectorImpl extends Connector { .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads)); } + @Deprecated + @Override + public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads, long cacheTime, TimeUnit cacheTimeUnit) { + return new MultiTableBatchWriterImpl(instance, credentials, new BatchWriterConfig().setMaxMemory(maxMemory) + .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads), cacheTime, cacheTimeUnit); + } + @Override public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) { return new MultiTableBatchWriterImpl(instance, credentials, config); } @Override + public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config, long timeToCache, TimeUnit timeUnit) { + return new MultiTableBatchWriterImpl(instance, credentials, config, timeToCache, timeUnit); + } + + @Override public Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException { ArgumentChecker.notNull(tableName, authorizations); return new ScannerImpl(instance, credentials, getTableId(tableName), authorizations); http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java index 4537ae8..06b6f75 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java @@ -16,7 +16,9 @@ */ package org.apache.accumulo.core.client.impl; -import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -33,62 +35,97 @@ import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.util.ArgumentChecker; import org.apache.log4j.Logger; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { + public static final long DEFAULT_CACHE_TIME = 60; + public static final TimeUnit DEFAULT_CACHE_TIME_UNIT = TimeUnit.SECONDS; + static final Logger log = Logger.getLogger(MultiTableBatchWriterImpl.class); private boolean closed; - + private class TableBatchWriter implements BatchWriter { - + private String table; - + TableBatchWriter(String table) { this.table = table; } - + @Override public void addMutation(Mutation m) throws MutationsRejectedException { ArgumentChecker.notNull(m); bw.addMutation(table, m); } - + @Override public void addMutations(Iterable<Mutation> iterable) throws MutationsRejectedException { bw.addMutation(table, iterable.iterator()); } - + @Override public void close() { throw new UnsupportedOperationException("Must close all tables, can not close an 
individual table"); } - + @Override public void flush() { throw new UnsupportedOperationException("Must flush all tables, can not flush an individual table"); } - + } - + + /** + * CacheLoader which will look up the internal table ID for a given table name. + */ + private class TableNameToIdLoader extends CacheLoader<String,String> { + + @Override + public String load(String tableName) throws Exception { + String tableId = Tables.getNameToIdMap(instance).get(tableName); + + if (tableId == null) + throw new TableNotFoundException(tableId, tableName, null); + + if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) + throw new TableOfflineException(instance, tableId); + + return tableId; + } + + } + private TabletServerBatchWriter bw; - private HashMap<String,BatchWriter> tableWriters; + private ConcurrentHashMap<String,BatchWriter> tableWriters; private Instance instance; - + private final LoadingCache<String,String> nameToIdCache; + public MultiTableBatchWriterImpl(Instance instance, TCredentials credentials, BatchWriterConfig config) { - ArgumentChecker.notNull(instance, credentials); + this(instance, credentials, config, DEFAULT_CACHE_TIME, DEFAULT_CACHE_TIME_UNIT); + } + + public MultiTableBatchWriterImpl(Instance instance, TCredentials credentials, BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit) { + ArgumentChecker.notNull(instance, credentials, config, cacheTimeUnit); this.instance = instance; this.bw = new TabletServerBatchWriter(instance, credentials, config); - tableWriters = new HashMap<String,BatchWriter>(); + tableWriters = new ConcurrentHashMap<String,BatchWriter>(); this.closed = false; + + nameToIdCache = CacheBuilder.newBuilder().expireAfterWrite(cacheTime, cacheTimeUnit).concurrencyLevel(8).maximumSize(64).initialCapacity(16) + .build(new TableNameToIdLoader()); } - + public boolean isClosed() { return this.closed; } - + public void close() throws MutationsRejectedException { bw.close(); this.closed = true; } - + /** * 
Warning: do not rely upon finalize to close this class. Finalize is not guaranteed to be called. */ @@ -105,16 +142,41 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { } } + /** + * Returns the table ID for the given table name. + * @param tableName The name of the table which to find the ID for + * @return The table ID, or null if the table name doesn't exist + */ + private String getId(String tableName) throws TableNotFoundException { + try { + return nameToIdCache.get(tableName); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + + if (null == cause) { + throw new RuntimeException(e); + } + + if (cause instanceof TableNotFoundException) { + + throw (TableNotFoundException) cause; + } + + if (cause instanceof TableOfflineException) { + throw (TableOfflineException) cause; + } + + log.error("Unexpected exception when fetching table id for " + tableName); + + throw new RuntimeException(e); + } + } + @Override - public synchronized BatchWriter getBatchWriter(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + public BatchWriter getBatchWriter(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { ArgumentChecker.notNull(tableName); - String tableId = Tables.getNameToIdMap(instance).get(tableName); - if (tableId == null) - throw new TableNotFoundException(tableId, tableName, null); - - if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) - throw new TableOfflineException(instance, tableId); - + String tableId = getId(tableName); + BatchWriter tbw = tableWriters.get(tableId); if (tbw == null) { tbw = new TableBatchWriter(tableId); @@ -122,10 +184,10 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { } return tbw; } - + @Override public void flush() throws MutationsRejectedException { bw.flush(); } - + } 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java index 1179559..2aa6291 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java @@ -90,12 +90,23 @@ public class MockConnector extends Connector { return new MockMultiTableBatchWriter(acu); } + @Deprecated + @Override + public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads, long cacheTime, TimeUnit cacheTimeUnit) { + return new MockMultiTableBatchWriter(acu); + } + @Override public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) { return createMultiTableBatchWriter(config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads()); } @Override + public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit) { + return createMultiTableBatchWriter(config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads(), cacheTime, cacheTimeUnit); + } + + @Override public Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException { MockTable table = acu.tables.get(tableName); if (table == null)