http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
 
b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
index 0497892..337a592 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
@@ -23,7 +23,7 @@ import java.util.Set;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.replication.thrift.NoServersAvailableException;
-import 
org.apache.accumulo.core.replication.thrift.RemoteReplicationCoordinator;
+import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.thrift.TException;
@@ -33,7 +33,7 @@ import com.google.common.base.Preconditions;
 /**
  * Choose a tserver to service a replication task
  */
-public class MasterReplicationCoordinator implements 
RemoteReplicationCoordinator.Iface {
+public class MasterReplicationCoordinator implements 
ReplicationCoordinator.Iface {
 
   public static enum NoServersAvailable {
     NO_ONLINE_SERVERS

http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
 
b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
index 370fcfd..699381f 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.master.replication;
 
-import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -38,6 +37,7 @@ import 
org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.replication.ReplicationWorkAssignerHelper;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.fs.Path;
@@ -215,8 +215,8 @@ public class ReplicationWorkAssigner implements Runnable {
         if (StatusUtil.isWorkRequired(status)) {
           Path p = new Path(file);
           String filename = p.getName();
-          entry.getKey().getColumnQualifier(buffer);
-          String key = filename + "|" + buffer;
+          WorkSection.getTarget(entry.getKey(), buffer);
+          String key = ReplicationWorkAssignerHelper.getQueueKey(filename, 
buffer.toString());
 
           // And, we haven't already queued this file up for work already
           if (!queuedWork.contains(key)) {
@@ -234,10 +234,10 @@ public class ReplicationWorkAssigner implements Runnable {
   /**
    * Distribute the work for the given path with filename
    * 
+   * @param key
+   *          Unique key to identify this work in the queue
    * @param path
    *          Full path to a file
-   * @param filename
-   *          Filename
    */
   protected void queueWork(String key, String path) {
     try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
 
b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index 40e9838..cfc756c 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -142,7 +142,7 @@ public class WorkMaker {
         buffer.reset();
 
         // Set up the writable
-        target.setRemoteName(entry.getKey());
+        target.setPeerName(entry.getKey());
         target.setRemoteIdentifier(entry.getValue());
         target.write(buffer);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
 
b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
index 89a1ddd..16e6757 100644
--- 
a/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
+++ 
b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
@@ -22,7 +22,6 @@ import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
@@ -76,7 +75,7 @@ public class ReplicationWorkAssignerTest {
   @Test
   public void workQueuedUsingFileName() throws Exception {
     ReplicationTarget target = new ReplicationTarget("cluster1", "table1"); 
-    Text serializedTarget = ReplicationTarget.toText(target);
+    Text serializedTarget = target.toText();
 
     DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
     Set<String> queuedWork = new HashSet<>();
@@ -118,7 +117,7 @@ public class ReplicationWorkAssignerTest {
   @Test
   public void createWorkForFilesNeedingIt() throws Exception {
     ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1"), 
target2 = new ReplicationTarget("cluster1", "table2"); 
-    Text serializedTarget1 = ReplicationTarget.toText(target1), 
serializedTarget2 = ReplicationTarget.toText(target2);
+    Text serializedTarget1 = target1.toText(), serializedTarget2 = 
target2.toText();
 
     MockInstance inst = new MockInstance(test.getMethodName());
     Credentials creds = new Credentials("root", new PasswordToken(""));
@@ -191,7 +190,7 @@ public class ReplicationWorkAssignerTest {
   @Test
   public void doNotCreateWorkForFilesNotNeedingIt() throws Exception {
     ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1"), 
target2 = new ReplicationTarget("cluster1", "table2"); 
-    Text serializedTarget1 = ReplicationTarget.toText(target1), 
serializedTarget2 = ReplicationTarget.toText(target2);
+    Text serializedTarget1 = target1.toText(), serializedTarget2 = 
target2.toText();
 
     MockInstance inst = new MockInstance(test.getMethodName());
     Credentials creds = new Credentials("root", new PasswordToken(""));
@@ -263,7 +262,7 @@ public class ReplicationWorkAssignerTest {
     assigner.setQueuedWork(queuedWork);
 
     ReplicationTarget target = new ReplicationTarget("cluster1", "table1"); 
-    Text serializedTarget = ReplicationTarget.toText(target);
+    Text serializedTarget = target.toText();
 
     queuedWork.add("wal1|" + serializedTarget.toString());
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 82d7ae9..c0aa31d 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -16,22 +16,146 @@
  */
 package org.apache.accumulo.tserver.replication;
 
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.replication.ReplicaSystem;
+import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.replication.ReplicationWorkAssignerHelper;
+import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * 
+ * Transmit the given data to a peer
  */
 public class ReplicationProcessor implements Processor {
+  private static final Logger log = 
LoggerFactory.getLogger(ReplicationProcessor.class);
+
+  private Instance inst;
+  private AccumuloConfiguration conf;
+  private VolumeManager fs;
+  private Credentials creds;
+
+  public ReplicationProcessor(Instance inst, AccumuloConfiguration conf, 
VolumeManager fs) {
+    this.conf = conf;
+    this.fs = fs;
+    creds = SystemCredentials.get();
+  }
 
   @Override
   public ReplicationProcessor newProcessor() {
-    return new ReplicationProcessor();
+    return new ReplicationProcessor(inst, conf, fs);
   }
 
   @Override
   public void process(String workID, byte[] data) {
-    // TODO Auto-generated method stub
+    ReplicationTarget target = 
ReplicationWorkAssignerHelper.fromQueueKey(workID).getValue();
+    String file = new String(data);
+
+    // Find the configured replication peer so we know how to replicate to it
+    Map<String,String> configuredPeers = 
conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS);
+    String peerType = configuredPeers.get(target.getPeerName());
+    if (null == peerType) {
+      String msg = "Cannot process replication for unknown peer: " +  
target.getPeerName();
+      log.warn(msg);
+      throw new IllegalArgumentException(msg);
+    }
+
+    // Get the peer that we're replicating to
+    ReplicaSystem replica = ReplicaSystemFactory.get(peerType);
+    Status status;
+    try {
+      status = getStatus(file, target);
+    } catch (TableNotFoundException | AccumuloException | 
AccumuloSecurityException e) {
+      log.error("Could not look for necessary replication record", e);
+      throw new IllegalStateException("Could not look for replication record", 
e);
+    } catch (InvalidProtocolBufferException e) {
+      log.error("Could not deserialize Status from Work section for {} and ", 
file, target);
+      throw new RuntimeException("Could not parse Status for work record", e);
+    }
 
+    // We don't need to do anything (shouldn't have gotten this work record in 
the first place)
+    if (!StatusUtil.isWorkRequired(status)) {
+      log.info("Received work request for {} and {}, but it does not need 
replication. Ignoring...", file, target);
+      return;
+    }
+
+    // Sanity check that nothing bad happened and our replication source still 
exists
+    Path filePath = new Path(file);
+    try {
+      if (!fs.exists(filePath)) {
+        log.warn("Received work request for {} and {}, but the file doesn't 
exist", filePath, target);
+        return;
+      }
+    } catch (IOException e) {
+      log.error("Could not determine if file exists {}", filePath, e);
+      throw new RuntimeException(e);
+    }
+
+    // Replicate that sucker
+    Status replicatedStatus = replica.replicate(filePath, status, target);
+
+    // If we got a different status
+    if (!replicatedStatus.equals(status)) {
+      // We actually did some work!
+      recordNewStatus(filePath, replicatedStatus, target);
+    }
+
+    // otherwise, we didn't actually replicate because there was error sending 
the data
+    // we can just not record any updates, and it will be picked up again by 
the work assigner
   }
 
+  public Status getStatus(String file, ReplicationTarget target) throws 
TableNotFoundException, AccumuloException, AccumuloSecurityException, 
InvalidProtocolBufferException {
+    Scanner s = 
ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), 
creds.getToken()));
+    s.setRange(Range.exact(file));
+    s.fetchColumn(WorkSection.NAME, target.toText());
+    
+    return Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
+  }
+
+  /**
+   * Record the updated Status for this file and target
+   * @param filePath Path to file being replicated
+   * @param status Updated Status after replication
+   * @param target Peer that was replicated to
+   */
+  public void recordNewStatus(Path filePath, Status status, ReplicationTarget 
target) {
+    try {
+      Connector conn = inst.getConnector(creds.getPrincipal(), 
creds.getToken());
+      BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+      Mutation m = new Mutation(filePath.toString());
+      WorkSection.add(m, target.toText(), ProtobufUtil.toValue(status));
+      bw.addMutation(m);
+      bw.close();
+    } catch (AccumuloException | AccumuloSecurityException | 
TableNotFoundException e) {
+      log.error("Error recording updated Status for {}", filePath, e);
+      throw new RuntimeException(e);
+    }
+    
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index 627e51a..e00f343 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
@@ -49,7 +49,7 @@ public class ReplicationWorker implements Runnable {
   @Override
   public void run() {
     try {
-      new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION, 
conf).startProcessing(new ReplicationProcessor(), executor);
+      new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION, 
conf).startProcessing(new ReplicationProcessor(inst, conf, fs), executor);
     } catch (KeeperException | InterruptedException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
 
b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
index 33da8a0..92f6fb3 100644
--- 
a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ 
b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
@@ -385,7 +385,7 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
         s = ReplicationTable.getScanner(conn);
         WorkSection.limit(s);
         Entry<Key,Value> e = Iterables.getOnlyElement(s);
-        Text expectedColqual = ReplicationTarget.toText(new 
ReplicationTarget("cluster1", "4"));
+        Text expectedColqual = new ReplicationTarget("cluster1", "4").toText();
         Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
         notFound = false;
       } catch (NoSuchElementException e) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
 
b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
index 613a01b..defff65 100644
--- 
a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
+++ 
b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
@@ -303,7 +303,7 @@ public class ReplicationWithMakerTest extends 
ConfigurableMacIT {
       WorkSection.limit(s);
       try {
         Entry<Key,Value> e = Iterables.getOnlyElement(s);
-        Text expectedColqual = ReplicationTarget.toText(new 
ReplicationTarget("cluster1", "4"));
+        Text expectedColqual = new ReplicationTarget("cluster1", "4").toText();
         Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
         notFound = false;
       } catch (NoSuchElementException e) {} catch (IllegalArgumentException e) 
{

Reply via email to