ACCUMULO-378 When re-syncing to where we left off on reads, we need to track all tids for our table
Fixes an issue where, when the DEFINE_TABLET wasn't contained in the batch of log entries that we were reading, we ignored all of the mutations. When we read past all of the old data, we still need to track the tids for the table which we're replicating. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/59177233 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/59177233 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/59177233 Branch: refs/heads/ACCUMULO-378 Commit: 59177233fd903d5c69592c67e603c58bc2a0ed2a Parents: abea3c6 Author: Josh Elser <els...@apache.org> Authored: Thu May 22 13:37:14 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Thu May 22 13:37:14 2014 -0400 ---------------------------------------------------------------------- .../replication/AccumuloReplicaSystem.java | 43 +++++++--- .../replication/AccumuloReplicaSystemTest.java | 82 +++++++++++++++++++- 2 files changed, 109 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/59177233/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java index 6cd3358..ca1382f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java @@ -194,7 +194,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { return kvs; } } else { - WalReplication edits = getWalEdits(target, p, status, sizeLimit); + WalReplication edits = getWalEdits(target, 
getWalStream(p), p, status, sizeLimit); // If we have some edits to send if (0 < edits.walEdits.getEditsSize()) { @@ -206,8 +206,9 @@ public class AccumuloReplicaSystem implements ReplicaSystem { // We don't have to replicate every LogEvent in the file (only Mutation LogEvents), but we // want to track progress in the file relative to all LogEvents (to avoid duplicative processing/replication) return edits; - } else if (edits.entriesConsumed == Long.MAX_VALUE) { - // Even if we send no data, we must record the new begin value to account for the inf+ length + } else if (edits.entriesConsumed > 0) { + // Even if we send no data, we want to record a non-zero new begin value to avoid checking the same + // log entries multiple times to determine if they should be sent return edits; } } @@ -249,13 +250,15 @@ public class AccumuloReplicaSystem implements ReplicaSystem { throw new UnsupportedOperationException(); } - protected WalReplication getWalEdits(ReplicationTarget target, Path p, Status status, long sizeLimit) throws IOException { - DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf); - DataInputStream wal = streams.getDecryptingInputStream(); + protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit) throws IOException { LogFileKey key = new LogFileKey(); LogFileValue value = new LogFileValue(); + Set<Integer> desiredTids = new HashSet<>(); + // Read through the stuff we've already processed in a previous replication attempt + // We also need to track the tids that occurred earlier in the file as mutations + // later on might use that tid for (long i = 0; i < status.getBegin(); i++) { try { key.readFields(wal); @@ -264,9 +267,19 @@ public class AccumuloReplicaSystem implements ReplicaSystem { log.warn("Unexpectedly reached the end of file."); return new WalReplication(new WalEdits(), 0, 0, 0); } + + switch (key.event) { + case DEFINE_TABLET: + if 
(target.getSourceTableId().equals(key.tablet.getTableId().toString())) { + desiredTids.add(key.tid); + } + break; + default: + break; + } } - WalReplication repl = getEdits(wal, sizeLimit, target, status, p); + WalReplication repl = getEdits(wal, sizeLimit, target, status, p, desiredTids); log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'", (Long.MAX_VALUE == repl.entriesConsumed) ? "all" : repl.entriesConsumed, repl.sizeInBytes, p); @@ -274,7 +287,12 @@ public class AccumuloReplicaSystem implements ReplicaSystem { return repl; } - protected WalReplication getEdits(DataInputStream wal, long sizeLimit, ReplicationTarget target, Status status, Path p) throws IOException { + protected DataInputStream getWalStream(Path p) throws IOException { + DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf); + return streams.getDecryptingInputStream(); + } + + protected WalReplication getEdits(DataInputStream wal, long sizeLimit, ReplicationTarget target, Status status, Path p, Set<Integer> desiredTids) throws IOException { WalEdits edits = new WalEdits(); edits.edits = new ArrayList<ByteBuffer>(); long size = 0l; @@ -283,9 +301,6 @@ public class AccumuloReplicaSystem implements ReplicaSystem { LogFileKey key = new LogFileKey(); LogFileValue value = new LogFileValue(); - // Any tid for our table needs to be tracked - Set<Integer> desiredTids = new HashSet<>(); - while (size < sizeLimit) { try { key.readFields(wal); @@ -303,6 +318,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { switch (key.event) { case DEFINE_TABLET: + // For new DEFINE_TABLETs, we also need to record the new tids we see if (target.getSourceTableId().equals(key.tablet.getTableId().toString())) { desiredTids.add(key.tid); } @@ -349,7 +365,10 @@ public class AccumuloReplicaSystem implements ReplicaSystem { } } - log.debug("Removing {} mutations from WAL entry as they have already been replicated to {}", 
value.mutations.size() - mutationsToSend, target.getPeerName()); + int mutationsRemoved = value.mutations.size() - mutationsToSend; + if (mutationsRemoved > 0) { + log.debug("Removing {} mutations from WAL entry as they have already been replicated to {}", mutationsRemoved, target.getPeerName()); + } out.writeInt(mutationsToSend); for (Mutation m : value.mutations) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/59177233/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java index 07d1201..85204e3 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.UUID; @@ -146,7 +147,7 @@ public class AccumuloReplicaSystemTest { Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).build(); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); - WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal")); + WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>()); // We stopped because we got to the end of the file Assert.assertEquals(9, repl.entriesConsumed); @@ -253,7 +254,7 @@ public class 
AccumuloReplicaSystemTest { // If it were still open, more data could be appended that we need to process Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build(); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); - WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal")); + WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>()); // We stopped because we got to the end of the file Assert.assertEquals(Long.MAX_VALUE, repl.entriesConsumed); @@ -318,7 +319,7 @@ public class AccumuloReplicaSystemTest { // If it were still open, more data could be appended that we need to process Status status = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(true).build(); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(new byte[0])); - WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal")); + WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>()); // We stopped because we got to the end of the file Assert.assertEquals(Long.MAX_VALUE, repl.entriesConsumed); @@ -340,7 +341,7 @@ public class AccumuloReplicaSystemTest { // If it were still open, more data could be appended that we need to process Status status = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(false).build(); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(new byte[0])); - WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal")); + 
WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>()); // We stopped because we got to the end of the file Assert.assertEquals(0, repl.entriesConsumed); @@ -348,4 +349,77 @@ public class AccumuloReplicaSystemTest { Assert.assertEquals(0, repl.sizeInRecords); Assert.assertEquals(0, repl.sizeInBytes); } + + @Test + public void restartInFileKnowsAboutPreviousTableDefines() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + + LogFileKey key = new LogFileKey(); + LogFileValue value = new LogFileValue(); + + // What is seq used for? + key.seq = 1l; + + /* + * Disclaimer: the following series of LogFileKey and LogFileValue pairs have *no* bearing whatsoever in reality regarding what these entries would actually + * look like in a WAL. They are solely for testing that each LogEvents is handled, order is not important. 
+ */ + key.event = LogEvents.DEFINE_TABLET; + key.tablet = new KeyExtent(new Text("1"), null, null); + key.tid = 1; + + key.write(dos); + value.write(dos); + + key.tablet = null; + key.event = LogEvents.MUTATION; + key.filename = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + value.mutations = Arrays.<Mutation> asList(new ServerMutation(new Text("row"))); + + key.write(dos); + value.write(dos); + + key.tablet = null; + key.event = LogEvents.MUTATION; + key.tid = 1; + key.filename = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + value.mutations = Arrays.<Mutation> asList(new ServerMutation(new Text("row"))); + + key.write(dos); + value.write(dos); + + dos.close(); + + Map<String,String> confMap = new HashMap<>(); + confMap.put(Property.REPLICATION_NAME.getKey(), "source"); + AccumuloConfiguration conf = new ConfigurationCopy(confMap); + + AccumuloReplicaSystem ars = new AccumuloReplicaSystem(); + ars.setConf(conf); + + Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).build(); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + + // Only consume the first mutation, not the second + WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1); + + // We stopped because we got to the end of the file + Assert.assertEquals(2, repl.entriesConsumed); + Assert.assertEquals(1, repl.walEdits.getEditsSize()); + Assert.assertEquals(1, repl.sizeInRecords); + Assert.assertNotEquals(0, repl.sizeInBytes); + + status = Status.newBuilder(status).setBegin(2).build(); + dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + + // Consume the rest of the mutations + repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1); + + // We stopped because we got to the end of the file + Assert.assertEquals(1, 
repl.entriesConsumed); + Assert.assertEquals(1, repl.walEdits.getEditsSize()); + Assert.assertEquals(1, repl.sizeInRecords); + Assert.assertNotEquals(0, repl.sizeInBytes); + } }