ACCUMULO-2925 Add test to ensure that replicationSource(s) are preserved before replay
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/06760572 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/06760572 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/06760572 Branch: refs/heads/master Commit: 06760572e9325ba1f622a998085853a0098de783 Parents: cc7f91b Author: Josh Elser <els...@apache.org> Authored: Thu Jun 19 17:41:27 2014 -0700 Committer: Josh Elser <els...@apache.org> Committed: Thu Jun 19 17:41:27 2014 -0700 ---------------------------------------------------------------------- .../BatchWriterReplicationReplayerTest.java | 76 ++++++++++++++++++++ 1 file changed, 76 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/06760572/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java index 1e70490..fee5dcd 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java @@ -116,4 +116,80 @@ public class BatchWriterReplicationReplayerTest { verify(conn, conf, bw); } + @Test + public void replicationSourcesArePreserved() throws Exception { + final BatchWriterReplicationReplayer replayer = new BatchWriterReplicationReplayer(); + final String tableName = "foo"; + final long systemTimestamp = 1000; + final String peerName = "peer"; + final BatchWriterConfig bwCfg = new BatchWriterConfig(); + bwCfg.setMaxMemory(1l); + + Connector conn = createMock(Connector.class); + AccumuloConfiguration conf = createMock(AccumuloConfiguration.class); + BatchWriter bw = createMock(BatchWriter.class); + + LogFileKey key = new LogFileKey(); + key.event = LogEvents.MANY_MUTATIONS; + key.seq = 1; + key.tid = 1; + + WalEdits edits = new WalEdits(); + + // Make a mutation without timestamps + Mutation m = new Mutation("row"); + m.put("cf", "cq1", "value"); + m.put("cf", "cq2", "value"); + m.put("cf", "cq3", "value"); + m.put("cf", "cq4", "value"); + m.put("cf", "cq5", "value"); + + // This Mutation "came" from a system called "peer" + m.addReplicationSource(peerName); + + // Make it a TMutation + TMutation tMutation = m.toThrift(); + + // And then make a ServerMutation from the TMutation, adding in our systemTimestamp + ServerMutation sMutation = new ServerMutation(tMutation); + sMutation.setSystemTimestamp(systemTimestamp); + + // Serialize the ServerMutation (what AccumuloReplicaSystem will be doing) + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + + key.write(out); + out.writeInt(1); + sMutation.write(out); + + out.close(); + + // Add it to our "input" to BatchWriterReplicationReplayer + edits.addToEdits(ByteBuffer.wrap(baos.toByteArray())); + + Mutation expectedMutation = new Mutation("row"); + expectedMutation.put("cf", "cq1", sMutation.getSystemTimestamp(), "value"); + expectedMutation.put("cf", "cq2", sMutation.getSystemTimestamp(), "value"); + expectedMutation.put("cf", "cq3", sMutation.getSystemTimestamp(), "value"); + expectedMutation.put("cf", "cq4", sMutation.getSystemTimestamp(), "value"); + expectedMutation.put("cf", "cq5", sMutation.getSystemTimestamp(), "value"); + + // We expect our peer name to be preserved in the mutation that gets written + expectedMutation.addReplicationSource(peerName); + + expect(conf.getMemoryInBytes(Property.TSERV_REPLICATION_BW_REPLAYER_MEMORY)).andReturn(bwCfg.getMaxMemory()); + expect(conn.createBatchWriter(tableName, bwCfg)).andReturn(bw); + + bw.addMutations(Lists.newArrayList(expectedMutation)); + expectLastCall().once(); + + bw.close(); + expectLastCall().once(); + + replay(conn, conf, bw); + + replayer.replicateLog(conn, conf, tableName, edits); + + verify(conn, conf, bw); + } }