ACCUMULO-2581 Initial impl to use DistributedWorkQueue to assign replication work.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/31f4e83b Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/31f4e83b Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/31f4e83b Branch: refs/heads/ACCUMULO-378 Commit: 31f4e83b94c4f150df2639f251d6835305fa531d Parents: f584685 Author: Josh Elser <els...@apache.org> Authored: Thu May 1 16:58:47 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Thu May 1 16:58:47 2014 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/Constants.java | 2 + .../org/apache/accumulo/core/conf/Property.java | 6 +- .../accumulo/core/replication/StatusUtil.java | 24 ++ server/master/pom.xml | 5 + .../replication/ReplicationWorkAssigner.java | 264 +++++++++++++++++++ .../accumulo/master/replication/Work.java | 98 +++++++ .../ReplicationWorkAssignerTest.java | 217 +++++++++++++++ .../accumulo/master/replication/WorkTest.java | 49 ++++ 8 files changed, 664 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/core/src/main/java/org/apache/accumulo/core/Constants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 7d602bb..c87119c 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -113,4 +113,6 @@ public class Constants { public static final String[] PATH_PROPERTY_ENV_VARS = new String[] {"ACCUMULO_HOME", "ACCUMULO_CONF_DIR"}; public static final String HDFS_TABLES_DIR = "/tables"; + + public static final String ZREPLICATION = "/replication"; } 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 2ad1369..637600d 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -454,7 +454,11 @@ public enum Property { REPLICATION_BATCH_SIZE("replication.batch.size", "1000", PropertyType.COUNT, "Maximum number of updates (WAL) or key-value pairs (RFile) to send in one replication task"), @Experimental REPLICATION_SEND_THREAD_POOL_SIZE("replication.send.threads", "1", PropertyType.COUNT, "Size of threadpool used to start replication to slaves"), - + @Experimental + REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "20000000", PropertyType.COUNT, "Upper bound of the number of files queued for replication"), + @Experimental + REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment"), + ; private String key, defaultValue, description; http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java index 3b8ccb6..74327fd 100644 --- a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java @@ -21,6 +21,8 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import 
org.apache.accumulo.core.replication.proto.Replication.Status.Builder;
+import com.google.protobuf.InvalidProtocolBufferException;
+
 /**
  * Helper methods to create Status protobuf messages
  */
@@ -161,6 +163,15 @@ public class StatusUtil {
   }
 
   /**
+   * @param v Value with serialized Status
+   * @return A Status created from the Value
+   * @throws InvalidProtocolBufferException
+   */
+  public static Status fromValue(Value v) throws InvalidProtocolBufferException {
+    return Status.parseFrom(v.get());
+  }
+
+  /**
    * Is the given Status fully replicated and is its file ready for deletion on the source
    * @param status a Status protobuf
    * @return True if the file this Status references can be deleted.
@@ -181,4 +192,17 @@ public class StatusUtil {
       return status.getBegin() >= status.getEnd();
     }
   }
+
+  /**
+   * Given the {@link Status}, is there replication work to be done. With an infinite end, work remains until begin reaches Long.MAX_VALUE.
+   * @param status Status for a file
+   * @return true if replication work is required, i.e. begin has not yet caught up with end (or with Long.MAX_VALUE when the end is infinite)
+   */
+  public static boolean isWorkRequired(Status status) {
+    if (status.getInfiniteEnd()) {
+      return Long.MAX_VALUE != status.getBegin();
+    } else {
+      return status.getBegin() < status.getEnd();
+    }
+  }
 }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/pom.xml
