ACCUMULO-2581 Add tests for work assignment using the distributed work queue


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cc9a72ac
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cc9a72ac
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cc9a72ac

Branch: refs/heads/ACCUMULO-378
Commit: cc9a72ac736c50fa798a821fcd603fc033994eff
Parents: 31f4e83
Author: Josh Elser <els...@apache.org>
Authored: Thu May 1 21:16:08 2014 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Thu May 1 21:16:08 2014 -0400

----------------------------------------------------------------------
 .../accumulo/core/replication/StatusUtil.java   |  2 +-
 .../server/zookeeper/DistributedWorkQueue.java  | 12 +++
 .../replication/ReplicationWorkAssigner.java    | 17 ++--
 .../ReplicationWorkAssignerTest.java            | 97 ++++++++++++++++++--
 4 files changed, 112 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc9a72ac/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
index 74327fd..842d945 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
@@ -200,7 +200,7 @@ public class StatusUtil {
    */
   public static boolean isWorkRequired(Status status) {
     if (status.getInfiniteEnd()) {
-      return Long.MAX_VALUE == status.getBegin();
+      return Long.MAX_VALUE != status.getBegin();
     } else {
       return status.getBegin() < status.getEnd();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc9a72ac/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
index 4738c2b..b1907fb 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.server.zookeeper;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -205,6 +206,17 @@ public class DistributedWorkQueue {
       }
     }, r.nextInt(60 * 1000), 60 * 1000);
   }
+
+  /**
+   * Adds work to the queue, automatically converting the String to bytes using {@link StandardCharsets#UTF_8}
+   * @param workId
+   * @param data
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void addWork(String workId, String data) throws KeeperException, InterruptedException {
+    addWork(workId, data.getBytes(StandardCharsets.UTF_8));
+  }
   
   public void addWork(String workId, byte[] data) throws KeeperException, InterruptedException {
     if (workId.equalsIgnoreCase(LOCKS_NODE))

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc9a72ac/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
index f3a8825..370fcfd 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
@@ -215,11 +215,12 @@ public class ReplicationWorkAssigner implements Runnable {
         if (StatusUtil.isWorkRequired(status)) {
           Path p = new Path(file);
           String filename = p.getName();
+          entry.getKey().getColumnQualifier(buffer);
+          String key = filename + "|" + buffer;
 
           // And, we haven't already queued this file up for work already
-          if (!queuedWork.contains(filename)) {
-            entry.getKey().getColumnQualifier(buffer);
-            queueWork(file, filename, buffer);
+          if (!queuedWork.contains(key)) {
+            queueWork(key, file);
           }
         }
       }
@@ -238,10 +239,9 @@ public class ReplicationWorkAssigner implements Runnable {
    * @param filename
    *          Filename
    */
