ACCUMULO-2581 Add tests for work assignment using the distributed work queue
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cc9a72ac Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cc9a72ac Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cc9a72ac Branch: refs/heads/ACCUMULO-378 Commit: cc9a72ac736c50fa798a821fcd603fc033994eff Parents: 31f4e83 Author: Josh Elser <els...@apache.org> Authored: Thu May 1 21:16:08 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Thu May 1 21:16:08 2014 -0400 ---------------------------------------------------------------------- .../accumulo/core/replication/StatusUtil.java | 2 +- .../server/zookeeper/DistributedWorkQueue.java | 12 +++ .../replication/ReplicationWorkAssigner.java | 17 ++-- .../ReplicationWorkAssignerTest.java | 97 ++++++++++++++++++-- 4 files changed, 112 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc9a72ac/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java index 74327fd..842d945 100644 --- a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java @@ -200,7 +200,7 @@ public class StatusUtil { */ public static boolean isWorkRequired(Status status) { if (status.getInfiniteEnd()) { - return Long.MAX_VALUE == status.getBegin(); + return Long.MAX_VALUE != status.getBegin(); } else { return status.getBegin() < status.getEnd(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc9a72ac/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java index 4738c2b..b1907fb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java +++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.server.zookeeper; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -205,6 +206,17 @@ public class DistributedWorkQueue { } }, r.nextInt(60 * 1000), 60 * 1000); } + + /** + * Adds work to the queue, automatically converting the String to bytes using {@link StandardCharsets#UTF_8} + * @param workId + * @param data + * @throws KeeperException + * @throws InterruptedException + */ + public void addWork(String workId, String data) throws KeeperException, InterruptedException { + addWork(workId, data.getBytes(StandardCharsets.UTF_8)); + } public void addWork(String workId, byte[] data) throws KeeperException, InterruptedException { if (workId.equalsIgnoreCase(LOCKS_NODE)) http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc9a72ac/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java index f3a8825..370fcfd 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java @@ -215,11 +215,12 @@ public class ReplicationWorkAssigner implements Runnable { if (StatusUtil.isWorkRequired(status)) { Path p = new Path(file); String filename = p.getName(); + entry.getKey().getColumnQualifier(buffer); + String key = filename + "|" + buffer; // And, we haven't already queued this file up for work already - if (!queuedWork.contains(filename)) { - entry.getKey().getColumnQualifier(buffer); - queueWork(file, filename, buffer); + if (!queuedWork.contains(key)) { + queueWork(key, file); } } } @@ -238,10 +239,9 @@ public class ReplicationWorkAssigner implements Runnable { * @param filename * Filename */ - protected void queueWork(String path, String filename, Text serializedTarget) { + protected void queueWork(String key, String path) { try { - String key = filename + "|" + serializedTarget; - workQueue.addWork(key, path.getBytes(StandardCharsets.UTF_8)); + workQueue.addWork(key, path); queuedWork.add(key); } catch (KeeperException | InterruptedException e) { log.warn("Could not queue work for {}", path, e); @@ -252,11 +252,12 @@ public class ReplicationWorkAssigner implements Runnable { * Iterate over the queued work to remove entries that have been completed. */ protected void cleanupFinishedWork() { - Iterator<String> work = queuedWork.iterator(); + final Iterator<String> work = queuedWork.iterator(); + final String instanceId = master.getInstance().getInstanceID(); while (work.hasNext()) { String filename = work.next(); // Null equates to the work was finished - if (null == zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZREPLICATION + "/" + filename)) { + if (null == zooCache.get(ZooUtil.getRoot(instanceId) + Constants.ZREPLICATION + "/" + filename)) { work.remove(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc9a72ac/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java index b596126..89a1ddd 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java @@ -20,16 +20,20 @@ import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.UUID; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Mutation; @@ -41,6 +45,7 @@ import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.master.Master; import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; +import org.apache.accumulo.server.zookeeper.ZooCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.Assert; @@ -79,13 +84,14 @@ public class ReplicationWorkAssignerTest { assigner.setWorkQueue(workQueue); Path p = new Path("/accumulo/wal/tserver+port/" + UUID.randomUUID()); - assigner.queueWork(p.toString(), p.getName(), serializedTarget); - workQueue.addWork(p.getName(), p.toString().getBytes(StandardCharsets.UTF_8)); + workQueue.addWork(p.getName() + "|" + serializedTarget.toString(), p.toString()); expectLastCall().once(); replay(workQueue); + assigner.queueWork(p.getName() + "|" + serializedTarget, p.toString()); + Assert.assertEquals(1, queuedWork.size()); Assert.assertEquals(p.getName() + "|" + serializedTarget, queuedWork.iterator().next()); } @@ -102,6 +108,8 @@ public class ReplicationWorkAssignerTest { assigner.setWorkQueue(workQueue); assigner.initializeQueuedWork(); + verify(workQueue); + Set<String> queuedWork = assigner.getQueuedWork(); Assert.assertEquals("Expected existing work and queued work to be the same size", existingWork.size(), queuedWork.size()); Assert.assertTrue("Expected all existing work to be queued", queuedWork.containsAll(existingWork)); @@ -141,28 +149,34 @@ public class ReplicationWorkAssignerTest { @SuppressWarnings("unchecked") HashSet<String> queuedWork = createMock(HashSet.class); assigner.setQueuedWork(queuedWork); + assigner.setWorkQueue(workQueue); + assigner.setMaxQueueSize(Integer.MAX_VALUE); expect(queuedWork.size()).andReturn(0).anyTimes(); // Make sure we expect the invocations in the correct order (accumulo is sorted) if (file1.compareTo(file2) <= 0) { String key = filename1 + "|" + serializedTarget1; - workQueue.addWork(key, file1.getBytes(StandardCharsets.UTF_8)); + expect(queuedWork.contains(key)).andReturn(false); + workQueue.addWork(key, file1); expectLastCall().once(); expect(queuedWork.add(key)).andReturn(true).once(); key = filename2 + "|" + serializedTarget2; - workQueue.addWork(key, file2.getBytes(StandardCharsets.UTF_8)); + expect(queuedWork.contains(key)).andReturn(false); + workQueue.addWork(key, file2); expectLastCall().once(); expect(queuedWork.add(key)).andReturn(true).once(); } else { String key = filename2 + "|" + serializedTarget2; - workQueue.addWork(key, file2.getBytes(StandardCharsets.UTF_8)); + expect(queuedWork.contains(key)).andReturn(false); + workQueue.addWork(key, file2); expectLastCall().once(); expect(queuedWork.add(key)).andReturn(true).once(); key = filename1 + "|" + serializedTarget1; - workQueue.addWork(key, file1.getBytes(StandardCharsets.UTF_8)); + expect(queuedWork.contains(key)).andReturn(false); + workQueue.addWork(key, file1); expectLastCall().once(); expect(queuedWork.add(key)).andReturn(true).once(); } @@ -170,6 +184,8 @@ public class ReplicationWorkAssignerTest { replay(queuedWork, workQueue); assigner.createWork(); + + verify(queuedWork, workQueue); } @Test @@ -192,6 +208,7 @@ public class ReplicationWorkAssignerTest { BatchWriter bw = ReplicationTable.getBatchWriter(conn); String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString(); String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2; + Mutation m = new Mutation(file1); WorkSection.add(m, serializedTarget1, StatusUtil.newFileValue()); bw.addMutation(m); @@ -206,12 +223,78 @@ public class ReplicationWorkAssignerTest { @SuppressWarnings("unchecked") HashSet<String> queuedWork = createMock(HashSet.class); assigner.setQueuedWork(queuedWork); + assigner.setMaxQueueSize(Integer.MAX_VALUE); - expect(queuedWork.size()).andReturn(0).anyTimes(); + expect(queuedWork.size()).andReturn(0).times(2); replay(queuedWork, workQueue); assigner.createWork(); + + verify(queuedWork, workQueue); + } + + @Test + public void workNotInZooKeeperIsCleanedUp() { + Set<String> queuedWork = new LinkedHashSet<>(Arrays.asList("wal1", "wal2")); + assigner.setQueuedWork(queuedWork); + + Instance inst = createMock(Instance.class); + ZooCache cache = createMock(ZooCache.class); + assigner.setZooCache(cache); + + expect(master.getInstance()).andReturn(inst); + expect(inst.getInstanceID()).andReturn("id"); + expect(cache.get(Constants.ZROOT + "/id" + Constants.ZREPLICATION + "/wal1")).andReturn(null); + expect(cache.get(Constants.ZROOT + "/id" + Constants.ZREPLICATION + "/wal2")).andReturn(null); + + replay(cache, inst, master); + + assigner.cleanupFinishedWork(); + + verify(cache, inst, master); + Assert.assertTrue("Queued work was not emptied", queuedWork.isEmpty()); } + @Test + public void workNotReAdded() throws Exception { + Set<String> queuedWork = new HashSet<>(); + + assigner.setQueuedWork(queuedWork); + + ReplicationTarget target = new ReplicationTarget("cluster1", "table1"); + Text serializedTarget = ReplicationTarget.toText(target); + + queuedWork.add("wal1|" + serializedTarget.toString()); + + MockInstance inst = new MockInstance(test.getMethodName()); + Credentials creds = new Credentials("root", new PasswordToken("")); + Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken()); + + // Set the connector + assigner.setConnector(conn); + + // Create and grant ourselves write to the replication table + ReplicationTable.create(conn); + conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE); + + // Create two mutations, both of which need replication work done + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + String file1 = "/accumulo/wal/tserver+port/wal1"; + Mutation m = new Mutation(file1); + WorkSection.add(m, serializedTarget, StatusUtil.openWithUnknownLengthValue()); + bw.addMutation(m); + + bw.close(); + + DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); + assigner.setWorkQueue(workQueue); + assigner.setMaxQueueSize(Integer.MAX_VALUE); + + replay(workQueue); + + assigner.createWork(); + + verify(workQueue); + } }