ACCUMULO-2925 Need to preserve replicationSource on the Mutation. The replicationSource on the Mutation is the information that prevents cycles in the replication graph from replicating data indefinitely. Each replicationSource on a Mutation is the `replication.name` of a system from which that Mutation came.
We can later use this set to determine if we need to replicate this Mutation to a given peer by observing if the `replication.name` of our peer already exists in the replicationSources. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/03c93c9d Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/03c93c9d Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/03c93c9d Branch: refs/heads/master Commit: 03c93c9dd74abadad027cec6a934c92fd1d58f8c Parents: b062a0b Author: Josh Elser <els...@apache.org> Authored: Thu Jun 19 14:18:19 2014 -0700 Committer: Josh Elser <els...@apache.org> Committed: Thu Jun 19 14:18:19 2014 -0700 ---------------------------------------------------------------------- .../tserver/replication/BatchWriterReplicationReplayer.java | 9 +++++++++ 1 file changed, 9 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/03c93c9d/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java index 1d2a529..e9207fb 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; @@ -114,6 +115,14 @@ public class BatchWriterReplicationReplayer 
implements AccumuloReplicationReplay } } + // We also need to preserve the replicationSource information to prevent cycles + Set<String> replicationSources = orig.getReplicationSources(); + if (null != replicationSources && !replicationSources.isEmpty()) { + for (String replicationSource : replicationSources) { + copy.addReplicationSource(replicationSource); + } + } + mutationsCopy.add(copy); } else { mutationsCopy.add(orig);