ACCUMULO-378 Core review fixes from bhavanki for replication
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/84e94a42 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/84e94a42 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/84e94a42 Branch: refs/heads/ACCUMULO-378 Commit: 84e94a429bd92e469156642b1bfd69c422759e2d Parents: 2f02d69 Author: Josh Elser <[email protected]> Authored: Wed Jun 4 13:52:58 2014 -0400 Committer: Josh Elser <[email protected]> Committed: Wed Jun 4 13:52:58 2014 -0400 ---------------------------------------------------------------------- .../client/admin/ReplicationOperations.java | 12 ++--- .../core/client/impl/ReplicationClient.java | 34 +++++++----- .../client/impl/ReplicationOperationsImpl.java | 52 ++++++++---------- .../replication/PeerNotFoundException.java | 4 ++ .../core/client/replication/ReplicaSystem.java | 3 +- .../replication/ReplicaSystemFactory.java | 6 ++- .../org/apache/accumulo/core/data/Mutation.java | 13 +++-- .../master/replication/ReplicationDriver.java | 13 +++-- .../test/replication/CyclicReplicationIT.java | 43 +++++++-------- .../UnorderedWorkAssignerReplicationIT.java | 57 ++++++++++---------- 10 files changed, 128 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java index 1d20f79..5873f73 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java @@ -32,14 +32,14 @@ import org.apache.accumulo.core.client.replication.ReplicaSystem; public interface ReplicationOperations { /** - * Define a cluster with the given name using the given {@link ReplicaSystem} + * Defines a cluster with the given name using the given {@link ReplicaSystem}. * @param name Name of the cluster, used for configuring replication on tables * @param system Type of system to be replicated to */ public void addPeer(String name, ReplicaSystem system) throws AccumuloException, AccumuloSecurityException, PeerExistsException; /** - * Define a cluster with the given name and the given name system + * Defines a cluster with the given name and the given name system. * @param name Unique name for the cluster * @param replicaType {@link ReplicaSystem} class name to use to replicate the data * @throws PeerExistsException @@ -47,14 +47,14 @@ public interface ReplicationOperations { public void addPeer(String name, String replicaType) throws AccumuloException, AccumuloSecurityException, PeerExistsException; /** - * Remove a cluster with the given name + * Removes a cluster with the given name. * @param name Name of the cluster to remove * @throws PeerNotFoundException */ public void removePeer(String name) throws AccumuloException, AccumuloSecurityException, PeerNotFoundException; /** - * Wait for a table to be fully replicated + * Waits for a table to be fully replicated. * @param tableName The table to wait for * @throws AccumuloException * @throws AccumuloSecurityException @@ -62,7 +62,7 @@ public interface ReplicationOperations { public void drain(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException; /** - * Wait for a table to be fully replicated as determined by the provided tables + * Waits for a table to be fully replicated as determined by the provided tables. * @param tableName The table to wait for * @throws AccumuloException * @throws AccumuloSecurityException @@ -70,7 +70,7 @@ public interface ReplicationOperations { public void drain(String tableName, Set<String> files) throws AccumuloException, AccumuloSecurityException, TableNotFoundException; /** - * Get all of the referenced files for a table + * Gets all of the referenced files for a table. * @param tableName * @throws TableNotFoundException */ http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java index d7b12c7..13c027a 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.core.client.impl; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import java.net.UnknownHostException; @@ -52,14 +51,19 @@ public class ReplicationClient { * @return Client to the ReplicationCoordinator service */ public static ReplicationCoordinator.Client getCoordinatorConnectionWithRetry(Instance instance) throws AccumuloException { - checkArgument(instance != null, "instance is null"); + checkNotNull(instance); for (int attempts = 1; attempts <= 10; attempts++) { ReplicationCoordinator.Client result = getCoordinatorConnection(instance); if (result != null) return result; - UtilWaitThread.sleep(attempts * 250); + log.debug("Could not get ReplicationCoordinator connection to {}, will retry", instance.getInstanceName()); + try { + Thread.sleep(attempts * 250); + } catch (InterruptedException e) { + throw new AccumuloException(e); + } } throw new AccumuloException("Timed out trying to communicate with master from " + instance.getInstanceName()); @@ -69,14 +73,16 @@ public class ReplicationClient { List<String> locations = instance.getMasterLocations(); if (locations.size() == 0) { - log.debug("No masters..."); + log.debug("No masters for replication to instance {}", instance.getInstanceName()); return null; } // This is the master thrift service, we just want the hostname, not the port String masterThriftService = locations.get(0); - if (masterThriftService.endsWith(":0")) + if (masterThriftService.endsWith(":0")) { + log.warn("Master found for {} did not have real location {}", instance.getInstanceName(), masterThriftService); return null; + } AccumuloConfiguration conf = ServerConfigurationUtil.getConfiguration(instance); @@ -91,7 +97,7 @@ public class ReplicationClient { ZooReader reader = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); replCoordinatorAddr = new String(reader.getData(zkPath, null), StandardCharsets.UTF_8); } catch (KeeperException | InterruptedException e) { - log.error("Could not fetch remote coordinator port"); + log.error("Could not fetch remote coordinator port", e); return null; } @@ -106,11 +112,7 @@ public class ReplicationClient { conf); return client; } catch (TTransportException tte) { - if (tte.getCause().getClass().equals(UnknownHostException.class)) { - // do not expect to recover from this - throw new RuntimeException(tte); - } - log.debug("Failed to connect to master coordinator service ({}), will retry... ", coordinatorAddr.toString(), tte); + log.debug("Failed to connect to master coordinator service ({})", coordinatorAddr.toString(), tte); return null; } } @@ -157,13 +159,17 @@ public class ReplicationClient { public static <T> T executeCoordinatorWithReturn(Instance instance, ClientExecReturn<T,ReplicationCoordinator.Client> exec) throws AccumuloException, AccumuloSecurityException { ReplicationCoordinator.Client client = null; - while (true) { + for (int i = 0; i < 10; i++) { try { client = getCoordinatorConnectionWithRetry(instance); return exec.execute(client); } catch (TTransportException tte) { log.debug("ReplicationClient coordinator request failed, retrying ... ", tte); - UtilWaitThread.sleep(100); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new AccumuloException(e); + } } catch (ThriftSecurityException e) { throw new AccumuloSecurityException(e.user, e.code, e); } catch (AccumuloException e) { @@ -175,6 +181,8 @@ public class ReplicationClient { close(client); } } + + throw new AccumuloException("Could not connect to ReplicationCoordinator at " + instance.getInstanceName()); } public static void executeCoordinator(Instance instance, ClientExec<ReplicationCoordinator.Client> exec) throws AccumuloException, AccumuloSecurityException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java index 4355867..51a5367 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java @@ -66,10 +66,12 @@ import com.google.protobuf.InvalidProtocolBufferException; public class ReplicationOperationsImpl implements ReplicationOperations { private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImpl.class); - private Instance inst; - private Credentials creds; + private final Instance inst; + private final Credentials creds; public ReplicationOperationsImpl(Instance inst, Credentials creds) { + checkNotNull(inst); + checkNotNull(creds); this.inst = inst; this.creds = creds; } @@ -125,32 +127,16 @@ public class ReplicationOperationsImpl implements ReplicationOperations { checkNotNull(tableName); Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken()); - TableOperations tops = conn.tableOperations(); - while (!tops.exists(ReplicationTable.NAME)) { - UtilWaitThread.sleep(200); - } - - if (!conn.tableOperations().exists(tableName)) { - throw new TableNotFoundException(null, tableName, null); - } - - String strTableId = null; - while (null == strTableId) { - strTableId = tops.tableIdMap().get(tableName); - if (null == strTableId) { - UtilWaitThread.sleep(200); - } - } - - Text tableId = new Text(strTableId); + Text tableId = getTableId(conn, tableName); log.info("Waiting for {} to be replicated for {}", wals, tableId); log.info("Reading from metadata table"); boolean allMetadataRefsReplicated = false; + final Set<Range> range = Collections.singleton(new Range(ReplicationSection.getRange())); while (!allMetadataRefsReplicated) { BatchScanner bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4); - bs.setRanges(Collections.singleton(new Range(ReplicationSection.getRange()))); + bs.setRanges(range); bs.fetchColumnFamily(ReplicationSection.COLF); try { allMetadataRefsReplicated = allReferencesReplicated(bs, tableId, wals); @@ -228,13 +214,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations { return true; } - @Override - public Set<String> referencedFiles(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - checkNotNull(tableName); - - log.debug("Collecting referenced files for replication of table {}", tableName); - - Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken()); + protected Text getTableId(Connector conn, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { TableOperations tops = conn.tableOperations(); while (!tops.exists(ReplicationTable.NAME)) { UtilWaitThread.sleep(200); @@ -252,13 +232,23 @@ public class ReplicationOperationsImpl implements ReplicationOperations { } } - Text tableId = new Text(strTableId); + return new Text(strTableId); + } + + @Override + public Set<String> referencedFiles(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + checkNotNull(tableName); + + log.debug("Collecting referenced files for replication of table {}", tableName); + + Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken()); + Text tableId = getTableId(conn, tableName); - log.debug("Found id of {} for name {}", strTableId, tableName); + log.debug("Found id of {} for name {}", tableId, tableName); // Get the WALs currently referenced by the table BatchScanner metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4); - metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(strTableId))); + metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(tableId.toString()))); metaBs.fetchColumnFamily(LogColumnFamily.NAME); Set<String> wals = new HashSet<>(); try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java b/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java index 1859c62..4e02218 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java +++ b/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java @@ -33,4 +33,8 @@ public class PeerNotFoundException extends Exception { public PeerNotFoundException(String message, Throwable cause) { super(message, cause); } + + public PeerNotFoundException(String peer, String message, Throwable cause) { + super("Peer '" + peer + "' not found " + message, cause); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java index e20d35f..cc51a11 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java +++ b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java @@ -31,6 +31,7 @@ public interface ReplicaSystem { * @param p Path to the resource we're reading from * @param status Information to replicate * @param target The peer + * @param helper Instance of ReplicaSystemHelper * @return A new Status for the progress that was made */ public Status replicate(Path p, Status status, ReplicationTarget target, ReplicaSystemHelper helper); @@ -39,7 +40,7 @@ public interface ReplicaSystem { * Configure the implementation with necessary information from the system configuration * <p> * For example, we only need one implementation for Accumulo, but, for each peer, - * we have a ZK quorom and instance name + * we have a ZK quorum and instance name * @param configuration */ public void configure(String configuration); http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java index d1df97e..164512a 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java @@ -27,6 +27,8 @@ import com.google.common.base.Preconditions; public class ReplicaSystemFactory { private static final Logger log = LoggerFactory.getLogger(ReplicaSystemFactory.class); + private ReplicaSystemFactory() {} + /** * @param value * {@link ReplicaSystem} implementation class name @@ -53,10 +55,10 @@ public class ReplicaSystemFactory { return rs; } - throw new RuntimeException("Class is not assignable to ReplicaSystem: " + name); + throw new IllegalArgumentException("Class is not assignable to ReplicaSystem: " + name); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { log.error("Error creating ReplicaSystem object", e); - throw new RuntimeException(e); + throw new IllegalArgumentException(e); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/data/Mutation.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java index 619e522..a134ec8 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java +++ b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java @@ -784,6 +784,9 @@ public class Mutation implements Writable { * @return An unmodifiable view of the replication sources */ public Set<String> getReplicationSources() { + if (null == replicationSources) { + return EMPTY; + } return Collections.unmodifiableSet(replicationSources); } @@ -926,9 +929,13 @@ public class Mutation implements Writable { } } if (0x02 == (0x02 & hasValues)) { - WritableUtils.writeVInt(out, replicationSources.size()); - for (String source : replicationSources) { - WritableUtils.writeString(out, source); + if (null == replicationSources) { + WritableUtils.writeVInt(out, 0); + } else { + WritableUtils.writeVInt(out, replicationSources.size()); + for (String source : replicationSources) { + WritableUtils.writeString(out, source); + } } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java index b340009..e98bc1d 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java @@ -27,13 +27,14 @@ import org.apache.accumulo.master.Master; import org.apache.accumulo.trace.instrument.CountSampler; import org.apache.accumulo.trace.instrument.Sampler; import org.apache.accumulo.trace.instrument.Trace; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Daemon wrapper around the {@link WorkMaker} that separates it from the Master */ public class ReplicationDriver extends Daemon { - private static final Logger log = Logger.getLogger(ReplicationDriver.class); + private static final Logger log = LoggerFactory.getLogger(ReplicationDriver.class); private final Master master; private final AccumuloConfiguration conf; @@ -95,7 +96,13 @@ public class ReplicationDriver extends Daemon { Trace.offNoFlush(); // Sleep for a bit - UtilWaitThread.sleep(conf.getTimeInMillis(Property.MASTER_REPLICATION_SCAN_INTERVAL)); + long sleepMillis = conf.getTimeInMillis(Property.MASTER_REPLICATION_SCAN_INTERVAL); + log.debug("Sleeping for {}ms before re-running", sleepMillis); + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException e) { + log.error("Interrupted while sleeping", e); + } } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java index a03cfab..a75113b 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java @@ -121,6 +121,7 @@ public class CyclicReplicationIT { String master1UserName = "master1", master1Password = "foo"; String master2UserName = "master2", master2Password = "bar"; + String master1Table = master1Cluster.getInstanceName(), master2Table = master2Cluster.getInstanceName(); connMaster1.securityOperations().createLocalUser(master1UserName, new PasswordToken(master1Password)); connMaster2.securityOperations().createLocalUser(master2UserName, new PasswordToken(master2Password)); @@ -142,27 +143,27 @@ public class CyclicReplicationIT { ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(master1Cluster.getInstanceName(), master1Cluster.getZooKeepers()))); - connMaster1.tableOperations().create(master1Cluster.getInstanceName(), false); - String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Cluster.getInstanceName()); + connMaster1.tableOperations().create(master1Table, false); + String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Table); Assert.assertNotNull(master1TableId); - connMaster2.tableOperations().create(master2Cluster.getInstanceName(), false); - String master2TableId = connMaster2.tableOperations().tableIdMap().get(master2Cluster.getInstanceName()); + connMaster2.tableOperations().create(master2Table, false); + String master2TableId = connMaster2.tableOperations().tableIdMap().get(master2Table); Assert.assertNotNull(master2TableId); // Replicate master1 in the master1 cluster to master2 in the master2 cluster - connMaster1.tableOperations().setProperty(master1Cluster.getInstanceName(), Property.TABLE_REPLICATION.getKey(), "true"); - connMaster1.tableOperations().setProperty(master1Cluster.getInstanceName(), + connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION_TARGETS.getKey() + master2Cluster.getInstanceName(), master2TableId); // Replicate master2 in the master2 cluster to master1 in the master2 cluster - connMaster2.tableOperations().setProperty(master2Cluster.getInstanceName(), Property.TABLE_REPLICATION.getKey(), "true"); - connMaster2.tableOperations().setProperty(master2Cluster.getInstanceName(), + connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION_TARGETS.getKey() + master1Cluster.getInstanceName(), master1TableId); // Give our replication user the ability to write to the respective table - connMaster1.securityOperations().grantTablePermission(master1UserName, master1Cluster.getInstanceName(), TablePermission.WRITE); - connMaster2.securityOperations().grantTablePermission(master2UserName, master2Cluster.getInstanceName(), TablePermission.WRITE); + connMaster1.securityOperations().grantTablePermission(master1UserName, master1Table, TablePermission.WRITE); + connMaster2.securityOperations().grantTablePermission(master2UserName, master2Table, TablePermission.WRITE); IteratorSetting summingCombiner = new IteratorSetting(50, SummingCombiner.class); SummingCombiner.setEncodingType(summingCombiner, Type.STRING); @@ -170,17 +171,17 @@ public class CyclicReplicationIT { // Set a combiner on both instances that will sum multiple values // We can use this to verify that the mutation was not sent multiple times - connMaster1.tableOperations().attachIterator(master1Cluster.getInstanceName(), summingCombiner); - connMaster2.tableOperations().attachIterator(master2Cluster.getInstanceName(), summingCombiner); + connMaster1.tableOperations().attachIterator(master1Table, summingCombiner); + connMaster2.tableOperations().attachIterator(master2Table, summingCombiner); // Write a single entry - BatchWriter bw = connMaster1.createBatchWriter(master1Cluster.getInstanceName(), new BatchWriterConfig()); + BatchWriter bw = connMaster1.createBatchWriter(master1Table, new BatchWriterConfig()); Mutation m = new Mutation("row"); m.put("count", "", "1"); bw.addMutation(m); bw.close(); - Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Cluster.getInstanceName()); + Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Table); log.info("Found {} that need replication from master1", files); @@ -194,22 +195,22 @@ public class CyclicReplicationIT { log.info("Restarted tserver on master1"); // Sanity check that the element is there on master1 - Scanner s = connMaster1.createScanner(master1Cluster.getInstanceName(), Authorizations.EMPTY); + Scanner s = connMaster1.createScanner(master1Table, Authorizations.EMPTY); Entry<Key,Value> entry = Iterables.getOnlyElement(s); Assert.assertEquals("1", entry.getValue().toString()); // Wait for this table to replicate - connMaster1.replicationOperations().drain(master1Cluster.getInstanceName(), files); + connMaster1.replicationOperations().drain(master1Table, files); Thread.sleep(5000); // Check that the element made it to master2 only once - s = connMaster2.createScanner(master2Cluster.getInstanceName(), Authorizations.EMPTY); + s = connMaster2.createScanner(master2Table, Authorizations.EMPTY); entry = Iterables.getOnlyElement(s); Assert.assertEquals("1", entry.getValue().toString()); // Wait for master2 to finish replicating it back - files = connMaster2.replicationOperations().referencedFiles(master2Cluster.getInstanceName()); + files = connMaster2.replicationOperations().referencedFiles(master2Table); // Kill and restart the tserver to close the WAL on master2 for (ProcessReference proc : master2Cluster.getProcesses().get(ServerType.TABLET_SERVER)) { @@ -219,16 +220,16 @@ public class CyclicReplicationIT { master2Cluster.exec(TabletServer.class); // Check that the element made it to master2 only once - s = connMaster2.createScanner(master2Cluster.getInstanceName(), Authorizations.EMPTY); + s = connMaster2.createScanner(master2Table, Authorizations.EMPTY); entry = Iterables.getOnlyElement(s); Assert.assertEquals("1", entry.getValue().toString()); - connMaster2.replicationOperations().drain(master2Cluster.getInstanceName(), files); + connMaster2.replicationOperations().drain(master2Table, files); Thread.sleep(5000); // Verify that the entry wasn't sent back to master1 - s = connMaster1.createScanner(master1Cluster.getInstanceName(), Authorizations.EMPTY); + s = connMaster1.createScanner(master1Table, Authorizations.EMPTY); entry = Iterables.getOnlyElement(s); Assert.assertEquals("1", entry.getValue().toString()); } finally { http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java index 6c21962..d561d2f 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java @@ -113,40 +113,40 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT { try { final Connector connMaster = getConnector(); final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD); - + ReplicationTable.create(connMaster); String peerUserName = "peer", peerPassword = "foo"; - + String peerClusterName = "peer"; connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); - + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); - + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers connMaster.instanceOperations().setProperty( Property.REPLICATION_PEERS.getKey() + peerClusterName, ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers()))); - + final String masterTable = "master", peerTable = "peer"; - + connMaster.tableOperations().create(masterTable); String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable); Assert.assertNotNull(masterTableId); - + connPeer.tableOperations().create(peerTable); String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); Assert.assertNotNull(peerTableId); connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE); - + // Replicate this table to the peerClusterName in a table with the peerTableId table id connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId); - + // Write some data to table1 BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); for (int rows = 0; rows < 5000; rows++) { @@ -157,23 +157,23 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT { } bw.addMutation(m); } - + bw.close(); - + log.info("Wrote all data to master cluster"); - + final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable); - + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { cluster.killProcess(ServerType.TABLET_SERVER, proc); } cluster.exec(TabletServer.class); - + log.info("TabletServer restarted"); for (@SuppressWarnings("unused") Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {} log.info("TabletServer is online"); - + log.info(""); log.info("Fetching metadata records:"); for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { @@ -183,33 +183,33 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT { log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); } } - + log.info(""); log.info("Fetching replication records:"); for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) { log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); } - + Future<Boolean> future = executor.submit(new Callable<Boolean>() { - + @Override public Boolean call() throws Exception { connMaster.replicationOperations().drain(masterTable, filesNeedingReplication); log.info("Drain completed"); return true; } - + }); - + try { future.get(30, TimeUnit.SECONDS); } catch (TimeoutException e) { future.cancel(true); Assert.fail("Drain did not finish within 30 seconds"); } - + log.info("drain completed"); - + log.info(""); log.info("Fetching metadata records:"); for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { @@ -219,13 +219,13 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT { log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); } } - + log.info(""); log.info("Fetching replication records:"); for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) { log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); } - + Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY); Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator(); Entry<Key,Value> masterEntry = null, peerEntry = null; @@ -236,10 +236,10 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT { masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)); Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue()); } - + log.info("Last master entry: " + masterEntry); log.info("Last peer entry: " + peerEntry); - + Assert.assertFalse("Had more data to read from the master", masterIter.hasNext()); Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext()); } finally { @@ -377,7 +377,7 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT { Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() .startsWith(masterTable1)); } - + log.info("Found {} records in {}", countTable, peerTable1); if (masterTable1Records != countTable) { @@ -394,7 +394,7 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT { Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() .startsWith(masterTable2)); } - + log.info("Found {} records in {}", countTable, peerTable2); if (masterTable2Records != countTable) { @@ -605,7 +605,6 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT { Thread.sleep(500); } - for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { cluster.killProcess(ServerType.TABLET_SERVER, proc); }
