ACCUMULO-2583 Advertise the peer master's replication coordinator service port in ZooKeeper. Peers must use the proper contact information (master host plus coordinator port) when connecting to the peer master. Add more configuration Properties for tuning replication, and improve logging.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e84879c8 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e84879c8 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e84879c8 Branch: refs/heads/ACCUMULO-378 Commit: e84879c8dd814e1f00c9b109f095a45015224c7f Parents: 53fc90f Author: Josh Elser <els...@apache.org> Authored: Thu May 8 21:56:45 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Thu May 8 21:56:45 2014 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/Constants.java | 1 + .../core/client/impl/ReplicationClient.java | 39 +++++++++++++++++--- .../org/apache/accumulo/core/conf/Property.java | 10 ++++- .../java/org/apache/accumulo/master/Master.java | 16 +++++++- .../replication/ReplicationWorkAssigner.java | 7 +++- .../apache/accumulo/tserver/TabletServer.java | 2 +- .../replication/AccumuloReplicaSystem.java | 20 +++++++--- .../test/replication/ReplicationIT.java | 34 +++++++++++------ 8 files changed, 100 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/core/src/main/java/org/apache/accumulo/core/Constants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index c87119c..f230690 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -43,6 +43,7 @@ public class Constants { public static final String ZMASTERS = "/masters"; public static final String ZMASTER_LOCK = ZMASTERS + "/lock"; public static final String ZMASTER_GOAL_STATE = ZMASTERS + "/goal_state"; + public static final String ZMASTER_REPLICATION_COORDINATOR_PORT = ZMASTERS + 
"/repl_coord_port"; public static final String ZGC = "/gc"; public static final String ZGC_LOCK = ZGC + "/lock"; http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java index df12ae8..65565f8 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java @@ -20,22 +20,29 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; import java.util.List; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator; import org.apache.accumulo.core.replication.thrift.ReplicationServicer; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooReader; import org.apache.thrift.TServiceClient; import org.apache.thrift.transport.TTransportException; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.net.HostAndPort; + public 
class ReplicationClient { private static final Logger log = LoggerFactory.getLogger(ReplicationClient.class); @@ -65,21 +72,41 @@ public class ReplicationClient { return null; } - String master = locations.get(0); - if (master.endsWith(":0")) + // This is the master thrift service, we just want the hostname, not the port + String masterThriftService = locations.get(0); + if (masterThriftService.endsWith(":0")) + return null; + + + AccumuloConfiguration conf = ServerConfigurationUtil.getConfiguration(instance); + + HostAndPort masterAddr = HostAndPort.fromString(masterThriftService); + String zkPath = ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_PORT; + String replCoordinatorPort; + + // Get the coordinator port for the master we're trying to connect to + try { + ZooReader reader = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); + replCoordinatorPort = new String(reader.getData(zkPath, null), StandardCharsets.UTF_8); + } catch (KeeperException | InterruptedException e) { + log.debug("Could not fetch remote coordinator port"); return null; + } + + // Throw the hostname and port through HostAndPort to get some normalization + HostAndPort coordinatorAddr = HostAndPort.fromParts(masterAddr.getHostText(), Integer.parseInt(replCoordinatorPort)); try { // Master requests can take a long time: don't ever time out - ReplicationCoordinator.Client client = ThriftUtil.getClientNoTimeout(new ReplicationCoordinator.Client.Factory(), master, - ServerConfigurationUtil.getConfiguration(instance)); + ReplicationCoordinator.Client client = ThriftUtil.getClientNoTimeout(new ReplicationCoordinator.Client.Factory(), coordinatorAddr.toString(), + conf); return client; } catch (TTransportException tte) { if (tte.getCause().getClass().equals(UnknownHostException.class)) { // do not expect to recover from this throw new RuntimeException(tte); } - log.debug("Failed to connect to master={}, will retry... 
", master, tte); + log.debug("Failed to connect to master coordinator service ({}), will retry... ", coordinatorAddr.toString(), tte); return null; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index d611bd5..a87b1b4 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -182,6 +182,12 @@ public enum Property { @Experimental MASTER_REPLICATION_SCAN_INTERVAL("master.replication.status.scan.interval", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep before scanning the status section of the replication table for new data"), + @Experimental + MASTER_REPLICATION_COORDINATOR_PORT("master.replication.coordinator.port", "10001", PropertyType.PORT, "Port for the replication coordinator service"), + @Experimental + MASTER_REPLICATION_COORDINATOR_MINTHREADS("master.replication.coordinator.minthreads", "4", PropertyType.COUNT, "Minimum number of threads dedicated to answering coordinator requests"), + @Experimental + MASTER_REPLICATION_COORDINATOR_THREADCHECK("master.replication.coordinator.threadcheck.time", "5s", PropertyType.TIMEDURATION, "The time between adjustments of the coordinator thread pool"), // properties that are specific to tablet server behavior TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"), @@ -457,13 +463,15 @@ public enum Property { @Experimental REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"), @Experimental - REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", 
"10001", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"), + REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"), @Experimental REPLICATION_WORK_ATTEMPTS("replication.work.attempts", "10", PropertyType.COUNT, "Number of attempts to try to replicate some data before giving up and letting it naturally be retried later"), @Experimental REPLICATION_MIN_THREADS("replication.receiver.min.threads", "1", PropertyType.COUNT, "Minimum number of threads for replciation"), @Experimental REPLICATION_THREADCHECK("replication.receiver.threadcheck.time", "5s", PropertyType.TIMEDURATION, "The time between adjustments of the replication thread pool."), + @Experimental + REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.MEMORY, "Maximum size of data to send in a replication message"), ; http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 0eac9ab..b0745eb 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -60,6 +60,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; import 
org.apache.accumulo.core.security.NamespacePermission; @@ -76,6 +77,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.master.recovery.RecoveryManager; +import org.apache.accumulo.master.replication.MasterReplicationCoordinator; import org.apache.accumulo.master.replication.ReplicationDriver; import org.apache.accumulo.master.replication.ReplicationWorkAssigner; import org.apache.accumulo.master.state.TableCounts; @@ -977,8 +979,6 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new IOException(e); } - - Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler(this))); ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); @@ -990,6 +990,17 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt while (!clientService.isServing()) { UtilWaitThread.sleep(100); } + + // Start the replication coordinator which assigns tservers to service replication requests + ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor = new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>( + TraceWrap.service(new MasterReplicationCoordinator(this, getSystemConfiguration()))); + ServerAddress replAddress = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor, "Master Replication Coordinator", + "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, 
Property.GENERAL_MAX_MESSAGE_SIZE); + + // Advertise that port we used so peers don't have to be told what it is + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_PORT, + Integer.toString(replAddress.address.getPort()).getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE); + while (clientService.isServing()) { UtilWaitThread.sleep(500); } @@ -1000,6 +1011,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt statusThread.join(remaining(deadline)); replicationWorkAssigner.join(remaining(deadline)); replicationWorkDriver.join(remaining(deadline)); + replAddress.server.stop(); // quit, even if the tablet servers somehow jam up and the watchers // don't stop http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java index 24842a9..1dd20da 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java @@ -152,6 +152,8 @@ public class ReplicationWorkAssigner extends Daemon { @Override public void run() { + log.info("Starting replication work assignment thread"); + while (master.stillMaster()) { if (null == conf) { conf = master.getConfiguration().getConfiguration(); @@ -178,7 +180,9 @@ public class ReplicationWorkAssigner extends Daemon { // Keep the state of the work we queued correct cleanupFinishedWork(); - UtilWaitThread.sleep(conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP)); + long sleepTime = 
conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP); + log.debug("Sleeping {} ms", sleepTime); + UtilWaitThread.sleep(sleepTime); } } @@ -205,6 +209,7 @@ public class ReplicationWorkAssigner extends Daemon { // to add more work entries if (queuedWork.size() > maxQueueSize) { log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow work to occur", maxQueueSize); + UtilWaitThread.sleep(5000); return; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 803419c..0b5c6bf 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -3122,7 +3122,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } private HostAndPort startReplicationService() throws UnknownHostException { - ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler()); + ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(this)); ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl); AccumuloConfiguration conf = getSystemConfiguration(); Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? 
Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java index 40676f7..f275cdb 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java @@ -131,21 +131,29 @@ public class AccumuloReplicaSystem implements ReplicaSystem { Long entriesReplicated; //TODO should chunk up the given file into some configurable sizes instead of just sending the entire file all at once // configuration should probably just be size based. 
- final long sizeLimit = Long.MAX_VALUE; + final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE); try { entriesReplicated = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver, new ClientExecReturn<Long,ReplicationServicer.Client>() { @Override public Long execute(Client client) throws Exception { // RFiles have an extension, call everything else a WAL if (p.getName().endsWith(RFILE_SUFFIX)) { - return client.replicateKeyValues(remoteTableId, getKeyValues(p, status, sizeLimit)); + KeyValues kvs = getKeyValues(p, status, sizeLimit); + if (0 < kvs.getKeyValuesSize()) { + return client.replicateKeyValues(remoteTableId, kvs); + } } else { - return client.replicateLog(remoteTableId, getWalEdits(p, status, sizeLimit)); + WalEdits edits = getWalEdits(p, status, sizeLimit); + if (0 < edits.getEditsSize()) { + return client.replicateLog(remoteTableId, edits); + } } + + return 0l; } }); - log.debug("Replicated {} entries from {} to {} which is a part of {}", entriesReplicated, p, peerTserver, peerInstance.getInstanceName()); + log.debug("Replicated {} entries from {} to {} which is a member of the peer '{}'", entriesReplicated, p, peerTserver, peerInstance.getInstanceName()); // Update the begin to account for what we replicated Status updatedStatus = Status.newBuilder(status).setBegin(status.getBegin() + entriesReplicated).build(); @@ -195,7 +203,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { key.readFields(wal); value.readFields(wal); } catch (EOFException e) { - log.trace("Caught EOFException, no more data to replicate"); + log.debug("Caught EOFException, no more data to replicate"); break; } @@ -217,7 +225,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { } } - log.debug("Returning {} bytes of WAL entries for replication for {}", size, p); + log.debug("Binned {} bytes of WAL entries for replication to peer '{}'", size, p); return edits; } 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java index 0d4099c..0ad8066 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java @@ -33,9 +33,12 @@ import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.test.functional.ConfigurableMacIT; import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.protobuf.TextFormat; @@ -43,13 +46,16 @@ import com.google.protobuf.TextFormat; * */ public class ReplicationIT extends ConfigurableMacIT { + private static final Logger log = LoggerFactory.getLogger(ReplicationIT.class); @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { cfg.setNumTservers(1); - cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M"); + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "32M"); cfg.setProperty(Property.GC_CYCLE_START, "1s"); - cfg.setProperty(Property.GC_CYCLE_DELAY, "1s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "5s"); + cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "5s"); + cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M"); hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } @@ -59,7 +65,8 @@ public class ReplicationIT extends ConfigurableMacIT { ROOT_PASSWORD); peerCfg.setNumTservers(1); peerCfg.setInstanceName("peer"); - 
peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10002"); + peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "10003"); + peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10004"); MiniAccumuloClusterImpl peerCluster = peerCfg.build(); peerCluster.start(); @@ -89,13 +96,15 @@ public class ReplicationIT extends ConfigurableMacIT { // Write some data to table1 BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); - for (int rows = 0; rows < 250; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 500; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); + for (int i = 0; i < 100; i++) { + for (int rows = 0; rows < 1000; rows++) { + Mutation m = new Mutation(i + "_" + Integer.toString(rows)); + for (int cols = 0; cols < 400; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); + } + bw.addMutation(m); } - bw.addMutation(m); } bw.close(); @@ -104,17 +113,18 @@ public class ReplicationIT extends ConfigurableMacIT { Thread.sleep(500); } + connMaster.tableOperations().compact(masterTable, null, null, true, false); for (int i = 0; i < 10; i++) { - Scanner s = ReplicationTable.getScanner(connMaster); for (Entry<Key,Value> e : s) { - log.info(e.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get()))); + Path p = new Path(e.getKey().getRow().toString()); + log.info(p.getName() + " " + e.getKey().getColumnFamily() + " " + e.getKey().getColumnQualifier() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get()))); } log.info(""); log.info(""); - Thread.sleep(1000); + Thread.sleep(3000); } peerCluster.stop();