Merge remote-tracking branch 'origin/master' into ACCUMULO-378 Conflicts: server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e81eee7f Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e81eee7f Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e81eee7f Branch: refs/heads/ACCUMULO-378 Commit: e81eee7f7cd2641ffdace5af48a5027f7fcce620 Parents: 73d34ec f280e97 Author: Josh Elser <els...@apache.org> Authored: Tue Jun 3 21:38:00 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Tue Jun 3 21:38:00 2014 -0400 ---------------------------------------------------------------------- .../accumulo/core/client/ZooKeeperInstance.java | 7 +- .../core/client/impl/ConditionalWriterImpl.java | 4 +- .../client/impl/InstanceOperationsImpl.java | 3 +- .../accumulo/core/client/impl/Namespaces.java | 3 +- .../core/client/impl/RootTabletLocator.java | 14 +- .../accumulo/core/client/impl/ServerClient.java | 15 +- .../accumulo/core/client/impl/Tables.java | 3 +- .../core/client/impl/ZookeeperLockChecker.java | 10 +- .../org/apache/accumulo/core/conf/Property.java | 8 +- .../iterators/conf/ColumnToClassMapping.java | 1 + .../accumulo/core/util/AsyncSocketAppender.java | 3 +- .../core/client/ZooKeeperInstanceTest.java | 144 + .../core/client/impl/RootTabletLocatorTest.java | 61 + .../client/impl/ZookeeperLockCheckerTest.java | 58 + .../core/util/AsyncSocketAppenderTest.java | 8 +- docs/src/main/asciidoc/chapters/replication.txt | 21 + .../accumulo/fate/zookeeper/ZooCache.java | 16 +- .../fate/zookeeper/ZooCacheFactory.java | 78 + .../apache/accumulo/fate/zookeeper/ZooLock.java | 2 +- .../fate/zookeeper/ZooReaderWriter.java | 4 - .../fate/zookeeper/ZooCacheFactoryTest.java | 87 + .../accumulo/server/client/HdfsZooInstance.java | 3 +- .../server/conf/NamespaceConfiguration.java | 3 +- .../server/conf/TableConfiguration.java | 3 +- .../accumulo/server/conf/ZooConfiguration.java | 5 +- .../accumulo/server/tablets/TabletTime.java | 1 - .../zookeeper/ZooReaderWriterFactory.java | 2 - .../apache/accumulo/server/AccumuloTest.java | 1 - .../server/watcher/MonitorLog4jWatcherTest.java | 8 +- .../accumulo/tserver/CompactionStats.java | 59 - .../accumulo/tserver/CompactionWatcher.java | 110 - .../org/apache/accumulo/tserver/Compactor.java | 548 --- .../apache/accumulo/tserver/FileManager.java | 12 +- .../apache/accumulo/tserver/InMemoryMap.java | 2 +- .../accumulo/tserver/MinorCompactionReason.java | 21 + .../apache/accumulo/tserver/MinorCompactor.java | 146 - .../java/org/apache/accumulo/tserver/Rate.java | 60 - .../org/apache/accumulo/tserver/RootFiles.java | 133 - .../tserver/TConstraintViolationException.java | 54 + .../org/apache/accumulo/tserver/Tablet.java | 3856 ------------------ .../tserver/TabletIteratorEnvironment.java | 8 +- .../apache/accumulo/tserver/TabletServer.java | 83 +- .../tserver/TabletServerResourceManager.java | 67 +- .../accumulo/tserver/TabletStatsKeeper.java | 6 + .../apache/accumulo/tserver/log/DfsLogger.java | 60 +- .../accumulo/tserver/log/LocalWALRecovery.java | 14 +- .../tserver/log/TabletServerLogger.java | 4 +- .../apache/accumulo/tserver/tablet/Batch.java | 51 + .../accumulo/tserver/tablet/CommitSession.java | 121 + .../accumulo/tserver/tablet/CompactionInfo.java | 129 + .../tserver/tablet/CompactionRunner.java | 76 + .../tserver/tablet/CompactionStats.java | 59 + .../tserver/tablet/CompactionWatcher.java | 110 + .../accumulo/tserver/tablet/Compactor.java | 424 ++ .../tserver/tablet/CountingIterator.java | 78 + .../tserver/tablet/DatafileManager.java | 605 +++ 
.../apache/accumulo/tserver/tablet/KVEntry.java | 39 + .../tserver/tablet/MinorCompactionTask.java | 99 + .../accumulo/tserver/tablet/MinorCompactor.java | 142 + .../apache/accumulo/tserver/tablet/Rate.java | 60 + .../accumulo/tserver/tablet/RootFiles.java | 133 + .../accumulo/tserver/tablet/ScanBatch.java | 37 + .../accumulo/tserver/tablet/ScanDataSource.java | 222 + .../accumulo/tserver/tablet/ScanOptions.java | 82 + .../apache/accumulo/tserver/tablet/Scanner.java | 136 + .../accumulo/tserver/tablet/SplitInfo.java | 76 + .../accumulo/tserver/tablet/SplitRowSpec.java | 29 + .../apache/accumulo/tserver/tablet/Tablet.java | 2581 ++++++++++++ .../tserver/tablet/TabletClosedException.java | 29 + .../tserver/tablet/TabletCommitter.java | 51 + .../accumulo/tserver/tablet/TabletMemory.java | 190 + .../accumulo/tserver/CountingIteratorTest.java | 2 +- .../apache/accumulo/tserver/RootFilesTest.java | 149 - .../accumulo/tserver/tablet/RootFilesTest.java | 150 + .../test/functional/MonitorLoggingIT.java | 1 - test/system/continuous/master-agitator.pl | 3 +- 76 files changed, 6430 insertions(+), 5253 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index 8ad849b,1200fd1..59955f3 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -454,26 -441,6 +454,26 @@@ public enum Property GENERAL_MAVEN_PROJECT_BASEDIR(AccumuloClassLoader.MAVEN_PROJECT_BASEDIR_PROPERTY_NAME, AccumuloClassLoader.DEFAULT_MAVEN_PROJECT_BASEDIR_VALUE, PropertyType.ABSOLUTEPATH, "Set this to automatically add maven target/classes directories to your dynamic classpath"), + // General properties for configuring replication + REPLICATION_PREFIX("replication.", null, PropertyType.PREFIX, "Properties in this category affect the replication of data to other Accumulo instances."), + REPLICATION_PEERS("replication.peer.", null, PropertyType.PREFIX, "Properties in this category control what systems data can be replicated to"), + REPLICATION_PEER_USER("replication.peer.user.", null, PropertyType.PREFIX, "The username to provide when authenticating with the given peer"), + @Sensitive + REPLICATION_PEER_PASSWORD("replication.peer.password.", null, PropertyType.PREFIX, "The password to provide when authenticating with the given peer"), + REPLICATION_NAME("replication.name", "", PropertyType.STRING, "Name of this cluster with respect to replication. 
Used to identify this instance from other peers"), + REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "1000", PropertyType.COUNT, "Upper bound of the number of files queued for replication"), + REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment"), + REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"), + REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"), + REPLICATION_WORK_ATTEMPTS("replication.work.attempts", "10", PropertyType.COUNT, "Number of attempts to try to replicate some data before giving up and letting it naturally be retried later"), - REPLICATION_MIN_THREADS("replication.receiver.min.threads", "1", PropertyType.COUNT, "Minimum number of threads for replciation"), - REPLICATION_THREADCHECK("replication.receiver.threadcheck.time", "5s", PropertyType.TIMEDURATION, "The time between adjustments of the replication thread pool."), ++ REPLICATION_MIN_THREADS("replication.receiver.min.threads", "1", PropertyType.COUNT, "Minimum number of threads for replication"), ++ REPLICATION_THREADCHECK("replication.receiver.threadcheck.time", "30s", PropertyType.TIMEDURATION, "The time between adjustments of the replication thread pool."), + REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.MEMORY, "Maximum size of data to send in a replication message"), + REPLICATION_WORK_ASSIGNER("replication.work.assigner", "org.apache.accumulo.master.replication.SequentialWorkAssigner", PropertyType.CLASSNAME, + "Replication WorkAssigner implementation to use"), - REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before first checking for replication work"), - REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before re-checking for replication work"), ++ REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before first checking for replication work, not useful outside of tests"), ++ REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before re-checking for replication work, not useful outside of tests"), + ; private String key, defaultValue, description; http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/docs/src/main/asciidoc/chapters/replication.txt ---------------------------------------------------------------------- diff --cc docs/src/main/asciidoc/chapters/replication.txt index 9f367df,0000000..dc87b62 mode 100644,000000..100644 --- a/docs/src/main/asciidoc/chapters/replication.txt +++ b/docs/src/main/asciidoc/chapters/replication.txt @@@ -1,184 -1,0 +1,205 @@@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +== Replication + +=== Overview + +Replication is a feature of Accumulo which provides a mechanism to automatically +copy data to other systems, typically for the purpose of disaster recovery, +high availability, or geographic locality. It is best to consider this feature +as a framework for automatic replication rather than merely the ability to copy data +from one Accumulo instance to another, as copying to another Accumulo cluster is +only an implementation detail. The local Accumulo cluster is hereafter referred +to as the +primary+ while systems being replicated to are known as ++peers+. + +This replication framework makes two Accumulo instances, where one instance +replicates to another, eventually consistent with one another, as opposed +to the strong consistency that each single Accumulo instance still provides. That +is to say, attempts to read data from a table on a peer which has pending replication +from the primary will not wait for that data to be replicated before running the scan. +This is desirable for a number of reasons, the most important being that the replication +framework is not limited by network outages or offline peers, but only by the HDFS +space available on the primary system. + +A replication configuration can be considered a directed graph which allows cycles. +The systems from which data was replicated are recorded in each Mutation, which +allows each system to determine whether a peer already has the data +that the system wants to send. + +Data is replicated by using the Write-Ahead logs (WALs) that each TabletServer is +already maintaining. TabletServers record in the +accumulo.metadata+ table which +WALs have data that needs to be replicated. The Master uses these records, +combined with the local Accumulo table that the WAL was used with, to create records +in the +replication+ table which track which peers the given WAL should be +replicated to. The Master later uses these work entries to assign the actual +replication task to a local TabletServer using ZooKeeper. A TabletServer will get +a lock in ZooKeeper for the replication of this file to a peer, and proceed to +replicate to the peer, recording progress in the +replication+ table as +data is successfully replicated on the peer. Later, the Master and Garbage Collector +will remove records from the +accumulo.metadata+ and +replication+ tables +and files from HDFS, respectively, after replication to all peers is complete. + +=== Configuration + +Configuration of Accumulo to replicate data to another system can be categorized +into the following sections. + +==== Site Configuration + +Each system involved in replication (even the primary) needs a name that uniquely +identifies it across all peers in the replication graph. This should be considered +fixed for an instance, and set in +accumulo-site.xml+.
+ +---- +<property> + <name>replication.name</name> + <value>primary</value> + <description>Unique name for this system used by replication</description> +</property> +---- + +==== Instance Configuration + +For each peer of this system, Accumulo needs to know the name of that peer, +the class used to replicate data to that system, and some configuration information +to connect to this remote peer. In the case of Accumulo, this additional data +is the Accumulo instance name and ZooKeeper quorum; however, this varies with the +replication implementation for the peer. + +These can be set in the site configuration to ease deployments; however, as they may +change, it can be useful to set this information using the Accumulo shell. + +To configure a peer with the name +peer1+, which is an Accumulo system with an instance name of +accumulo_peer+ +and a ZooKeeper quorum of +10.0.0.1,10.0.2.1,10.0.3.1+, invoke the following +command in the shell. + +---- +root@accumulo_primary> config -s +replication.peer.peer1=org.apache.accumulo.tserver.replication.AccumuloReplicaSystem,accumulo_peer,10.0.0.1,10.0.2.1,10.0.3.1 +---- + +Since this is an Accumulo system, we also want to set a username and password +to use when authenticating with this peer. On our peer, we create a dedicated user, +"replication" with a password of "password", which has permission to write to the +tables we want to replicate data into. We then need to record this in the primary's configuration. + +---- +root@accumulo_primary> config -s replication.peer.user.peer1=replication +root@accumulo_primary> config -s replication.peer.password.peer1=password +---- + +==== Table Configuration + +Now that we have a peer defined, we need to configure which tables will +replicate to that peer. We also need to configure an identifier to determine where +this data will be replicated on the peer. Since we're replicating to another Accumulo +cluster, this is a table ID. In this example, we want to enable replication on ++my_table+ and configure our peer +accumulo_peer+ as a target, sending +the data to the table with an ID of +2+ in +accumulo_peer+. + +---- +root@accumulo_primary> config -t my_table -s table.replication=true +root@accumulo_primary> config -t my_table -s table.replication.target.accumulo_peer=2 +---- + +To replicate a single table on the primary to multiple peers, the second command +in the above shell snippet can be issued for each peer and remote identifier pair. + +=== Monitoring + +Basic information about replication status from a primary can be found on the Accumulo +Monitor server, using the +Replication+ link in the sidebar. + +On this page, information is broken down into the following sections: + +1. Files pending replication by peer and target +2. Files queued for replication, with progress made + +=== Work Assignment + +Depending on the schema of a table, a different WorkAssigner implementation may be +appropriate. The implementation is controlled via the property +replication.work.assigner+, +which takes the full class name of the implementation. This can be configured via the shell or ++accumulo-site.xml+.
+ +---- +<property> + <name>replication.work.assigner</name> + <value>org.apache.accumulo.master.replication.SequentialWorkAssigner</value> + <description>Implementation used to assign work for replication</description> +</property> +---- + +---- +root@accumulo_primary> config -s replication.work.assigner=org.apache.accumulo.master.replication.SequentialWorkAssigner +---- + +Two implementations are provided. By default, the +SequentialWorkAssigner+ is configured for an +instance. The SequentialWorkAssigner ensures that, per peer and remote identifier, WALs are +replicated in the order in which they were created. This is sufficient to ensure that updates to a table +will be replayed in the correct order on the peer. This implementation has the downside of only replicating +a single WAL at a time. + +The second implementation, the +UnorderedWorkAssigner+, can be used to overcome the limitation +of only a single WAL being replicated to a target and peer at any time. Depending on the table schema, +multiple versions of the same Key with different values may be infrequent or nonexistent. +In this case, parallel replication to a peer and target is possible without any downsides. Where +this implementation is used and column updates are frequent, it is possible that there will be +an inconsistency between the primary and the peer. + +=== ReplicaSystems + ++ReplicaSystem+ is the interface which abstracts the replication of data +to peers of various types. Presently, only an +AccumuloReplicaSystem+ is provided, +which will replicate data to another Accumulo instance. A +ReplicaSystem+ implementation +runs inside the TabletServer process, and can be configured as mentioned in the ++Instance Configuration+ section of this document. Theoretically, an implementation +of this interface could send data to other filesystems, databases, etc. + +==== AccumuloReplicaSystem + +The +AccumuloReplicaSystem+ uses Thrift to communicate with a peer Accumulo instance +and replicate the necessary data. The TabletServer running on the primary will communicate +with the Master on the peer to request the address of a TabletServer on the peer which +this TabletServer will use to replicate the data. + +The TabletServer on the primary will then replicate data in batches of a configurable +size (+replication.max.unit.size+). The TabletServer on the peer will report back to the +primary how many records were applied, which is used to track how many records +were successfully replicated. The TabletServer on the primary will continue to replicate +data in these batches until no more data can be read from the file. ++ ++=== Other Configuration ++ ++There are a number of configuration values that can be used to control how ++the implementations of various components operate.
++ ++[width="75%",cols=">,^2,^2"] ++[options="header"] ++|==== ++|Property | Description | Default ++|replication.max.work.queue | Maximum number of files queued for replication at one time | 1000 ++|replication.work.assignment.sleep | Time between invocations of the WorkAssigner | 30s ++|replication.worker.threads | Size of threadpool used to replicate data to peers | 4 ++|replication.receipt.service.port | Thrift service port to listen for replication requests, can use '0' for a random port | 10002 ++|replication.work.attempts | Number of attempts to replicate to a peer before aborting the attempt | 10 ++|replication.receiver.min.threads | Minimum number of idle threads for handling incoming replication | 1 ++|replication.receiver.threadcheck.time | Time between attempting adjustments of thread pool for incoming replications | 30s ++|replication.max.unit.size | Maximum amount of data to be replicated in one RPC | 64M ++|replication.work.assigner | Work Assigner implementation | org.apache.accumulo.master.replication.SequentialWorkAssigner ++|tserver.replication.batchwriter.replayer.memory| Size of BatchWriter cache to use in applying replication requests | 50M ++|==== http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index e4c7ef9,2a453a8..689557c --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -223,8 -211,17 +213,19 @@@ import org.apache.accumulo.tserver.metr import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics; import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics; +import org.apache.accumulo.tserver.replication.ReplicationServicerHandler; +import org.apache.accumulo.tserver.replication.ReplicationWorker; + import org.apache.accumulo.tserver.tablet.CommitSession; + import org.apache.accumulo.tserver.tablet.CompactionInfo; + import org.apache.accumulo.tserver.tablet.CompactionWatcher; + import org.apache.accumulo.tserver.tablet.Compactor; + import org.apache.accumulo.tserver.tablet.KVEntry; + import org.apache.accumulo.tserver.tablet.Tablet.LookupResult; + import org.apache.accumulo.tserver.tablet.ScanBatch; + import org.apache.accumulo.tserver.tablet.Scanner; + import org.apache.accumulo.tserver.tablet.SplitInfo; + import org.apache.accumulo.tserver.tablet.Tablet; + import org.apache.accumulo.tserver.tablet.TabletClosedException; import org.apache.commons.collections.map.LRUMap; import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileSystem; @@@ -3121,30 -3099,7 +3122,30 @@@ public class TabletServer extends Abstr return address; } + private HostAndPort startReplicationService() throws UnknownHostException { + ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance())); + ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl); + AccumuloConfiguration conf = getSystemConfiguration(); + Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? 
Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); + ServerAddress sp = TServerUtils.startServer(conf, clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, + "ReplicationServicerHandler", "Replication Servicer", null, Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty); + this.replServer = sp.server; + log.info("Started replication service on " + sp.address); + + try { + // The replication service is unique to the thrift service for a tserver, not just a host. + // Advertise the host and port for replication service given the host and port for the tserver. + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZREPLICATION_TSERVERS + "/" + clientAddress.toString(), + sp.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE); + } catch (Exception e) { + log.error("Could not advertise replication service port", e); + throw new RuntimeException(e); + } + + return sp.address; + } + - ZooLock getLock() { + public ZooLock getLock() { return tabletServerLock; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index b7b0aff,9fec437..b4f14ec --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@@ -41,9 -36,6 +41,8 @@@ import org.apache.accumulo.core.replica import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.util.ReplicationTableUtil; - import org.apache.accumulo.tserver.Tablet.CommitSession; import org.apache.accumulo.tserver.TabletMutations; import org.apache.accumulo.tserver.TabletServer; import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation; http://git-wip-us.apache.org/repos/asf/accumulo/blob/e81eee7f/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java index 0000000,2771db9..5b46b7b mode 000000,100644..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java @@@ -1,0 -1,581 +1,605 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.tserver.tablet; + + import java.io.IOException; + import java.util.Collection; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; ++import java.util.Map.Entry; + import java.util.Set; + import java.util.SortedMap; + import java.util.TreeMap; + import java.util.TreeSet; -import java.util.Map.Entry; + + import org.apache.accumulo.core.client.Connector; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.KeyExtent; + import org.apache.accumulo.core.metadata.schema.DataFileValue; ++import org.apache.accumulo.core.replication.ReplicationConfigurationUtil; ++import org.apache.accumulo.core.replication.StatusUtil; + import org.apache.accumulo.core.security.Credentials; + import org.apache.accumulo.core.util.MapCounter; + import org.apache.accumulo.core.util.Pair; + import org.apache.accumulo.core.util.UtilWaitThread; + import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; + import org.apache.accumulo.server.ServerConstants; + import org.apache.accumulo.server.client.HdfsZooInstance; + import org.apache.accumulo.server.fs.FileRef; + import org.apache.accumulo.server.fs.VolumeManager; + import org.apache.accumulo.server.master.state.TServerInstance; + import org.apache.accumulo.server.security.SystemCredentials; + import org.apache.accumulo.server.util.MasterMetadataUtil; + import org.apache.accumulo.server.util.MetadataTableUtil; ++import org.apache.accumulo.server.util.ReplicationTableUtil; + import org.apache.accumulo.server.zookeeper.ZooReaderWriter; + import org.apache.accumulo.trace.instrument.Span; + import org.apache.accumulo.trace.instrument.Trace; + import org.apache.accumulo.tserver.TLevel; + import org.apache.hadoop.fs.Path; + import org.apache.log4j.Logger; + + class DatafileManager { + private final Logger log = Logger.getLogger(DatafileManager.class); + // access to datafileSizes needs to be synchronized: see CompactionRunner#getNumFiles + private final Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>()); + private final Tablet tablet; + + // ensure we only have one reader/writer of our bulk file notes at a time + private final Object bulkFileImportLock = new Object(); + + DatafileManager(Tablet tablet, SortedMap<FileRef,DataFileValue> datafileSizes) { + for (Entry<FileRef,DataFileValue> datafiles : datafileSizes.entrySet()) { + this.datafileSizes.put(datafiles.getKey(), datafiles.getValue()); + } + this.tablet = tablet; + } + + private FileRef mergingMinorCompactionFile = null; + private final Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>(); + private final Map<Long,Set<FileRef>> scanFileReservations = new HashMap<Long,Set<FileRef>>(); + private final MapCounter<FileRef> fileScanReferenceCounts = new MapCounter<FileRef>(); + private long nextScanReservationId = 0; + private boolean reservationsBlocked = false; + + private final Set<FileRef> majorCompactingFiles = new HashSet<FileRef>(); + + static void rename(VolumeManager fs, Path src, Path dst) throws IOException { + if (!fs.rename(src, dst)) { + throw new IOException("Rename " + src + " to " + dst + " returned false "); + } + } + + Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() { + synchronized (tablet) { + + while (reservationsBlocked) { + try { + tablet.wait(50); + } catch
(InterruptedException e) { + log.warn(e, e); + } + } + + Set<FileRef> absFilePaths = new HashSet<FileRef>(datafileSizes.keySet()); + + long rid = nextScanReservationId++; + + scanFileReservations.put(rid, absFilePaths); + + Map<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>(); + + for (FileRef path : absFilePaths) { + fileScanReferenceCounts.increment(path, 1); + ret.put(path, datafileSizes.get(path)); + } + + return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret); + } + } + + void returnFilesForScan(Long reservationId) { + + final Set<FileRef> filesToDelete = new HashSet<FileRef>(); + + synchronized (tablet) { + Set<FileRef> absFilePaths = scanFileReservations.remove(reservationId); + + if (absFilePaths == null) + throw new IllegalArgumentException("Unknown scan reservation id " + reservationId); + + boolean notify = false; + for (FileRef path : absFilePaths) { + long refCount = fileScanReferenceCounts.decrement(path, 1); + if (refCount == 0) { + if (filesToDeleteAfterScan.remove(path)) + filesToDelete.add(path); + notify = true; + } else if (refCount < 0) + throw new IllegalStateException("Scan ref count for " + path + " is " + refCount); + } + + if (notify) + tablet.notifyAll(); + } + + if (filesToDelete.size() > 0) { + log.debug("Removing scan refs from metadata " + tablet.getExtent() + " " + filesToDelete); + MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, SystemCredentials.get(), tablet.getTabletServer().getLock()); + } + } + + void removeFilesAfterScan(Set<FileRef> scanFiles) { + if (scanFiles.size() == 0) + return; + + Set<FileRef> filesToDelete = new HashSet<FileRef>(); + + synchronized (tablet) { + for (FileRef path : scanFiles) { + if (fileScanReferenceCounts.get(path) == 0) + filesToDelete.add(path); + else + filesToDeleteAfterScan.add(path); + } + } + + if (filesToDelete.size() > 0) { + log.debug("Removing scan refs from metadata " + tablet.getExtent() + " " + filesToDelete); + MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, SystemCredentials.get(), tablet.getTabletServer().getLock()); + } + } + + private TreeSet<FileRef> waitForScansToFinish(Set<FileRef> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) { + long startTime = System.currentTimeMillis(); + TreeSet<FileRef> inUse = new TreeSet<FileRef>(); + + Span waitForScans = Trace.start("waitForScans"); + try { + synchronized (tablet) { + if (blockNewScans) { + if (reservationsBlocked) + throw new IllegalStateException(); + + reservationsBlocked = true; + } + + for (FileRef path : pathsToWaitFor) { + while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) { + try { + tablet.wait(100); + } catch (InterruptedException e) { + log.warn(e, e); + } + } + } + + for (FileRef path : pathsToWaitFor) { + if (fileScanReferenceCounts.get(path) > 0) + inUse.add(path); + } + + if (blockNewScans) { + reservationsBlocked = false; + tablet.notifyAll(); + } + + } + } finally { + waitForScans.stop(); + } + return inUse; + } + + public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean setTime) throws IOException { + + final KeyExtent extent = tablet.getExtent(); + String bulkDir = null; + + Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>(); + for (Entry<FileRef,DataFileValue> entry : pathsString.entrySet()) + paths.put(entry.getKey(), entry.getValue()); + + for (FileRef tpath : paths.keySet()) { + + boolean inTheRightDirectory = false; + Path parent = 
tpath.path().getParent().getParent(); + for (String tablesDir : ServerConstants.getTablesDirs()) { + if (parent.equals(new Path(tablesDir, tablet.getExtent().getTableId().toString()))) { + inTheRightDirectory = true; + break; + } + } + if (!inTheRightDirectory) { + throw new IOException("Data file " + tpath + " not in table dirs"); + } + + if (bulkDir == null) + bulkDir = tpath.path().getParent().toString(); + else if (!bulkDir.equals(tpath.path().getParent().toString())) + throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath); + + } + + if (tablet.getExtent().isRootTablet()) { + throw new IllegalArgumentException("Can not import files to root tablet"); + } + + synchronized (bulkFileImportLock) { + Credentials creds = SystemCredentials.get(); + Connector conn; + try { + conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken()); + } catch (Exception ex) { + throw new IOException(ex); + } + // Remove any bulk files we've previously loaded and compacted away + List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid); + + for (FileRef file : files) + if (paths.keySet().remove(file)) + log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file); + + if (paths.size() > 0) { + long bulkTime = Long.MIN_VALUE; + if (setTime) { + for (DataFileValue dfv : paths.values()) { + long nextTime = tablet.getAndUpdateTime(); + if (nextTime < bulkTime) + throw new IllegalStateException("Time went backwards unexpectedly " + nextTime + " " + bulkTime); + bulkTime = nextTime; + dfv.setTime(bulkTime); + } + } + + tablet.updatePersistedTime(bulkTime, paths, tid); + } + } + + synchronized (tablet) { + for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) { + if (datafileSizes.containsKey(tpath.getKey())) { + log.error("Adding file that is already in set " + tpath.getKey()); + } + datafileSizes.put(tpath.getKey(), tpath.getValue()); + + } + + tablet.getTabletResources().importedMapFiles(); + + tablet.computeNumEntries(); + } + + for (Entry<FileRef,DataFileValue> entry : paths.entrySet()) { + log.log(TLevel.TABLET_HIST, tablet.getExtent() + " import " + entry.getKey() + " " + entry.getValue()); + } + } + + FileRef reserveMergingMinorCompactionFile() { + if (mergingMinorCompactionFile != null) + throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved : " + mergingMinorCompactionFile); + + if (tablet.getExtent().isRootTablet()) + return null; + + int maxFiles = tablet.getTableConfiguration().getMaxFilesPerTablet(); + + // when a major compaction is running and we are at max files, write out + // one extra file... want to avoid the case where major compaction is + // compacting everything except for the largest file, and therefore the + // largest file is returned for merging.. 
the following check mostly + // avoids this case, except for the case where major compactions fail or + // are canceled + if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles) + return null; + + if (datafileSizes.size() >= maxFiles) { + // find the smallest file + + long min = Long.MAX_VALUE; + FileRef minName = null; + + for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) { + if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) { + min = entry.getValue().getSize(); + minName = entry.getKey(); + } + } + + if (minName == null) + return null; + + mergingMinorCompactionFile = minName; + return minName; + } + + return null; + } + + void unreserveMergingMinorCompactionFile(FileRef file) { + if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null) + || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile))) + throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile); + + mergingMinorCompactionFile = null; + } + + void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) + throws IOException { + + IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); + if (tablet.getExtent().isRootTablet()) { + try { + if (!zoo.isLockHeld(tablet.getTabletServer().getLock().getLockID())) { + throw new IllegalStateException(); + } + } catch (Exception e) { + throw new IllegalStateException("Can not bring minor compaction online, lock not held", e); + } + } + + // rename before putting in metadata table, so files in metadata table should + // always exist + do { + try { + if (dfv.getNumEntries() == 0) { + tablet.getTabletServer().getFileSystem().deleteRecursively(tmpDatafile.path()); + } else { + if (tablet.getTabletServer().getFileSystem().exists(newDatafile.path())) { + log.warn("Target map file already exists " + newDatafile); + tablet.getTabletServer().getFileSystem().deleteRecursively(newDatafile.path()); + } + + rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.path(), newDatafile.path()); + } + break; + } catch (IOException ioe) { + log.warn("Tablet " + tablet.getExtent() + " failed to rename " + newDatafile + " after MinC, will retry in 60 secs...", ioe); + UtilWaitThread.sleep(60 * 1000); + } + } while (true); + + long t1, t2; + + // the code below always assumes merged files are in use by scans... this must be done + // because the in memory list of files is not updated until after the metadata table + // therefore the file is available to scans until memory is updated, but want to ensure + // the file is not available for garbage collection... if memory were updated + // before this point (like major compactions do), then the following code could wait + // for scans to finish like major compactions do.... used to wait for scans to finish + // here, but that was incorrect because a scan could start after waiting but before + // memory was updated... assuming the file is always in use by scans leads to + // one unneeded metadata update when it was not actually in use + Set<FileRef> filesInUseByScans = Collections.emptySet(); + if (absMergeFile != null) + filesInUseByScans = Collections.singleton(absMergeFile); + + // very important to write delete entries outside of log lock, because + // this metadata write does not go up...
it goes sideways or to itself + if (absMergeFile != null) + MetadataTableUtil.addDeleteEntries(tablet.getExtent(), Collections.singleton(absMergeFile), SystemCredentials.get()); + + Set<String> unusedWalLogs = tablet.beginClearingUnusedLogs(); ++ boolean replicate = ReplicationConfigurationUtil.isEnabled(tablet.getExtent(), tablet.getTableConfiguration()); ++ Set<String> logFileOnly = null; ++ if (replicate) { ++ // unusedWalLogs is of the form host/fileURI, need to strip off the host portion ++ logFileOnly = new HashSet<>(); ++ for (String unusedWalLog : unusedWalLogs) { ++ int index = unusedWalLog.indexOf('/'); ++ if (-1 == index) { ++ log.warn("Could not find host component to strip from DFSLogger representation of WAL"); ++ } else { ++ unusedWalLog = unusedWalLog.substring(index + 1); ++ } ++ logFileOnly.add(unusedWalLog); ++ } ++ } + try { + // the order of writing to metadata and walog is important in the face of machine/process failures + // need to write to metadata before writing to walog, when things are done in the reverse order + // data could be lost... the minor compaction start event should be written before the following metadata + // write is made + + tablet.updateTabletDataFile(commitSession.getMaxCommittedTime(), newDatafile, absMergeFile, dfv, unusedWalLogs, filesInUseByScans, flushId); + ++ // Mark that we have data we want to replicate ++ // This WAL could still be in use by other Tablets *from the same table*, so we can only mark that there is data to replicate, ++ // but it is *not* closed ++ if (replicate) { ++ ReplicationTableUtil.updateFiles(SystemCredentials.get(), tablet.getExtent(), logFileOnly, StatusUtil.openWithUnknownLength()); ++ } + } finally { + tablet.finishClearingUnusedLogs(); + } + + do { + try { + // the purpose of making this update use the new commit session, instead of the old one passed in, + // is because the new one will reference the logs used by current memory... + + tablet.getTabletServer().minorCompactionFinished(tablet.getTabletMemory().getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2); + break; + } catch (IOException e) { + log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e); + UtilWaitThread.sleep(1 * 1000); + } + } while (true); + + synchronized (tablet) { + t1 = System.currentTimeMillis(); + + if (datafileSizes.containsKey(newDatafile)) { + log.error("Adding file that is already in set " + newDatafile); + } + + if (dfv.getNumEntries() > 0) { + datafileSizes.put(newDatafile, dfv); + } + + if (absMergeFile != null) { + datafileSizes.remove(absMergeFile); + } + + unreserveMergingMinorCompactionFile(absMergeFile); + + tablet.flushComplete(flushId); + + t2 = System.currentTimeMillis(); + } + + // must do this after list of files in memory is updated above + removeFilesAfterScan(filesInUseByScans); + + if (absMergeFile != null) + log.log(TLevel.TABLET_HIST, tablet.getExtent() + " MinC [" + absMergeFile + ",memory] -> " + newDatafile); + else + log.log(TLevel.TABLET_HIST, tablet.getExtent() + " MinC [memory] -> " + newDatafile); + log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, tablet.getExtent().toString())); + long splitSize = tablet.getTableConfiguration().getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD); + if (dfv.getSize() > splitSize) { + log.debug(String.format("Minor Compaction wrote out file larger than split threshold.
split threshold = %,d file size = %,d", splitSize, dfv.getSize())); + } + } + + public void reserveMajorCompactingFiles(Collection<FileRef> files) { + if (majorCompactingFiles.size() != 0) + throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles); + + if (mergingMinorCompactionFile != null && files.contains(mergingMinorCompactionFile)) + throw new IllegalStateException("Major compaction tried to reserve file in use by minor compaction " + mergingMinorCompactionFile); + + majorCompactingFiles.addAll(files); + } + + public void clearMajorCompactingFile() { + majorCompactingFiles.clear(); + } + + void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv) + throws IOException { + final KeyExtent extent = tablet.getExtent(); + long t1, t2; + + if (!extent.isRootTablet()) { + + if (tablet.getTabletServer().getFileSystem().exists(newDatafile.path())) { + log.error("Target map file already exists " + newDatafile, new Exception()); + throw new IllegalStateException("Target map file already exists " + newDatafile); + } + + // rename before putting in metadata table, so files in metadata table should + // always exist + rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.path(), newDatafile.path()); + + if (dfv.getNumEntries() == 0) { + tablet.getTabletServer().getFileSystem().deleteRecursively(newDatafile.path()); + } + } + + TServerInstance lastLocation = null; + synchronized (tablet) { + + t1 = System.currentTimeMillis(); + + IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); + + tablet.incrementDataSourceDeletions(); + + if (extent.isRootTablet()) { + + waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE); + + try { + if (!zoo.isLockHeld(tablet.getTabletServer().getLock().getLockID())) { + throw new IllegalStateException(); + } + } catch (Exception e) { + throw new IllegalStateException("Can not bring major compaction online, lock not held", e); + } + + // mark files as ready for deletion, but + // do not delete them until we successfully + // rename the compacted map file, in case + // the system goes down + + RootFiles.replaceFiles(tablet.getTableConfiguration(), tablet.getTabletServer().getFileSystem(), tablet.getLocation(), oldDatafiles, tmpDatafile, newDatafile); + } + + // atomically remove old files and add new file + for (FileRef oldDatafile : oldDatafiles) { + if (!datafileSizes.containsKey(oldDatafile)) { + log.error("file does not exist in set " + oldDatafile); + } + datafileSizes.remove(oldDatafile); + majorCompactingFiles.remove(oldDatafile); + } + + if (datafileSizes.containsKey(newDatafile)) { + log.error("Adding file that is already in set " + newDatafile); + } + + if (dfv.getNumEntries() > 0) { + datafileSizes.put(newDatafile, dfv); + } + + // could be used by a follow on compaction in a multipass compaction + majorCompactingFiles.add(newDatafile); + + tablet.computeNumEntries(); + + lastLocation = tablet.resetLastLocation(); + + tablet.setLastCompactionID(compactionId); + t2 = System.currentTimeMillis(); + } + + if (!extent.isRootTablet()) { + Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000); + if (filesInUseByScans.size() > 0) + log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans); + MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(), + tablet.getTabletServer().getClientAddressString(), lastLocation,
tablet.getTabletServer().getLock()); + removeFilesAfterScan(filesInUseByScans); + } + + log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0)); + log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + " --> " + newDatafile); + } + + public SortedMap<FileRef,DataFileValue> getDatafileSizes() { + synchronized (tablet) { + TreeMap<FileRef,DataFileValue> copy = new TreeMap<FileRef,DataFileValue>(datafileSizes); + return Collections.unmodifiableSortedMap(copy); + } + } + + public Set<FileRef> getFiles() { + synchronized (tablet) { + HashSet<FileRef> files = new HashSet<FileRef>(datafileSizes.keySet()); + return Collections.unmodifiableSet(files); + } + } + + public int getNumFiles() { + return datafileSizes.size(); + } + + }
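
To make the WAL host-stripping in bringMinorCompactionOnline above easier to follow, here is a minimal standalone sketch of the same transformation. The class name, host, and WAL path are made-up values for illustration; the real code operates on the entries returned by tablet.beginClearingUnusedLogs().

----
// Illustrative only: mirrors the host-stripping in DatafileManager.bringMinorCompactionOnline.
// DfsLogger identifies a WAL as "host:port/fileURI"; replication entries key on the file URI alone.
import java.util.HashSet;
import java.util.Set;

public class WalHostStripExample {
  public static void main(String[] args) {
    // hypothetical entry as returned by tablet.beginClearingUnusedLogs()
    Set<String> unusedWalLogs = new HashSet<>();
    unusedWalLogs.add("tserver1.example.com:9997/hdfs://namenode:8020/accumulo/wal/0b7a2c1e");

    Set<String> logFileOnly = new HashSet<>();
    for (String unusedWalLog : unusedWalLogs) {
      int index = unusedWalLog.indexOf('/');
      if (index == -1) {
        // no host component found; the real code logs a warning and keeps the entry unchanged
        logFileOnly.add(unusedWalLog);
      } else {
        // drop "host:port/" and keep the file URI
        logFileOnly.add(unusedWalLog.substring(index + 1));
      }
    }

    // prints: [hdfs://namenode:8020/accumulo/wal/0b7a2c1e]
    System.out.println(logFileOnly);
  }
}
----

Splitting at the first '/' works because the host:port component contains no slash, while everything after it is the file URI; that URI is what ReplicationTableUtil.updateFiles records alongside StatusUtil.openWithUnknownLength() to mark that the WAL has data to replicate but is not yet closed.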