-  protected void queueWork(String path, String filename, Text serializedTarget) {
+  protected void queueWork(String key, String path) {
     try {
-      String key = filename + "|" + serializedTarget;
-      workQueue.addWork(key, path.getBytes(StandardCharsets.UTF_8));
+      workQueue.addWork(key, path);
       queuedWork.add(key);
     } catch (KeeperException | InterruptedException e) {
       log.warn("Could not queue work for {}", path, e);
@@ -252,11 +252,12 @@ public class ReplicationWorkAssigner implements Runnable {
    * Iterate over the queued work to remove entries that have been completed.
    */
   protected void cleanupFinishedWork() {
-    Iterator<String> work = queuedWork.iterator();
+    final Iterator<String> work = queuedWork.iterator();
+    final String instanceId = master.getInstance().getInstanceID();
     while (work.hasNext()) {
       String filename = work.next();
       // Null equates to the work was finished
-      if (null == zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZREPLICATION + "/" + filename)) {
+      if (null == zooCache.get(ZooUtil.getRoot(instanceId) + Constants.ZREPLICATION + "/" + filename)) {
         work.remove();
       }
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc9a72ac/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
 
b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
index b596126..89a1ddd 100644
--- 
a/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
+++ 
b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
@@ -20,16 +20,20 @@ import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Mutation;
@@ -41,6 +45,7 @@ import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
@@ -79,13 +84,14 @@ public class ReplicationWorkAssignerTest {
     assigner.setWorkQueue(workQueue);
 
     Path p = new Path("/accumulo/wal/tserver+port/" + UUID.randomUUID());
-    assigner.queueWork(p.toString(), p.getName(), serializedTarget);
     
-    workQueue.addWork(p.getName(), p.toString().getBytes(StandardCharsets.UTF_8));
+    workQueue.addWork(p.getName() + "|" + serializedTarget.toString(), p.toString());
     expectLastCall().once();
 
     replay(workQueue);
 
+    assigner.queueWork(p.getName() + "|" + serializedTarget, p.toString());
+
     Assert.assertEquals(1, queuedWork.size());
     Assert.assertEquals(p.getName() + "|" + serializedTarget, queuedWork.iterator().next());
   }
@@ -102,6 +108,8 @@ public class ReplicationWorkAssignerTest {
     assigner.setWorkQueue(workQueue);
     assigner.initializeQueuedWork();
 
+    verify(workQueue);
+
     Set<String> queuedWork = assigner.getQueuedWork();
     Assert.assertEquals("Expected existing work and queued work to be the same size", existingWork.size(), queuedWork.size());
     Assert.assertTrue("Expected all existing work to be queued", queuedWork.containsAll(existingWork));
@@ -141,28 +149,34 @@ public class ReplicationWorkAssignerTest {
     @SuppressWarnings("unchecked")
     HashSet<String> queuedWork = createMock(HashSet.class);
     assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
 
     expect(queuedWork.size()).andReturn(0).anyTimes();
 
     // Make sure we expect the invocations in the correct order (accumulo is sorted)
     if (file1.compareTo(file2) <= 0) {
       String key = filename1 + "|" + serializedTarget1;
-      workQueue.addWork(key, file1.getBytes(StandardCharsets.UTF_8));
+      expect(queuedWork.contains(key)).andReturn(false);
+      workQueue.addWork(key, file1);
       expectLastCall().once();
       expect(queuedWork.add(key)).andReturn(true).once();
       
       key = filename2 + "|" + serializedTarget2;
-      workQueue.addWork(key, file2.getBytes(StandardCharsets.UTF_8));
+      expect(queuedWork.contains(key)).andReturn(false);
+      workQueue.addWork(key, file2);
       expectLastCall().once();
       expect(queuedWork.add(key)).andReturn(true).once();
     } else {
       String key = filename2 + "|" + serializedTarget2;
-      workQueue.addWork(key, file2.getBytes(StandardCharsets.UTF_8));
+      expect(queuedWork.contains(key)).andReturn(false);
+      workQueue.addWork(key, file2);
       expectLastCall().once();
       expect(queuedWork.add(key)).andReturn(true).once();
 
       key = filename1 + "|" + serializedTarget1;
-      workQueue.addWork(key, file1.getBytes(StandardCharsets.UTF_8));
+      expect(queuedWork.contains(key)).andReturn(false);
+      workQueue.addWork(key, file1);
       expectLastCall().once();
       expect(queuedWork.add(key)).andReturn(true).once();
     }
@@ -170,6 +184,8 @@ public class ReplicationWorkAssignerTest {
     replay(queuedWork, workQueue);
 
     assigner.createWork();
+
+    verify(queuedWork, workQueue);
   }
 
   @Test
@@ -192,6 +208,7 @@ public class ReplicationWorkAssignerTest {
     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
     String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
     Mutation m = new Mutation(file1);
     WorkSection.add(m, serializedTarget1, StatusUtil.newFileValue());
     bw.addMutation(m);
@@ -206,12 +223,78 @@ public class ReplicationWorkAssignerTest {
     @SuppressWarnings("unchecked")
     HashSet<String> queuedWork = createMock(HashSet.class);
     assigner.setQueuedWork(queuedWork);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
 
-    expect(queuedWork.size()).andReturn(0).anyTimes();
+    expect(queuedWork.size()).andReturn(0).times(2);
 
     replay(queuedWork, workQueue);
 
     assigner.createWork();
+    
+    verify(queuedWork, workQueue);
+  }
+
+  @Test
+  public void workNotInZooKeeperIsCleanedUp() {
+    Set<String> queuedWork = new LinkedHashSet<>(Arrays.asList("wal1", "wal2"));
+    assigner.setQueuedWork(queuedWork);
+
+    Instance inst = createMock(Instance.class);
+    ZooCache cache = createMock(ZooCache.class);
+    assigner.setZooCache(cache);
+
+    expect(master.getInstance()).andReturn(inst);
+    expect(inst.getInstanceID()).andReturn("id");
+    expect(cache.get(Constants.ZROOT + "/id" + Constants.ZREPLICATION + "/wal1")).andReturn(null);
+    expect(cache.get(Constants.ZROOT + "/id" + Constants.ZREPLICATION + "/wal2")).andReturn(null);
+
+    replay(cache, inst, master);
+
+    assigner.cleanupFinishedWork();
+
+    verify(cache, inst, master);
+    Assert.assertTrue("Queued work was not emptied", queuedWork.isEmpty());
   }
 
+  @Test
+  public void workNotReAdded() throws Exception  {
+    Set<String> queuedWork = new HashSet<>();
+
+    assigner.setQueuedWork(queuedWork);
+
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1"); 
+    Text serializedTarget = ReplicationTarget.toText(target);
+
+    queuedWork.add("wal1|" + serializedTarget.toString());
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Set the connector
+    assigner.setConnector(conn);
+
+    // Create and grant ourselves write to the replication table
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    String file1 = "/accumulo/wal/tserver+port/wal1";
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget, StatusUtil.openWithUnknownLengthValue());
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    replay(workQueue);
+    
+    assigner.createWork();
+
+    verify(workQueue);
+  }
 }

Reply via email to