ACCUMULO-378 Lower the batchwriter "batch" size, and make it configurable.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3243d2ff
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3243d2ff
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3243d2ff

Branch: refs/heads/ACCUMULO-378
Commit: 3243d2ff9209246e7b03453460dfd4a3f231b190
Parents: da0a228
Author: Josh Elser <els...@apache.org>
Authored: Wed May 21 16:50:17 2014 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Wed May 21 16:50:17 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/accumulo/core/conf/Property.java    |  2 ++
 .../replication/BatchWriterReplicationReplayer.java     | 12 ++++++++++--
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3243d2ff/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b1ee499..f239756 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -281,6 +281,8 @@ public enum Property {
   @Experimental
   TSERV_REPLICATION_DEFAULT_HANDLER("tserver.replication.default.replayer", 
"org.apache.accumulo.tserver.replication.BatchWriterReplicationReplayer",
       PropertyType.CLASSNAME, "Default AccumuloReplicationReplayer 
implementation"),
+  @Experimental
+  
TSERV_REPLICATION_BW_REPLAYER_MEMORY("tserver.replication.batchwriter.replayer.memory",
 "25M", PropertyType.MEMORY, "Memory to provide to batchwriter to replay 
mutations for replication"),
 
   // properties that are specific to logger server behavior
   LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this 
category affect the behavior of the write-ahead logger servers"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3243d2ff/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
index 45c1409..ea50199 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
@@ -26,11 +26,15 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.replication.AccumuloReplicationReplayer;
 import org.apache.accumulo.core.replication.RemoteReplicationErrorCode;
 import org.apache.accumulo.core.replication.thrift.KeyValues;
 import org.apache.accumulo.core.replication.thrift.RemoteReplicationException;
 import org.apache.accumulo.core.replication.thrift.WalEdits;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.slf4j.Logger;
@@ -45,8 +49,10 @@ public class BatchWriterReplicationReplayer implements 
AccumuloReplicationReplay
 
   @Override
   public long replicateLog(Connector conn, String tableName, WalEdits data) 
throws RemoteReplicationException {
+    final AccumuloConfiguration conf = 
ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
     final LogFileKey key = new LogFileKey();
     final LogFileValue value = new LogFileValue();
+    final long memoryInBytes = 
conf.getMemoryInBytes(Property.TSERV_REPLICATION_BW_REPLAYER_MEMORY);
 
     BatchWriter bw = null;
     long mutationsApplied = 0l;
@@ -63,14 +69,16 @@ public class BatchWriterReplicationReplayer implements 
AccumuloReplicationReplay
 
         // Create the batchScanner if we don't already have one.
         if (null == bw) {
+          BatchWriterConfig bwConfig = new BatchWriterConfig();
+          bwConfig.setMaxMemory(memoryInBytes);
           try {
-            bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+            bw = conn.createBatchWriter(tableName, bwConfig);
           } catch (TableNotFoundException e) {
             throw new 
RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(),
 "Table " + tableName + " does not exist");
           }
         }
 
-        log.info("Applying {} updates to table {} as part of batch", 
value.mutations.size(), tableName);
+        log.info("Applying {} mutations to table {} as part of batch", 
value.mutations.size(), tableName);
 
         try {
           bw.addMutations(value.mutations);

Reply via email to