ACCUMULO-378 Lower the batchwriter "batch" size, and make it configurable.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3243d2ff Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3243d2ff Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3243d2ff Branch: refs/heads/ACCUMULO-378 Commit: 3243d2ff9209246e7b03453460dfd4a3f231b190 Parents: da0a228 Author: Josh Elser <els...@apache.org> Authored: Wed May 21 16:50:17 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Wed May 21 16:50:17 2014 -0400 ---------------------------------------------------------------------- .../java/org/apache/accumulo/core/conf/Property.java | 2 ++ .../replication/BatchWriterReplicationReplayer.java | 12 ++++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/3243d2ff/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index b1ee499..f239756 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -281,6 +281,8 @@ public enum Property { @Experimental TSERV_REPLICATION_DEFAULT_HANDLER("tserver.replication.default.replayer", "org.apache.accumulo.tserver.replication.BatchWriterReplicationReplayer", PropertyType.CLASSNAME, "Default AccumuloReplicationReplayer implementation"), + @Experimental + TSERV_REPLICATION_BW_REPLAYER_MEMORY("tserver.replication.batchwriter.replayer.memory", "25M", PropertyType.MEMORY, "Memory to provide to batchwriter to replay mutations for replication"), // properties that are specific to logger server behavior LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"), http://git-wip-us.apache.org/repos/asf/accumulo/blob/3243d2ff/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java index 45c1409..ea50199 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java @@ -26,11 +26,15 @@ import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.replication.AccumuloReplicationReplayer; import org.apache.accumulo.core.replication.RemoteReplicationErrorCode; import org.apache.accumulo.core.replication.thrift.KeyValues; import org.apache.accumulo.core.replication.thrift.RemoteReplicationException; import org.apache.accumulo.core.replication.thrift.WalEdits; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.tserver.logger.LogFileKey; import org.apache.accumulo.tserver.logger.LogFileValue; import org.slf4j.Logger; @@ -45,8 +49,10 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay @Override public long replicateLog(Connector conn, String tableName, WalEdits data) throws RemoteReplicationException { + final AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance()); final LogFileKey key = new LogFileKey(); final LogFileValue value = new LogFileValue(); + final long memoryInBytes = conf.getMemoryInBytes(Property.TSERV_REPLICATION_BW_REPLAYER_MEMORY); BatchWriter bw = null; long mutationsApplied = 0l; @@ -63,14 +69,16 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay // Create the batchScanner if we don't already have one. if (null == bw) { + BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setMaxMemory(memoryInBytes); try { - bw = conn.createBatchWriter(tableName, new BatchWriterConfig()); + bw = conn.createBatchWriter(tableName, bwConfig); } catch (TableNotFoundException e) { throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table " + tableName + " does not exist"); } } - log.info("Applying {} updates to table {} as part of batch", value.mutations.size(), tableName); + log.info("Applying {} mutations to table {} as part of batch", value.mutations.size(), tableName); try { bw.addMutations(value.mutations);