http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java index 0497892..337a592 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java @@ -23,7 +23,7 @@ import java.util.Set; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.replication.thrift.NoServersAvailableException; -import org.apache.accumulo.core.replication.thrift.RemoteReplicationCoordinator; +import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator; import org.apache.accumulo.master.Master; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.thrift.TException; @@ -33,7 +33,7 @@ import com.google.common.base.Preconditions; /** * Choose a tserver to service a replication task */ -public class MasterReplicationCoordinator implements RemoteReplicationCoordinator.Iface { +public class MasterReplicationCoordinator implements ReplicationCoordinator.Iface { public static enum NoServersAvailable { NO_ONLINE_SERVERS
http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java index 370fcfd..699381f 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.master.replication; -import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -38,6 +37,7 @@ import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.master.Master; import org.apache.accumulo.server.replication.ReplicationTable; +import org.apache.accumulo.server.replication.ReplicationWorkAssignerHelper; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.ZooCache; import org.apache.hadoop.fs.Path; @@ -215,8 +215,8 @@ public class ReplicationWorkAssigner implements Runnable { if (StatusUtil.isWorkRequired(status)) { Path p = new Path(file); String filename = p.getName(); - entry.getKey().getColumnQualifier(buffer); - String key = filename + "|" + buffer; + WorkSection.getTarget(entry.getKey(), buffer); + String key = ReplicationWorkAssignerHelper.getQueueKey(filename, buffer.toString()); // And, we haven't already queued this file up for work already if (!queuedWork.contains(key)) { @@ -234,10 +234,10 @@ public class ReplicationWorkAssigner implements Runnable { /** * Distribute the work for the given path with filename * + * @param key + * 
Unique key to identify this work in the queue * @param path * Full path to a file - * @param filename - * Filename */ protected void queueWork(String key, String path) { try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java index 40e9838..cfc756c 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java @@ -142,7 +142,7 @@ public class WorkMaker { buffer.reset(); // Set up the writable - target.setRemoteName(entry.getKey()); + target.setPeerName(entry.getKey()); target.setRemoteIdentifier(entry.getValue()); target.write(buffer); http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java index 89a1ddd..16e6757 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java @@ -22,7 +22,6 @@ import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashSet; import java.util.LinkedHashSet; @@ -76,7 +75,7 @@ 
public class ReplicationWorkAssignerTest { @Test public void workQueuedUsingFileName() throws Exception { ReplicationTarget target = new ReplicationTarget("cluster1", "table1"); - Text serializedTarget = ReplicationTarget.toText(target); + Text serializedTarget = target.toText(); DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); Set<String> queuedWork = new HashSet<>(); @@ -118,7 +117,7 @@ public class ReplicationWorkAssignerTest { @Test public void createWorkForFilesNeedingIt() throws Exception { ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1"), target2 = new ReplicationTarget("cluster1", "table2"); - Text serializedTarget1 = ReplicationTarget.toText(target1), serializedTarget2 = ReplicationTarget.toText(target2); + Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText(); MockInstance inst = new MockInstance(test.getMethodName()); Credentials creds = new Credentials("root", new PasswordToken("")); @@ -191,7 +190,7 @@ public class ReplicationWorkAssignerTest { @Test public void doNotCreateWorkForFilesNotNeedingIt() throws Exception { ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1"), target2 = new ReplicationTarget("cluster1", "table2"); - Text serializedTarget1 = ReplicationTarget.toText(target1), serializedTarget2 = ReplicationTarget.toText(target2); + Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText(); MockInstance inst = new MockInstance(test.getMethodName()); Credentials creds = new Credentials("root", new PasswordToken("")); @@ -263,7 +262,7 @@ public class ReplicationWorkAssignerTest { assigner.setQueuedWork(queuedWork); ReplicationTarget target = new ReplicationTarget("cluster1", "table1"); - Text serializedTarget = ReplicationTarget.toText(target); + Text serializedTarget = target.toText(); queuedWork.add("wal1|" + serializedTarget.toString()); 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 82d7ae9..c0aa31d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -16,22 +16,171 @@
  */
 package org.apache.accumulo.tserver.replication;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.replication.ReplicaSystem;
+import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.replication.ReplicationWorkAssignerHelper;
+import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * 
+ * Transmit the given data to a peer
  */
 public class ReplicationProcessor implements Processor {
+  private static final Logger log = LoggerFactory.getLogger(ReplicationProcessor.class);
+
+  private final Instance inst;
+  private final AccumuloConfiguration conf;
+  private final VolumeManager fs;
+  private final Credentials creds;
+
+  /**
+   * @param inst
+   *          Instance used to obtain connections to the replication table
+   * @param conf
+   *          Configuration holding the replication peer definitions
+   * @param fs
+   *          Filesystem used to verify that the file to replicate exists
+   */
+  public ReplicationProcessor(Instance inst, AccumuloConfiguration conf, VolumeManager fs) {
+    // Must be retained: newProcessor(), getStatus() and recordNewStatus() all use it
+    this.inst = inst;
+    this.conf = conf;
+    this.fs = fs;
+    this.creds = SystemCredentials.get();
+  }
 
   @Override
   public ReplicationProcessor newProcessor() {
-    return new ReplicationProcessor();
+    return new ReplicationProcessor(inst, conf, fs);
   }
 
   @Override
   public void process(String workID, byte[] data) {
-    // TODO Auto-generated method stub
+    ReplicationTarget target = ReplicationWorkAssignerHelper.fromQueueKey(workID).getValue();
+    // Decode with an explicit charset instead of the platform default (assumes the path was queued as UTF-8)
+    String file = new String(data, StandardCharsets.UTF_8);
+
+    // Find the configured replication peer so we know how to replicate to it
+    Map<String,String> configuredPeers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS);
+    String peerType = configuredPeers.get(target.getPeerName());
+    if (null == peerType) {
+      String msg = "Cannot process replication for unknown peer: " + target.getPeerName();
+      log.warn(msg);
+      throw new IllegalArgumentException(msg);
+    }
+
+    // Get the peer that we're replicating to
+    ReplicaSystem replica = ReplicaSystemFactory.get(peerType);
+    Status status;
+    try {
+      status = getStatus(file, target);
+    } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
+      log.error("Could not look for necessary replication record", e);
+      throw new IllegalStateException("Could not look for replication record", e);
+    } catch (InvalidProtocolBufferException e) {
+      log.error("Could not deserialize Status from Work section for {} and {}", file, target, e);
+      throw new RuntimeException("Could not parse Status for work record", e);
+    }
+    // We don't need to do anything (shouldn't have gotten this work record in the first place)
+    if (!StatusUtil.isWorkRequired(status)) {
+      log.info("Received work request for {} and {}, but it does not need replication. Ignoring...", file, target);
+      return;
+    }
+
+    // Sanity check that nothing bad happened and our replication source still exists
+    Path filePath = new Path(file);
+    try {
+      if (!fs.exists(filePath)) {
+        log.warn("Received work request for {} and {}, but the file doesn't exist", filePath, target);
+        return;
+      }
+    } catch (IOException e) {
+      log.error("Could not determine if file exists {}", filePath, e);
+      throw new RuntimeException(e);
+    }
+
+    // Replicate that sucker
+    Status replicatedStatus = replica.replicate(filePath, status, target);
+
+    // If we got a different status
+    if (!replicatedStatus.equals(status)) {
+      // We actually did some work!
+      recordNewStatus(filePath, replicatedStatus, target);
+    }
+
+    // otherwise, we didn't actually replicate because there was an error sending the data
+    // we can just not record any updates, and it will be picked up again by the work assigner
 
   }
 
+  /**
+   * Read the Status stored in the Work section of the replication table for this file and target
+   *
+   * @param file
+   *          Row to look up; process() passes the path of the file being replicated
+   * @param target
+   *          Peer whose serialized form is the column qualifier to fetch
+   * @return The Status parsed from the only matching entry; Iterables.getOnlyElement fails unless exactly one exists
+   */
+  public Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException, InvalidProtocolBufferException {
+    Scanner s = ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), creds.getToken()));
+    s.setRange(Range.exact(file));
+    s.fetchColumn(WorkSection.NAME, target.toText());
+
+    return Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
+  }
+
+  /**
+   * Record the updated Status for this file and target
+   * @param filePath Path to file being replicated
+   * @param status Updated Status after replication
+   * @param target Peer that was replicated to
+   */
+  public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
+    try {
+      Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+      BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+      try {
+        Mutation m = new Mutation(filePath.toString());
+        WorkSection.add(m, target.toText(), ProtobufUtil.toValue(status));
+        bw.addMutation(m);
+      } finally {
+        // Always release the writer, even if the mutation could not be added
+        bw.close();
+      }
+    } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+      log.error("Error recording updated Status for {}", filePath, e);
+      throw new RuntimeException(e);
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index 627e51a..e00f343 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java @@ -49,7 +49,7 @@ public class ReplicationWorker implements Runnable { @Override public void run() { try { - new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION, conf).startProcessing(new ReplicationProcessor(), executor); + new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION, conf).startProcessing(new ReplicationProcessor(inst, conf, fs), executor); } catch (KeeperException | InterruptedException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java index 33da8a0..92f6fb3 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java @@ -385,7 +385,7 @@ public class ReplicationWithGCIT extends ConfigurableMacIT { s = ReplicationTable.getScanner(conn); WorkSection.limit(s); Entry<Key,Value> e = Iterables.getOnlyElement(s); - Text expectedColqual = ReplicationTarget.toText(new ReplicationTarget("cluster1", "4")); + Text expectedColqual = new ReplicationTarget("cluster1", "4").toText(); Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier()); notFound = false; } catch (NoSuchElementException e) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/baa7a4f7/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java ---------------------------------------------------------------------- diff --git 
a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java index 613a01b..defff65 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java @@ -303,7 +303,7 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT { WorkSection.limit(s); try { Entry<Key,Value> e = Iterables.getOnlyElement(s); - Text expectedColqual = ReplicationTarget.toText(new ReplicationTarget("cluster1", "4")); + Text expectedColqual = new ReplicationTarget("cluster1", "4").toText(); Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier()); notFound = false; } catch (NoSuchElementException e) {} catch (IllegalArgumentException e) {