ACCUMULO-2834 First attempt at immediately requeueing a file for replication when more work is needed.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f6b6d0f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f6b6d0f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f6b6d0f

Branch: refs/heads/ACCUMULO-378
Commit: 0f6b6d0fb091440ad4780db1cf3c11f4a606de59
Parents: 327b0ab
Author: Josh Elser <els...@apache.org>
Authored: Thu May 22 18:33:11 2014 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Thu May 22 18:33:11 2014 -0400

----------------------------------------------------------------------
 .../replication/ReplicationProcessor.java       | 56 ++++++++++++--------
 .../replication/ReplicationProcessorTest.java   | 49 +++++++++++++++++
 2 files changed, 84 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f6b6d0f/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index d451991..ab939c5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -119,27 +119,9 @@ public class ReplicationProcessor implements Processor {
     }
 
     log.debug("Replicating {} to {} using {}", filePath, target, replica.getClass().getName());
-
-    // Replicate that sucker
-    Status replicatedStatus = replica.replicate(filePath, status, target);
-
-    log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
-
-    // If we got a different status
-    if (!replicatedStatus.equals(status)) {
-      // We actually did some work!
-      recordNewStatus(filePath, replicatedStatus, target);
-      return;
-    }
-
-    log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status),
-        TextFormat.shortDebugString(replicatedStatus));
-
-    // otherwise, we didn't actually replicate because there was error sending the data
-    // we can just not record any updates, and it will be picked up again by the work assigner
   }
 
-  public String getPeerType(String peerName) {
+  protected String getPeerType(String peerName) {
     // Find the configured replication peer so we know how to replicate to it
     Map<String,String> configuredPeers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS);
     String peerType = configuredPeers.get(Property.REPLICATION_PEERS.getKey() + peerName);
@@ -152,7 +134,7 @@ public class ReplicationProcessor implements Processor {
     return peerType;
   }
 
-  public Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
+  protected Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
       InvalidProtocolBufferException {
     Scanner s = ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), creds.getToken()));
     s.setRange(Range.exact(file));
@@ -161,6 +143,38 @@ public class ReplicationProcessor implements Processor {
     return Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
   }
 
+  /** Replicate filePath to target, pass after pass, until a pass makes no progress or the new Status no longer requires work. */
+  protected void replicate(ReplicaSystem replica, Path filePath, Status status, ReplicationTarget target) {
+    while (true) {
+      // Replicate that sucker
+      Status replicatedStatus = replica.replicate(filePath, status, target);
+
+      log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
+
+      // Compare against the Status this pass started from (not an older one), so a stalled pass is detected right away
+      if (!replicatedStatus.equals(status)) {
+        // We actually did some work!
+        recordNewStatus(filePath, replicatedStatus, target);
+
+        // If we don't have any more work, just quit
+        if (!StatusUtil.isWorkRequired(replicatedStatus)) {
+          return;
+        } else {
+          // Otherwise, let it loop and replicate some more data,
+          // starting the next pass from the Status we just recorded
+          status = replicatedStatus;
+        }
+      } else {
+        log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status),
+            TextFormat.shortDebugString(replicatedStatus));
+
+        // otherwise, we didn't actually replicate because there was an error sending the data
+        // we can just not record any updates, and it will be picked up again by the work assigner
+        return;
+      }
+    }
+  }
+
   /**
    * Record the updated Status for this file and target
    *
@@ -171,7 +185,7 @@ public class ReplicationProcessor implements Processor {
    * @param target
    *          Peer that was replicated to
    */
-  public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
+  protected void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
     try {
       Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
       BatchWriter bw = ReplicationTable.getBatchWriter(conn);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f6b6d0f/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
index df4845c..c88e091 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
@@ -20,11 +20,15 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.Path;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
@@ -65,4 +69,49 @@ public class ReplicationProcessorTest {
 
     proc.getPeerType("foo");
   }
+
+  @Test
+  public void filesContinueReplicationWhenMoreDataIsPresent() throws Exception {
+    ReplicaSystem replica = EasyMock.createMock(ReplicaSystem.class);
+    ReplicationProcessor proc = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
+
+    ReplicationTarget target = new ReplicationTarget("peer", "1", "1");
+    Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+    Path path = new Path("/accumulo");
+
+    Status firstStatus = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+    Status secondStatus = Status.newBuilder().setBegin(Long.MAX_VALUE).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+    // First pass: called with the initial Status, returns firstStatus, which is expected to be recorded
+    EasyMock.expect(replica.replicate(path, status, target)).andReturn(firstStatus);
+    proc.recordNewStatus(path, firstStatus, target);
+    EasyMock.expectLastCall().once();
+    // Second pass: expected to be called with firstStatus, returns secondStatus, which is expected to be recorded too
+    EasyMock.expect(replica.replicate(path, firstStatus, target)).andReturn(secondStatus);
+    proc.recordNewStatus(path, secondStatus, target);
+    EasyMock.expectLastCall().once();
+
+    EasyMock.replay(replica, proc);
+
+    proc.replicate(replica, path, status, target);
+
+    EasyMock.verify(replica, proc);
+  }
+
+  @Test
+  public void filesWhichMakeNoProgressArentReplicatedAgain() throws Exception {
+    ReplicaSystem replica = EasyMock.createMock(ReplicaSystem.class);
+    ReplicationProcessor proc = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
+
+    ReplicationTarget target = new ReplicationTarget("peer", "1", "1");
+    Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+    Path path = new Path("/accumulo");
+    // The peer hands back the very Status it was given; only one replicate call and no recordNewStatus call is expected
+    EasyMock.expect(replica.replicate(path, status, target)).andReturn(status);
+
+    EasyMock.replay(replica, proc);
+
+    proc.replicate(replica, path, status, target);
+
+    EasyMock.verify(replica, proc);
+  }
 }