ACCUMULO-2925 Add test to ensure that replicationSource(s) are preserved before 
replay


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/06760572
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/06760572
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/06760572

Branch: refs/heads/master
Commit: 06760572e9325ba1f622a998085853a0098de783
Parents: cc7f91b
Author: Josh Elser <els...@apache.org>
Authored: Thu Jun 19 17:41:27 2014 -0700
Committer: Josh Elser <els...@apache.org>
Committed: Thu Jun 19 17:41:27 2014 -0700

----------------------------------------------------------------------
 .../BatchWriterReplicationReplayerTest.java     | 76 ++++++++++++++++++++
 1 file changed, 76 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/06760572/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
index 1e70490..fee5dcd 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
@@ -116,4 +116,80 @@ public class BatchWriterReplicationReplayerTest {
     verify(conn, conf, bw);
   }
 
+  @Test
+  public void replicationSourcesArePreserved() throws Exception {
+    final BatchWriterReplicationReplayer replayer = new BatchWriterReplicationReplayer();
+    final String tableName = "foo";
+    final long systemTimestamp = 1000;
+    final String peerName = "peer";
+    final BatchWriterConfig bwCfg = new BatchWriterConfig();
+    bwCfg.setMaxMemory(1L);
+
+    Connector conn = createMock(Connector.class);
+    AccumuloConfiguration conf = createMock(AccumuloConfiguration.class);
+    BatchWriter bw = createMock(BatchWriter.class);
+
+    LogFileKey key = new LogFileKey();
+    key.event = LogEvents.MANY_MUTATIONS;
+    key.seq = 1;
+    key.tid = 1;
+
+    WalEdits edits = new WalEdits();
+
+    // Build a mutation whose five updates carry no explicit timestamps
+    Mutation m = new Mutation("row");
+    m.put("cf", "cq1", "value");
+    m.put("cf", "cq2", "value");
+    m.put("cf", "cq3", "value");
+    m.put("cf", "cq4", "value");
+    m.put("cf", "cq5", "value");
+
+    // This Mutation "came" from a system called "peer"
+    m.addReplicationSource(peerName);
+
+    // Make it a TMutation
+    TMutation tMutation = m.toThrift();
+
+    // And then make a ServerMutation from the TMutation, adding in our systemTimestamp
+    ServerMutation sMutation = new ServerMutation(tMutation);
+    sMutation.setSystemTimestamp(systemTimestamp);
+
+    // Serialize the ServerMutation (what AccumuloReplicaSystem will be doing)
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(baos);
+
+    key.write(out);
+    out.writeInt(1); // presumably the count of mutations that follow the key -- confirm against the WAL format
+    sMutation.write(out);
+
+    out.close();
+
+    // Add it to our "input" to BatchWriterReplicationReplayer
+    edits.addToEdits(ByteBuffer.wrap(baos.toByteArray()));
+
+    Mutation expectedMutation = new Mutation("row"); // same updates, each stamped with the system timestamp
+    expectedMutation.put("cf", "cq1", sMutation.getSystemTimestamp(), "value");
+    expectedMutation.put("cf", "cq2", sMutation.getSystemTimestamp(), "value");
+    expectedMutation.put("cf", "cq3", sMutation.getSystemTimestamp(), "value");
+    expectedMutation.put("cf", "cq4", sMutation.getSystemTimestamp(), "value");
+    expectedMutation.put("cf", "cq5", sMutation.getSystemTimestamp(), "value");
+
+    // We expect our peer name to be preserved in the mutation that gets written
+    expectedMutation.addReplicationSource(peerName);
+
+    expect(conf.getMemoryInBytes(Property.TSERV_REPLICATION_BW_REPLAYER_MEMORY)).andReturn(bwCfg.getMaxMemory());
+    expect(conn.createBatchWriter(tableName, bwCfg)).andReturn(bw);
+
+    bw.addMutations(Lists.newArrayList(expectedMutation)); // the single write the replayer is expected to issue
+    expectLastCall().once();
+
+    bw.close();
+    expectLastCall().once();
+
+    replay(conn, conf, bw);
+
+    replayer.replicateLog(conn, conf, tableName, edits);
+
+    verify(conn, conf, bw);
+  }
 }

Reply via email to