Merge remote-tracking branch 'origin/master' into ACCUMULO-378 Conflicts: server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ada6ce46 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ada6ce46 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ada6ce46 Branch: refs/heads/ACCUMULO-378 Commit: ada6ce464b1e9d818c06655369b30a45afa840c0 Parents: 98eb56f 47d5933 Author: Josh Elser <els...@apache.org> Authored: Wed Jun 4 22:07:59 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Wed Jun 4 22:07:59 2014 -0400 ---------------------------------------------------------------------- .../server/GarbageCollectionLogger.java | 103 ++++ .../apache/accumulo/tserver/TabletServer.java | 607 ++++++------------- .../accumulo/tserver/TservConstraintEnv.java | 83 +++ .../apache/accumulo/tserver/log/DfsLogger.java | 20 +- .../tserver/session/ConditionalSession.java | 41 ++ .../tserver/session/MultiScanSession.java | 62 ++ .../accumulo/tserver/session/ScanSession.java | 69 +++ .../accumulo/tserver/session/Session.java | 43 ++ .../accumulo/tserver/session/UpdateSession.java | 56 ++ .../apache/accumulo/tserver/tablet/Tablet.java | 12 +- .../apache/accumulo/test/AuditMessageIT.java | 5 + 11 files changed, 649 insertions(+), 452 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/ada6ce46/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index e6286ff,ee28c7f..dd3c16e --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -78,11 -76,7 +76,8 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; - import org.apache.accumulo.core.constraints.Constraint.Environment; - import org.apache.accumulo.core.constraints.Violations; - import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.ConstraintViolationSummary; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; @@@ -116,8 -110,6 +111,7 @@@ import org.apache.accumulo.core.master. import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.replication.thrift.ReplicationServicer; - import org.apache.accumulo.core.security.AuthorizationContainer; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.SecurityUtil; import org.apache.accumulo.core.security.thrift.TCredentials; @@@ -213,8 -205,11 +207,13 @@@ import org.apache.accumulo.tserver.metr import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics; import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics; +import org.apache.accumulo.tserver.replication.ReplicationServicerHandler; +import org.apache.accumulo.tserver.replication.ReplicationWorker; + import org.apache.accumulo.tserver.session.ConditionalSession; + import org.apache.accumulo.tserver.session.MultiScanSession; + import org.apache.accumulo.tserver.session.ScanSession; + import org.apache.accumulo.tserver.session.Session; + import org.apache.accumulo.tserver.session.UpdateSession; import org.apache.accumulo.tserver.tablet.CommitSession; import org.apache.accumulo.tserver.tablet.CompactionInfo; import org.apache.accumulo.tserver.tablet.CompactionWatcher; @@@ -262,18 -258,56 +262,61 @@@ public class TabletServer extends Abstr return mincMetrics; } - private ServerConfiguration serverConfig; - private LogSorter logSorter = null; + private final ServerConfiguration serverConfig; + private final LogSorter logSorter; + private ReplicationWorker replWorker = null; + private final TabletStatsKeeper statsKeeper; + private final AtomicInteger logIdGenerator = new AtomicInteger(); + + private final VolumeManager fs; + public Instance getInstance() { + return serverConfig.getInstance(); + } + + private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>()); + private final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>()); + private final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>()); + @SuppressWarnings("unchecked") + private final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap(1000)); + + private final TabletServerResourceManager resourceManager; + private final SecurityOperation security; + + private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>(); + + private Thread majorCompactorThread; + ++ private HostAndPort replicationAddress; + private HostAndPort clientAddress; + + private volatile boolean serverStopRequested = false; + private volatile boolean majorCompactorDisabled = false; + private volatile boolean shutdownComplete = false; + + private ZooLock tabletServerLock; + + private TServer server; ++ private TServer replServer; + + private DistributedWorkQueue bulkFailedCopyQ; + + private String lockID; + + private static ObjectName OBJECT_NAME = null; + + public static final AtomicLong seekCount = new AtomicLong(0); + + private final AtomicLong totalMinorCompactions = new AtomicLong(0); public TabletServer(ServerConfiguration conf, VolumeManager fs) { super(); this.serverConfig = conf; - this.instance = conf.getInstance(); this.fs = fs; AccumuloConfiguration aconf = getSystemConfiguration(); - this.logSorter = new LogSorter(getInstance(), fs, aconf); ++ Instance instance = getInstance(); + this.logSorter = new LogSorter(instance, fs, aconf); + this.replWorker = new ReplicationWorker(instance, fs, aconf); + this.statsKeeper = new TabletStatsKeeper(); SimpleTimer.getInstance(aconf).schedule(new Runnable() { @Override public void run() { @@@ -3122,29 -2856,6 +2880,29 @@@ return address; } + private HostAndPort startReplicationService() throws UnknownHostException { + ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance())); + ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl); + AccumuloConfiguration conf = getSystemConfiguration(); + Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); + ServerAddress sp = TServerUtils.startServer(conf, clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, + "ReplicationServicerHandler", "Replication Servicer", null, Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty); + this.replServer = sp.server; + log.info("Started replication service on " + sp.address); + + try { + // The replication service is unique to the thrift service for a tserver, not just a host. + // Advertise the host and port for replication service given the host and port for the tserver. - ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZREPLICATION_TSERVERS + "/" + clientAddress.toString(), ++ ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + Constants.ZREPLICATION_TSERVERS + "/" + clientAddress.toString(), + sp.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE); + } catch (Exception e) { + log.error("Could not advertise replication service port", e); + throw new RuntimeException(e); + } + + return sp.address; + } + public ZooLock getLock() { return tabletServerLock; } @@@ -3527,6 -3205,6 +3280,13 @@@ return clientAddress.getHostText() + ":" + clientAddress.getPort(); } ++ public String getReplicationAddressSTring() { ++ if (null == replicationAddress) { ++ return null; ++ } ++ return replicationAddress.getHostText() + ":" + replicationAddress.getPort(); ++ } ++ public TServerInstance getTabletSession() { String address = getClientAddressString(); if (address == null) http://git-wip-us.apache.org/repos/asf/accumulo/blob/ada6ce46/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ----------------------------------------------------------------------