ACCUMULO-2709 Don't create the work entry unless there's work to be done

We can avoid writing some mutations by first checking whether the status record actually needs replication. The WorkAssigner already had this check further downstream, but performing it earlier in the pipeline is more efficient.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b5cd35a6 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b5cd35a6 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b5cd35a6 Branch: refs/heads/ACCUMULO-378 Commit: b5cd35a6d683c5ff315d5f6ab7f1879d798c1951 Parents: 3579b67 Author: Josh Elser <els...@apache.org> Authored: Thu May 8 12:19:37 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Thu May 8 12:19:37 2014 -0400 ---------------------------------------------------------------------- .../accumulo/master/replication/WorkMaker.java | 23 +++++++- .../master/replication/WorkMakerTest.java | 61 +++++++++++++++++--- 2 files changed, 74 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/b5cd35a6/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java index cfc756c..956d550 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java @@ -34,6 +34,8 @@ import org.apache.accumulo.core.replication.ReplicationSchema; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; import org.apache.accumulo.core.replication.ReplicationTarget; +import org.apache.accumulo.core.replication.StatusUtil; +import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.conf.TableConfiguration; 
import org.apache.accumulo.server.replication.ReplicationTable; @@ -41,13 +43,16 @@ import org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.InvalidProtocolBufferException; /** * Reads replication records from the replication table and creates work records which include target replication system information. */ public class WorkMaker { - private static final Logger log = Logger.getLogger(WorkMaker.class); + private static final Logger log = LoggerFactory.getLogger(WorkMaker.class); private final Connector conn; @@ -88,6 +93,20 @@ public class WorkMaker { ReplicationSchema.StatusSection.getFile(entry.getKey(), file); ReplicationSchema.StatusSection.getTableId(entry.getKey(), tableId); log.info("Processing replication status record for " + file + " on table "+ tableId); + + Status status; + try { + status = Status.parseFrom(entry.getValue().get()); + } catch (InvalidProtocolBufferException e) { + log.error("Could not parse protobuf for {} from table {}", file, tableId); + continue; + } + + // Don't create the record if we have nothing to do + // TODO put this into a filter on serverside + if (!StatusUtil.isWorkRequired(status)) { + continue; + } // Get the table configuration for the table specified by the status record tableConf = ServerConfiguration.getTableConfiguration(conn.getInstance(), tableId.toString()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b5cd35a6/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java index 
bebab51..34bb567 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java @@ -28,16 +28,16 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.replication.ReplicationTable; -import org.apache.accumulo.server.util.ReplicationTableUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.Assert; @@ -47,6 +47,7 @@ import org.junit.Test; import org.junit.rules.TestName; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; /** * @@ -82,19 +83,24 @@ public class WorkMakerTest { String tableId = conn.tableOperations().tableIdMap().get(table); String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678"; - KeyExtent extent = new KeyExtent(new Text(tableId), null, null); - Mutation m = ReplicationTableUtil.createUpdateMutation(new Path(file), StatusUtil.fileClosedValue(), extent); + Mutation m = new Mutation(new Path(file).toString()); + m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileClosedValue()); BatchWriter bw = ReplicationTable.getBatchWriter(conn); bw.addMutation(m); bw.flush(); + 
// Assert that we have one record in the status section + Scanner s = ReplicationTable.getScanner(conn); + StatusSection.limit(s); + Assert.assertEquals(1, Iterables.size(s)); + WorkMaker workMaker = new WorkMaker(conn); ReplicationTarget expected = new ReplicationTarget("remote_cluster_1", "4"); workMaker.setBatchWriter(bw); workMaker.addWorkRecord(new Text(file), StatusUtil.fileClosedValue(), ImmutableMap.of("remote_cluster_1", "4")); - Scanner s = ReplicationTable.getScanner(conn); + s = ReplicationTable.getScanner(conn); WorkSection.limit(s); Iterator<Entry<Key,Value>> iter = s.iterator(); @@ -119,12 +125,17 @@ public class WorkMakerTest { String tableId = conn.tableOperations().tableIdMap().get(table); String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678"; - KeyExtent extent = new KeyExtent(new Text(tableId), null, null); - Mutation m = ReplicationTableUtil.createUpdateMutation(new Path(file), StatusUtil.fileClosedValue(), extent); + Mutation m = new Mutation(new Path(file).toString()); + m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileClosedValue()); BatchWriter bw = ReplicationTable.getBatchWriter(conn); bw.addMutation(m); bw.flush(); + // Assert that we have one record in the status section + Scanner s = ReplicationTable.getScanner(conn); + StatusSection.limit(s); + Assert.assertEquals(1, Iterables.size(s)); + WorkMaker workMaker = new WorkMaker(conn); Map<String,String> targetClusters = ImmutableMap.of("remote_cluster_1", "4", "remote_cluster_2", "6", "remote_cluster_3", "8"); @@ -135,7 +146,7 @@ public class WorkMakerTest { workMaker.setBatchWriter(bw); workMaker.addWorkRecord(new Text(file), StatusUtil.fileClosedValue(), targetClusters); - Scanner s = ReplicationTable.getScanner(conn); + s = ReplicationTable.getScanner(conn); WorkSection.limit(s); Set<ReplicationTarget> actualTargets = new HashSet<>(); @@ -154,4 +165,38 @@ public class WorkMakerTest { Assert.assertTrue("Found extra replication work entries: " + 
actualTargets, actualTargets.isEmpty()); } + + @Test + public void dontCreateWorkForEntriesWithNothingToReplicate() throws Exception { + String table = name.getMethodName(); + conn.tableOperations().create(name.getMethodName()); + String tableId = conn.tableOperations().tableIdMap().get(table); + String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678"; + + Mutation m = new Mutation(new Path(file).toString()); + m.put(StatusSection.NAME, new Text(tableId), StatusUtil.newFileValue()); + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + bw.addMutation(m); + bw.flush(); + + // Assert that we have one record in the status section + Scanner s = ReplicationTable.getScanner(conn); + StatusSection.limit(s); + Assert.assertEquals(1, Iterables.size(s)); + + WorkMaker workMaker = new WorkMaker(conn); + + conn.tableOperations().setProperty(ReplicationTable.NAME, Property.TABLE_REPLICATION_TARGETS.getKey() + "remote_cluster_1", "4"); + + workMaker.setBatchWriter(bw); + + // If we don't shortcircuit out, we should get an exception because ServerConfiguration.getTableConfiguration + // won't work with MockAccumulo + workMaker.run(); + + s = ReplicationTable.getScanner(conn); + WorkSection.limit(s); + + Assert.assertEquals(0, Iterables.size(s)); + } }