----------------------------------------------------------------------
diff --git a/server/master/pom.xml b/server/master/pom.xml
index f9b2990..3b9684c 100644
--- a/server/master/pom.xml
+++ b/server/master/pom.xml
@@ -84,5 +84,10 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
----------------------------------------------------------------------
diff --git
a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java new file mode 100644 index 0000000..f3a8825 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.master.replication; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.StatusUtil; +import org.apache.accumulo.core.replication.proto.Replication.Status; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.replication.ReplicationTable; +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; +import org.apache.accumulo.server.zookeeper.ZooCache; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * Read work records from the replication table, create work entries for other nodes to complete. + * <p> + * Uses the DistributedWorkQueue to make the work available for any tserver. This approach does not consider the locality of the tabletserver performing the + * work in relation to the data being replicated (local HDFS blocks). 
+ */
+public class ReplicationWorkAssigner implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(ReplicationWorkAssigner.class);
+
+  private Master master;
+  private Connector conn;
+
+  private AccumuloConfiguration conf;
+  private DistributedWorkQueue workQueue;
+  private Set<String> queuedWork;
+  private int maxQueueSize;
+  private ZooCache zooCache;
+
+  public ReplicationWorkAssigner(Master master, Connector conn) {
+    this.master = master;
+    this.conn = conn;
+  }
+
+  /*
+   * Getters/setters for testing purposes
+   */
+  protected Connector getConnector() {
+    return conn;
+  }
+
+  protected void setConnector(Connector conn) {
+    this.conn = conn;
+  }
+
+  protected AccumuloConfiguration getConf() {
+    return conf;
+  }
+
+  protected void setConf(AccumuloConfiguration conf) {
+    this.conf = conf;
+  }
+
+  protected DistributedWorkQueue getWorkQueue() {
+    return workQueue;
+  }
+
+  protected void setWorkQueue(DistributedWorkQueue workQueue) {
+    this.workQueue = workQueue;
+  }
+
+  protected Set<String> getQueuedWork() {
+    return queuedWork;
+  }
+
+  protected void setQueuedWork(Set<String> queuedWork) {
+    this.queuedWork = queuedWork;
+  }
+
+  protected int getMaxQueueSize() {
+    return maxQueueSize;
+  }
+
+  protected void setMaxQueueSize(int maxQueueSize) {
+    this.maxQueueSize = maxQueueSize;
+  }
+
+  protected ZooCache getZooCache() {
+    return zooCache;
+  }
+
+  protected void setZooCache(ZooCache zooCache) {
+    this.zooCache = zooCache;
+  }
+
+  /**
+   * Initialize the DistributedWorkQueue using the proper ZK location
+   *
+   * @param conf
+   *          Configuration handed to the DistributedWorkQueue
+   */
+  protected void initializeWorkQueue(AccumuloConfiguration conf) {
+    workQueue = new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZREPLICATION, conf);
+  }
+
+  /**
+   * Initialize the queuedWork set with the work already sent out
+   *
+   * @throws IllegalArgumentException
+   *           if queuedWork was already initialized
+   * @throws RuntimeException
+   *           if the existing work could not be read from the work queue
+   */
+  protected void initializeQueuedWork() {
+    Preconditions.checkArgument(null == queuedWork, "Expected queuedWork to be null");
+    queuedWork = new HashSet<>();
+    try {
+      queuedWork.addAll(workQueue.getWorkQueued());
+    } catch (KeeperException | InterruptedException e) {
+      throw new RuntimeException("Error reading existing queued replication work", e);
+    }
+  }
+
+  /**
+   * While this process is still the master: lazily initialize state, queue new work, prune finished work, then pause for
+   * {@link Property#REPLICATION_WORK_ASSIGNMENT_SLEEP} before the next pass. Returns early if the thread is interrupted while pausing.
+   */
+  @Override
+  public void run() {
+    while (master.stillMaster()) {
+      if (null == conf) {
+        conf = master.getConfiguration().getConfiguration();
+      }
+
+      // Get the maximum number of entries we want to queue work for (or the default)
+      maxQueueSize = conf.getCount(Property.REPLICATION_MAX_WORK_QUEUE);
+
+      if (null == workQueue) {
+        initializeWorkQueue(conf);
+      }
+
+      if (null == queuedWork) {
+        initializeQueuedWork();
+      }
+
+      if (null == zooCache) {
+        zooCache = new ZooCache();
+      }
+
+      // Scan over the work records, adding the work to the queue
+      createWork();
+
+      // Keep the state of the work we queued correct
+      cleanupFinishedWork();
+
+      // Without a pause this loop would rescan the replication table continuously
+      try {
+        Thread.sleep(conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP));
+      } catch (InterruptedException e) {
+        // Preserve the interrupt for whoever owns this thread and stop assigning work
+        Thread.currentThread().interrupt();
+        log.warn("Interrupted while waiting to assign more replication work, exiting", e);
+        return;
+      }
+    }
+  }
+
+  /**
+   * Build the identifier under which a unit of work is stored in both the work queue and queuedWork
+   *
+   * @param filename
+   *          Name (last path component) of the file to replicate
+   * @param serializedTarget
+   *          Serialized replication target for the file
+   * @return filename and target joined by '|'
+   */
+  private static String getQueueKey(String filename, Text serializedTarget) {
+    return filename + "|" + serializedTarget;
+  }
+
+  /**
+   * Scan over the {@link WorkSection} of the replication table adding work for entries that have data to replicate and have not already been queued. Stops
+   * early once more than maxQueueSize entries are outstanding.
+   */
+  protected void createWork() {
+    // Create a batchscanner over the replication table's work entries
+    BatchScanner bs;
+    try {
+      bs = ReplicationTable.getBatchScanner(conn, 4);
+    } catch (TableNotFoundException e) {
+      log.warn("Could not find replication table", e);
+      return;
+    }
+
+    // Everything after the scanner exists runs inside the try so the scanner is always closed
+    try {
+      WorkSection.limit(bs);
+      bs.setRanges(Collections.singleton(new Range()));
+      Text buffer = new Text();
+      for (Entry<Key,Value> entry : bs) {
+        // If we're not working off the entries, we need to not shoot ourselves in the foot by continuing
+        // to add more work entries
+        if (queuedWork.size() > maxQueueSize) {
+          log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow work to occur", maxQueueSize);
+          return;
+        }
+
+        WorkSection.getFile(entry.getKey(), buffer);
+        String file = buffer.toString();
+        Status status;
+        try {
+          status = StatusUtil.fromValue(entry.getValue());
+        } catch (InvalidProtocolBufferException e) {
+          log.warn("Could not deserialize protobuf from work entry for {}", file, e);
+          continue;
+        }
+
+        // Nothing to replicate for this entry
+        if (!StatusUtil.isWorkRequired(status)) {
+          continue;
+        }
+
+        String filename = new Path(file).getName();
+
+        // The buffer is reused: from here on it holds the serialized target, not the file
+        entry.getKey().getColumnQualifier(buffer);
+
+        // queuedWork holds filename|target keys, so the lookup must use the same key or it never matches
+        if (!queuedWork.contains(getQueueKey(filename, buffer))) {
+          queueWork(file, filename, buffer);
+        }
+      }
+    } finally {
+      bs.close();
+    }
+  }
+
+  /**
+   * Distribute the work for the given path with filename
+   *
+   * @param path
+   *          Full path to a file
+   * @param filename
+   *          Filename
+   * @param serializedTarget
+   *          Serialized replication target, combined with the filename to form the work's key
+   */
+  protected void queueWork(String path, String filename, Text serializedTarget) {
+    try {
+      String key = getQueueKey(filename, serializedTarget);
+      workQueue.addWork(key, path.getBytes(StandardCharsets.UTF_8));
+      // Only remember the work once it was accepted by the queue
+      queuedWork.add(key);
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Could not queue work for {}", path, e);
+    }
+  }
+
+  /**
+   * Iterate over the queued work to remove entries that have been completed.
+   */
+  protected void cleanupFinishedWork() {
+    if (queuedWork.isEmpty()) {
+      return;
+    }
+
+    // The parent node is the same for every entry, so build it once
+    final String workRoot = ZooUtil.getRoot(master.getInstance()) + Constants.ZREPLICATION + "/";
+    Iterator<String> work = queuedWork.iterator();
+    while (work.hasNext()) {
+      String key = work.next();
+      // Null equates to the work was finished
+      if (null == zooCache.get(workRoot + key)) {
+        work.remove();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java b/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java
new file mode 100644
index 0000000..cb45b44
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Encapsulates a file (path) and {@link Status}.
+ * <p>
+ * The no-arg constructor leaves both fields null until {@link #readFields(DataInput)} or the setters are called; equals, hashCode and toString tolerate that
+ * state.
+ */
+public class Work implements Writable {
+
+  private String file;
+
+  private Status status;
+
+  /**
+   * Creates an empty instance, to be populated by {@link #readFields(DataInput)} or the setters
+   */
+  public Work() { }
+
+  public Work(String file, Status status) {
+    this.file = file;
+    this.status = status;
+  }
+
+  public String getFile() {
+    return file;
+  }
+
+  public void setFile(String file) {
+    this.file = file;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public void setStatus(Status status) {
+    this.status = status;
+  }
+
+  /**
+   * Writes the file, then the serialized Status prefixed by its length. The Status must have been set before calling this.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeString(out, file);
+    byte[] bytes = status.toByteArray();
+    WritableUtils.writeVInt(out, bytes.length);
+    out.write(bytes);
+  }
+
+  /**
+   * Reads the fields in the order {@link #write(DataOutput)} emitted them, replacing the current file and Status
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    file = WritableUtils.readString(in);
+    int len = WritableUtils.readVInt(in);
+    byte[] bytes = new byte[len];
+    in.readFully(bytes);
+    status = Status.parseFrom(bytes);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof Work) {
+      Work other = (Work) o;
+      // Objects.equals so an instance whose fields are still null compares instead of throwing
+      return Objects.equals(file, other.getFile()) && Objects.equals(status, other.getStatus());
+    }
+
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return file + " " + (null == status ? "null" : TextFormat.shortDebugString(status));
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(file, status);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
----------------------------------------------------------------------
diff --git
a/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java new file mode 100644 index 0000000..b596126 --- /dev/null +++ b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.accumulo.master.replication;
+
+import static org.easymock.EasyMock.aryEq;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Tests for how {@link ReplicationWorkAssigner} queues work and tracks what it has queued
+ */
+public class ReplicationWorkAssignerTest {
+
+  @Rule
+  public TestName test = new TestName();
+
+  private Master master;
+  private Connector conn;
+  private ReplicationWorkAssigner assigner;
+
+  @Before
+  public void init() {
+    master = createMock(Master.class);
+    conn = createMock(Connector.class);
+    assigner = new ReplicationWorkAssigner(master, conn);
+  }
+
+  @Test
+  public void workQueuedUsingFileName() throws Exception {
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1");
+    Text serializedTarget = ReplicationTarget.toText(target);
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    Set<String> queuedWork = new HashSet<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+
+    Path p = new Path("/accumulo/wal/tserver+port/" + UUID.randomUUID());
+    String expectedKey = p.getName() + "|" + serializedTarget;
+
+    // Record and replay the expectation before exercising the assigner, otherwise the call is merely recorded.
+    // aryEq compares the byte[] by content instead of by identity.
+    workQueue.addWork(eq(expectedKey), aryEq(p.toString().getBytes(StandardCharsets.UTF_8)));
+    expectLastCall().once();
+
+    replay(workQueue);
+
+    assigner.queueWork(p.toString(), p.getName(), serializedTarget);
+
+    verify(workQueue);
+
+    Assert.assertEquals(1, queuedWork.size());
+    Assert.assertEquals(expectedKey, queuedWork.iterator().next());
+  }
+
+  @Test
+  public void existingWorkIsReQueued() throws Exception {
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+
+    List<String> existingWork = Arrays.asList("/accumulo/wal/tserver+port/wal1", "/accumulo/wal/tserver+port/wal2");
+    expect(workQueue.getWorkQueued()).andReturn(existingWork);
+
+    replay(workQueue);
+
+    assigner.setWorkQueue(workQueue);
+    assigner.initializeQueuedWork();
+
+    verify(workQueue);
+
+    Set<String> queuedWork = assigner.getQueuedWork();
+    Assert.assertEquals("Expected existing work and queued work to be the same size", existingWork.size(), queuedWork.size());
+    Assert.assertTrue("Expected all existing work to be queued", queuedWork.containsAll(existingWork));
+  }
+
+  @Test
+  public void createWorkForFilesNeedingIt() throws Exception {
+    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1"), target2 = new ReplicationTarget("cluster1", "table2");
+    Text serializedTarget1 = ReplicationTarget.toText(target1), serializedTarget2 = ReplicationTarget.toText(target2);
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Set the connector
+    assigner.setConnector(conn);
+
+    // Create and grant ourselves write to the replication table
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget1, StatusUtil.openWithUnknownLengthValue());
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget2, StatusUtil.openWithUnknownLengthValue());
+    bw.addMutation(m);
+
+    bw.close();
+
+    // A real set shows what the assigner remembers; only the ZooKeeper-backed queue is mocked
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    Set<String> queuedWork = new HashSet<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    // The default limit of zero would stop the scan after the first entry is queued
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    // Call order is not checked by a plain mock, so the scan order of the two files does not matter
+    String key1 = filename1 + "|" + serializedTarget1;
+    workQueue.addWork(eq(key1), aryEq(file1.getBytes(StandardCharsets.UTF_8)));
+    expectLastCall().once();
+
+    String key2 = filename2 + "|" + serializedTarget2;
+    workQueue.addWork(eq(key2), aryEq(file2.getBytes(StandardCharsets.UTF_8)));
+    expectLastCall().once();
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+
+    Set<String> expectedKeys = new HashSet<>(Arrays.asList(key1, key2));
+    Assert.assertEquals(expectedKeys, queuedWork);
+  }
+
+  @Test
+  public void doNotCreateWorkForFilesNotNeedingIt() throws Exception {
+    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1"), target2 = new ReplicationTarget("cluster1", "table2");
+    Text serializedTarget1 = ReplicationTarget.toText(target1), serializedTarget2 = ReplicationTarget.toText(target2);
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Set the connector
+    assigner.setConnector(conn);
+
+    // Create and grant ourselves write to the replication table
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Create two mutations, neither of which should produce replication work
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget1, StatusUtil.newFileValue());
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget2, StatusUtil.newFileValue());
+    bw.addMutation(m);
+
+    bw.close();
+
+    // No expectations are recorded: any call on the work queue fails the test
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    Set<String> queuedWork = new HashSet<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+
+    Assert.assertTrue("Expected no work to be queued", queuedWork.isEmpty());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java
----------------------------------------------------------------------
diff --git
a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java
new file mode 100644
index 0000000..09530a7
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that a {@link Work} written with write() is equal to the instance rebuilt from those bytes with readFields()
+ */
+public class WorkTest {
+
+  @Test
+  public void serialization() throws IOException {
+    Work original = new Work("/foo/bar", StatusUtil.openWithUnknownLength());
+
+    // Serialize into an in-memory buffer
+    ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+    original.write(new DataOutputStream(serialized));
+
+    // Populate an empty instance from the bytes just written
+    Work copy = new Work();
+    copy.readFields(new DataInputStream(new ByteArrayInputStream(serialized.toByteArray())));
+
+    Assert.assertEquals(original, copy);
+  }
+
+}