Merge branch '1.5' into 1.6

Conflicts:
	server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
	server/master/src/main/java/org/apache/accumulo/master/state/SetGoalState.java
	server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
	server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
	server/src/main/java/org/apache/accumulo/server/master/Master.java
	server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
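All of these conflicts stem from the same underlying change: SecurityUtil was relocated from the core module (org.apache.accumulo.core.security) to the server module (org.apache.accumulo.server.security), so every server entry point that referenced it has to pick up the new package. As the diff below shows, the resolution at each call site reduces to a one-line import swap, roughly:

    - import org.apache.accumulo.core.security.SecurityUtil;
    + import org.apache.accumulo.server.security.SecurityUtil;

which matches the 9-file, 9-insertion/9-deletion diffstat that follows.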
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2e5064e3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2e5064e3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2e5064e3

Branch: refs/heads/master
Commit: 2e5064e36aefa53073b42185c0ce30efba1924fb
Parents: 9255120 3bcc3c9
Author: Josh Elser <els...@apache.org>
Authored: Tue Dec 23 15:00:31 2014 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Tue Dec 23 15:00:31 2014 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/accumulo/server/init/Initialize.java | 2 +-
 .../java/org/apache/accumulo/server/security/SecurityUtil.java | 2 +-
 .../main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java | 2 +-
 server/master/src/main/java/org/apache/accumulo/master/Master.java | 2 +-
 .../main/java/org/apache/accumulo/master/state/SetGoalState.java | 2 +-
 .../monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java | 2 +-
 .../src/main/java/org/apache/accumulo/tracer/TraceServer.java | 2 +-
 .../src/main/java/org/apache/accumulo/tserver/TabletServer.java | 2 +-
 .../java/org/apache/accumulo/tserver/log/LocalWALRecovery.java | 2 +-
 9 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e5064e3/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 7c4b8d3,0000000..48cf1bf
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@@ -1,587 -1,0 +1,587 @@@
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.accumulo.server.init; + +import static com.google.common.base.Charsets.UTF_8; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Locale; +import java.util.Map.Entry; +import java.util.UUID; + +import jline.console.ConsoleReader; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.impl.Namespaces; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.iterators.user.VersioningIterator; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.master.thrift.MasterGoalState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; - import org.apache.accumulo.core.security.SecurityUtil; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.volume.VolumeConfiguration; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.Accumulo; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.constraints.MetadataConstraints; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.tables.TableManager; +import org.apache.accumulo.server.tablets.TabletTime; +import org.apache.accumulo.server.util.TablePropUtil; ++import org.apache.accumulo.server.security.SecurityUtil; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; + +import com.beust.jcommander.Parameter; + +/** + * 
This class is used to set up the directory structure and the root tablet to get an instance started + * + */ +public class Initialize { + private static final Logger log = Logger.getLogger(Initialize.class); + private static final String DEFAULT_ROOT_USER = "root"; + public static final String TABLE_TABLETS_TABLET_DIR = "/table_info"; + + private static ConsoleReader reader = null; + private static IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + + private static ConsoleReader getConsoleReader() throws IOException { + if (reader == null) + reader = new ConsoleReader(); + return reader; + } + + /** + * Sets this class's ZooKeeper reader/writer. + * + * @param izoo + * reader/writer + */ + static void setZooReaderWriter(IZooReaderWriter izoo) { + zoo = izoo; + } + + /** + * Gets this class's ZooKeeper reader/writer. + * + * @return reader/writer + */ + static IZooReaderWriter getZooReaderWriter() { + return zoo; + } + + private static HashMap<String,String> initialMetadataConf = new HashMap<String,String>(); + static { + initialMetadataConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "32K"); + initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5"); + initialMetadataConf.put(Property.TABLE_WALOG_ENABLED.getKey(), "true"); + initialMetadataConf.put(Property.TABLE_MAJC_RATIO.getKey(), "1"); + initialMetadataConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "64M"); + initialMetadataConf.put(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", MetadataConstraints.class.getName()); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers", "10," + VersioningIterator.class.getName()); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions", "1"); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers", "10," + VersioningIterator.class.getName()); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions", "1"); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", "10," + VersioningIterator.class.getName()); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions", "1"); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName()); + initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false"); + initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet", + String.format("%s,%s", TabletsSection.TabletColumnFamily.NAME, TabletsSection.CurrentLocationColumnFamily.NAME)); + initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "server", String.format("%s,%s,%s,%s", TabletsSection.DataFileColumnFamily.NAME, + TabletsSection.LogColumnFamily.NAME, TabletsSection.ServerColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME)); + initialMetadataConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "tablet,server"); + initialMetadataConf.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), ""); + initialMetadataConf.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(), "true"); + initialMetadataConf.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true"); + } + + static boolean checkInit(Configuration conf, VolumeManager fs, SiteConfiguration sconf) throws IOException { + @SuppressWarnings("deprecation") + String fsUri = sconf.get(Property.INSTANCE_DFS_URI); + if (fsUri.equals("")) + fsUri = FileSystem.getDefaultUri(conf).toString(); + log.info("Hadoop 
Filesystem is " + fsUri); + log.info("Accumulo data dirs are " + Arrays.asList(VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration()))); + log.info("Zookeeper server is " + sconf.get(Property.INSTANCE_ZK_HOST)); + log.info("Checking if Zookeeper is available. If this hangs, then you need to make sure zookeeper is running"); + if (!zookeeperAvailable()) { + log.fatal("Zookeeper needs to be up and running in order to init. Exiting ..."); + return false; + } + if (sconf.get(Property.INSTANCE_SECRET).equals(Property.INSTANCE_SECRET.getDefaultValue())) { + ConsoleReader c = getConsoleReader(); + c.beep(); + c.println(); + c.println(); + c.println("Warning!!! Your instance secret is still set to the default, this is not secure. We highly recommend you change it."); + c.println(); + c.println(); + c.println("You can change the instance secret in accumulo by using:"); + c.println(" bin/accumulo " + org.apache.accumulo.server.util.ChangeSecret.class.getName() + " oldPassword newPassword."); + c.println("You will also need to edit your secret in your configuration file by adding the property instance.secret to your conf/accumulo-site.xml. Without this accumulo will not operate correctly"); + } + try { + if (isInitialized(fs)) { + printInitializeFailureMessages(sconf); + return false; + } + } catch (IOException e) { + throw new IOException("Failed to check if filesystem already initialized", e); + } + + return true; + } + + static void printInitializeFailureMessages(SiteConfiguration sconf) { + @SuppressWarnings("deprecation") + Property INSTANCE_DFS_DIR = Property.INSTANCE_DFS_DIR; + @SuppressWarnings("deprecation") + Property INSTANCE_DFS_URI = Property.INSTANCE_DFS_URI; + String instanceDfsDir = sconf.get(INSTANCE_DFS_DIR); + log.fatal("It appears the directories " + Arrays.asList(VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration())) + + " were previously initialized."); + String instanceVolumes = sconf.get(Property.INSTANCE_VOLUMES); + String instanceDfsUri = sconf.get(INSTANCE_DFS_URI); + + if (!instanceVolumes.isEmpty()) { + log.fatal("Change the property " + Property.INSTANCE_VOLUMES + " to use different filesystems,"); + } else if (!instanceDfsDir.isEmpty()) { + log.fatal("Change the property " + INSTANCE_DFS_URI + " to use a different filesystem,"); + } else { + log.fatal("You are using the default URI for the filesystem. 
Set the property " + Property.INSTANCE_VOLUMES + " to use a different filesystem,"); + } + log.fatal("or change the property " + INSTANCE_DFS_DIR + " to use a different directory."); + log.fatal("The current value of " + INSTANCE_DFS_URI + " is |" + instanceDfsUri + "|"); + log.fatal("The current value of " + INSTANCE_DFS_DIR + " is |" + instanceDfsDir + "|"); + log.fatal("The current value of " + Property.INSTANCE_VOLUMES + " is |" + instanceVolumes + "|"); + } + + public static boolean doInit(Opts opts, Configuration conf, VolumeManager fs) throws IOException { + if (!checkInit(conf, fs, ServerConfiguration.getSiteConfiguration())) { + return false; + } + + // prompt user for instance name and root password early, in case they + // abort, we don't leave an inconsistent HDFS/ZooKeeper structure + String instanceNamePath; + try { + instanceNamePath = getInstanceNamePath(opts); + } catch (Exception e) { + log.fatal("Failed to talk to zookeeper", e); + return false; + } + opts.rootpass = getRootPassword(opts); + return initialize(opts, instanceNamePath, fs); + } + + public static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) { + + UUID uuid = UUID.randomUUID(); + // the actual disk locations of the root table and tablets + String[] configuredVolumes = VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration()); + final String rootTabletDir = new Path(fs.choose(configuredVolumes) + Path.SEPARATOR + ServerConstants.TABLE_DIR + Path.SEPARATOR + RootTable.ID + + RootTable.ROOT_TABLET_LOCATION).toString(); + + try { + initZooKeeper(opts, uuid.toString(), instanceNamePath, rootTabletDir); + } catch (Exception e) { + log.fatal("Failed to initialize zookeeper", e); + return false; + } + + try { + initFileSystem(opts, fs, uuid, rootTabletDir); + } catch (Exception e) { + log.fatal("Failed to initialize filesystem", e); + + if (ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES).trim().equals("")) { + Configuration fsConf = CachedConfiguration.getInstance(); + + final String defaultFsUri = "file:///"; + String fsDefaultName = fsConf.get("fs.default.name", defaultFsUri), fsDefaultFS = fsConf.get("fs.defaultFS", defaultFsUri); + + // Try to determine when we couldn't find an appropriate core-site.xml on the classpath + if (defaultFsUri.equals(fsDefaultName) && defaultFsUri.equals(fsDefaultFS)) { + log.fatal("Default filesystem value ('fs.defaultFS' or 'fs.default.name') of '" + defaultFsUri + "' was found in the Hadoop configuration"); + log.fatal("Please ensure that the Hadoop core-site.xml is on the classpath using 'general.classpaths' in accumulo-site.xml"); + } + } + + return false; + } + + try { + initSecurity(opts, uuid.toString()); + } catch (Exception e) { + log.fatal("Failed to initialize security", e); + return false; + } + return true; + } + + private static boolean zookeeperAvailable() { + try { + return zoo.exists("/"); + } catch (KeeperException e) { + return false; + } catch (InterruptedException e) { + return false; + } + } + + private static void initDirs(VolumeManager fs, UUID uuid, String[] baseDirs, boolean print) throws IOException { + for (String baseDir : baseDirs) { + fs.mkdirs(new Path(new Path(baseDir, ServerConstants.VERSION_DIR), "" + ServerConstants.DATA_VERSION)); + + // create an instance id + Path iidLocation = new Path(baseDir, ServerConstants.INSTANCE_ID_DIR); + fs.mkdirs(iidLocation); + fs.createNewFile(new Path(iidLocation, uuid.toString())); + if (print) + log.info("Initialized volume " + baseDir); + 
} + } + + private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid, String rootTabletDir) throws IOException { + initDirs(fs, uuid, VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration()), false); + + // initialize initial metadata config in zookeeper + initMetadataConfig(); + + String tableMetadataTabletDir = fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + MetadataTable.ID + + TABLE_TABLETS_TABLET_DIR; + String defaultMetadataTabletDir = fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + MetadataTable.ID + + Constants.DEFAULT_TABLET_LOCATION; + + // create table and default tablets directories + createDirectories(fs, rootTabletDir, tableMetadataTabletDir, defaultMetadataTabletDir); + + // populate the root tablet with info about the metadata tablets + String fileName = rootTabletDir + Path.SEPARATOR + "00000_00000." + FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration()); + createMetadataFile(fs, fileName, MetadataTable.ID, tableMetadataTabletDir, defaultMetadataTabletDir); + } + + /** + * Create an rfile in the default tablet's directory for a new table. This method is used to create the initial root tablet contents, with information about + * the metadata table's tablets + * + * @param volmanager + * The VolumeManager + * @param fileName + * The location to create the file + * @param tableId + * TableID that is being "created" + * @param tableTabletDir + * The table_info directory for the new table + * @param defaultTabletDir + * The default_tablet directory for the new table + */ + private static void createMetadataFile(VolumeManager volmanager, String fileName, String tableId, String tableTabletDir, String defaultTabletDir) + throws IOException { + FileSystem fs = volmanager.getVolumeByPath(new Path(fileName)).getFileSystem(); + FileSKVWriter tabletWriter = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), AccumuloConfiguration.getDefaultConfiguration()); + tabletWriter.startDefaultLocalityGroup(); + + Text splitPoint = TabletsSection.getRange().getEndKey().getRow(); + createEntriesForTablet(tabletWriter, tableId, tableTabletDir, null, splitPoint); + createEntriesForTablet(tabletWriter, tableId, defaultTabletDir, splitPoint, null); + + tabletWriter.close(); + } + + private static void createEntriesForTablet(FileSKVWriter writer, String tableId, String tabletDir, Text tabletPrevEndRow, Text tabletEndRow) + throws IOException { + Text extent = new Text(KeyExtent.getMetadataEntry(new Text(tableId), tabletEndRow)); + addEntry(writer, extent, DIRECTORY_COLUMN, new Value(tabletDir.getBytes(UTF_8))); + addEntry(writer, extent, TIME_COLUMN, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes(UTF_8))); + addEntry(writer, extent, PREV_ROW_COLUMN, KeyExtent.encodePrevEndRow(tabletPrevEndRow)); + } + + private static void addEntry(FileSKVWriter writer, Text row, ColumnFQ col, Value value) throws IOException { + writer.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier(), 0), value); + } + + private static void createDirectories(VolumeManager fs, String... 
dirs) throws IOException { + for (String s : dirs) { + Path dir = new Path(s); + try { + FileStatus fstat = fs.getFileStatus(dir); + // TODO Remove deprecation warning suppression when Hadoop1 support is dropped + @SuppressWarnings("deprecation") + boolean isDirectory = fstat.isDir(); + if (!isDirectory) { + log.fatal("location " + dir + " exists but is not a directory"); + return; + } + } catch (FileNotFoundException fnfe) { + // attempt to create directory, since it doesn't exist + if (!fs.mkdirs(dir)) { + log.fatal("unable to create directory " + dir); + return; + } + } + } + } + + private static void initZooKeeper(Opts opts, String uuid, String instanceNamePath, String rootTabletDir) throws KeeperException, InterruptedException { + // setup basic data in zookeeper + zoo.putPersistentData(Constants.ZROOT, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE); + zoo.putPersistentData(Constants.ZROOT + Constants.ZINSTANCES, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE); + + // setup instance name + if (opts.clearInstanceName) + zoo.recursiveDelete(instanceNamePath, NodeMissingPolicy.SKIP); + zoo.putPersistentData(instanceNamePath, uuid.getBytes(UTF_8), NodeExistsPolicy.FAIL); + + final byte[] EMPTY_BYTE_ARRAY = new byte[0], ZERO_CHAR_ARRAY = new byte[] {'0'}; + + // setup the instance + String zkInstanceRoot = Constants.ZROOT + "/" + uuid; + zoo.putPersistentData(zkInstanceRoot, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLES, Constants.ZTABLES_INITIAL_ID, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZNAMESPACES, new byte[0], NodeExistsPolicy.FAIL); + TableManager.prepareNewNamespaceState(uuid, Namespaces.DEFAULT_NAMESPACE_ID, Namespaces.DEFAULT_NAMESPACE, NodeExistsPolicy.FAIL); + TableManager.prepareNewNamespaceState(uuid, Namespaces.ACCUMULO_NAMESPACE_ID, Namespaces.ACCUMULO_NAMESPACE, NodeExistsPolicy.FAIL); + TableManager.prepareNewTableState(uuid, RootTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, RootTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL); + TableManager.prepareNewTableState(uuid, MetadataTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, MetadataTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZTSERVERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_PATH, rootTabletDir.getBytes(UTF_8), NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(UTF_8), NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZGC, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZGC_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZCONFIG, EMPTY_BYTE_ARRAY, 
NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLE_LOCKS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZHDFS_RESERVATIONS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZNEXT_FILE, ZERO_CHAR_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZRECOVERY, ZERO_CHAR_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + } + + private static String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException { + // setup the instance name + String instanceName, instanceNamePath = null; + boolean exists = true; + do { + if (opts.cliInstanceName == null) { + instanceName = getConsoleReader().readLine("Instance name : "); + } else { + instanceName = opts.cliInstanceName; + } + if (instanceName == null) + System.exit(0); + instanceName = instanceName.trim(); + if (instanceName.length() == 0) + continue; + instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName; + if (opts.clearInstanceName) { + exists = false; + break; + } else if (exists = zoo.exists(instanceNamePath)) { + String decision = getConsoleReader().readLine("Instance name \"" + instanceName + "\" exists. Delete existing entry from zookeeper? [Y/N] : "); + if (decision == null) + System.exit(0); + if (decision.length() == 1 && decision.toLowerCase(Locale.ENGLISH).charAt(0) == 'y') { + opts.clearInstanceName = true; + exists = false; + } + } + } while (exists); + return instanceNamePath; + } + + private static byte[] getRootPassword(Opts opts) throws IOException { + if (opts.cliPassword != null) { + return opts.cliPassword.getBytes(UTF_8); + } + String rootpass; + String confirmpass; + do { + rootpass = getConsoleReader() + .readLine("Enter initial password for " + DEFAULT_ROOT_USER + " (this may not be applicable for your security setup): ", '*'); + if (rootpass == null) + System.exit(0); + confirmpass = getConsoleReader().readLine("Confirm initial password for " + DEFAULT_ROOT_USER + ": ", '*'); + if (confirmpass == null) + System.exit(0); + if (!rootpass.equals(confirmpass)) + log.error("Passwords do not match"); + } while (!rootpass.equals(confirmpass)); + return rootpass.getBytes(UTF_8); + } + + private static void initSecurity(Opts opts, String iid) throws AccumuloSecurityException, ThriftSecurityException { + AuditedSecurityOperation.getInstance(iid, true).initializeSecurity(SystemCredentials.get().toThrift(HdfsZooInstance.getInstance()), DEFAULT_ROOT_USER, + opts.rootpass); + } + + public static void initMetadataConfig() throws IOException { + try { + Configuration conf = CachedConfiguration.getInstance(); + int max = conf.getInt("dfs.replication.max", 512); + // Hadoop 0.23 switched the min value configuration name + int min = Math.max(conf.getInt("dfs.replication.min", 1), conf.getInt("dfs.namenode.replication.min", 1)); + if (max < 5) + setMetadataReplication(max, "max"); + if (min > 5) + setMetadataReplication(min, "min"); + for (Entry<String,String> entry : initialMetadataConf.entrySet()) { + if (!TablePropUtil.setTableProperty(RootTable.ID, entry.getKey(), entry.getValue())) + throw new IOException("Cannot create per-table property " + entry.getKey()); + if (!TablePropUtil.setTableProperty(MetadataTable.ID, entry.getKey(), 
entry.getValue())) + throw new IOException("Cannot create per-table property " + entry.getKey()); + } + } catch (Exception e) { + log.fatal("error talking to zookeeper", e); + throw new IOException(e); + } + } + + private static void setMetadataReplication(int replication, String reason) throws IOException { + String rep = getConsoleReader().readLine( + "Your HDFS replication " + reason + " is not compatible with our default " + MetadataTable.NAME + " replication of 5. What do you want to set your " + + MetadataTable.NAME + " replication to? (" + replication + ") "); + if (rep == null || rep.length() == 0) + rep = Integer.toString(replication); + else + // Let's make sure it's a number + Integer.parseInt(rep); + initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), rep); + } + + public static boolean isInitialized(VolumeManager fs) throws IOException { + for (String baseDir : VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration())) { + if (fs.exists(new Path(baseDir, ServerConstants.INSTANCE_ID_DIR)) || fs.exists(new Path(baseDir, ServerConstants.VERSION_DIR))) + return true; + } + + return false; + } + + private static void addVolumes(VolumeManager fs) throws IOException { + HashSet<String> initializedDirs = new HashSet<String>(); + initializedDirs.addAll(Arrays.asList(ServerConstants.checkBaseUris(VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration()), true))); + + HashSet<String> uninitializedDirs = new HashSet<String>(); + uninitializedDirs.addAll(Arrays.asList(VolumeConfiguration.getVolumeUris(ServerConfiguration.getSiteConfiguration()))); + uninitializedDirs.removeAll(initializedDirs); + + Path aBasePath = new Path(initializedDirs.iterator().next()); + Path iidPath = new Path(aBasePath, ServerConstants.INSTANCE_ID_DIR); + Path versionPath = new Path(aBasePath, ServerConstants.VERSION_DIR); + + UUID uuid = UUID.fromString(ZooUtil.getInstanceIDFromHdfs(iidPath, ServerConfiguration.getSiteConfiguration())); + + if (ServerConstants.DATA_VERSION != Accumulo.getAccumuloPersistentVersion(versionPath.getFileSystem(CachedConfiguration.getInstance()), versionPath)) { + throw new IOException("Accumulo " + Constants.VERSION + " cannot initialize data version " + Accumulo.getAccumuloPersistentVersion(fs)); + } + + initDirs(fs, uuid, uninitializedDirs.toArray(new String[uninitializedDirs.size()]), true); + } + + static class Opts extends Help { + @Parameter(names = "--add-volumes", description = "Initialize any uninitialized volumes listed in instance.volumes") + boolean addVolumes = false; + @Parameter(names = "--reset-security", description = "just update the security information") + boolean resetSecurity = false; + @Parameter(names = "--clear-instance-name", description = "delete any existing instance name without prompting") + boolean clearInstanceName = false; + @Parameter(names = "--instance-name", description = "the instance name, if not provided, will prompt") + String cliInstanceName; + @Parameter(names = "--password", description = "set the password on the command line") + String cliPassword; + + byte[] rootpass = null; + } + + public static void main(String[] args) { + Opts opts = new Opts(); + opts.parseArgs(Initialize.class.getName(), args); + + try { + AccumuloConfiguration acuConf = ServerConfiguration.getSiteConfiguration(); + SecurityUtil.serverLogin(acuConf); + Configuration conf = CachedConfiguration.getInstance(); + + VolumeManager fs = VolumeManagerImpl.get(acuConf); + + if (opts.resetSecurity) { + if (isInitialized(fs)) { + 
opts.rootpass = getRootPassword(opts); + initSecurity(opts, HdfsZooInstance.getInstance().getInstanceID()); + } else { + log.fatal("Attempted to reset security on accumulo before it was initialized"); + } + } + + if (opts.addVolumes) { + addVolumes(fs); + } + + if (!opts.resetSecurity && !opts.addVolumes) + if (!doInit(opts, conf, fs)) + System.exit(-1); + } catch (Exception e) { + log.fatal(e, e); + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e5064e3/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java index 684efc3,0000000..29e4939 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java @@@ -1,83 -1,0 +1,83 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ - package org.apache.accumulo.core.security; ++package org.apache.accumulo.server.security; + +import java.io.IOException; +import java.net.InetAddress; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.log4j.Logger; + +/** + * + */ +public class SecurityUtil { + private static final Logger log = Logger.getLogger(SecurityUtil.class); + public static boolean usingKerberos = false; + + /** + * This method is for logging a server in Kerberos. If this is used in client code, it will fail unless run as the accumulo keytab's owner. Instead, use + * {@link #login(String, String)} + */ + public static void serverLogin(AccumuloConfiguration acuConf) { + String keyTab = acuConf.getPath(Property.GENERAL_KERBEROS_KEYTAB); + if (keyTab == null || keyTab.length() == 0) + return; + + usingKerberos = true; + + String principalConfig = acuConf.get(Property.GENERAL_KERBEROS_PRINCIPAL); + if (principalConfig == null || principalConfig.length() == 0) + return; + + if (login(principalConfig, keyTab)) { + try { + // This spawns a thread to periodically renew the logged in (accumulo) user + UserGroupInformation.getLoginUser(); + return; + } catch (IOException io) { + log.error("Error starting up renewal thread. This shouldn't be happening.", io); + } + } + + throw new RuntimeException("Failed to perform Kerberos login for " + principalConfig + " using " + keyTab); + } + + /** + * This will log in the given user in Kerberos. + * + * @param principalConfig + * This is the principal's name in the format NAME/HOST@REALM. 
{@link org.apache.hadoop.security.SecurityUtil#HOSTNAME_PATTERN} will automatically be + * replaced by the system's host name. + * @return true if login succeeded, otherwise false + */ + public static boolean login(String principalConfig, String keyTabPath) { + try { + String principalName = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principalConfig, InetAddress.getLocalHost().getCanonicalHostName()); + if (keyTabPath != null && principalName != null && keyTabPath.length() != 0 && principalName.length() != 0) { + UserGroupInformation.loginUserFromKeytab(principalName, keyTabPath); + log.info("Successfully logged in as user " + principalConfig); + return true; + } + } catch (IOException io) { + log.error("Error logging in user " + principalConfig + " using keytab at " + keyTabPath, io); + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e5064e3/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 4d3721e,0000000..8761480 mode 100644,000000..100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@@ -1,763 -1,0 +1,763 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.gc; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface; +import org.apache.accumulo.core.gc.thrift.GCMonitorService.Processor; +import org.apache.accumulo.core.gc.thrift.GCStatus; +import org.apache.accumulo.core.gc.thrift.GcCycleStats; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.Credentials; - import org.apache.accumulo.core.security.SecurityUtil; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.util.NamingThreadFactory; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.ServerServices; +import org.apache.accumulo.core.util.ServerServices.Service; +import org.apache.accumulo.core.util.SslConnectionParams; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.volume.Volume; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; +import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher; +import org.apache.accumulo.server.Accumulo; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManager.FileType; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.fs.VolumeUtil; ++import 
org.apache.accumulo.server.security.SecurityUtil; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.tables.TableManager; +import org.apache.accumulo.server.util.Halt; +import org.apache.accumulo.server.util.RpcWrapper; +import org.apache.accumulo.server.util.TServerUtils; +import org.apache.accumulo.server.util.TabletIterator; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.accumulo.trace.instrument.CountSampler; +import org.apache.accumulo.trace.instrument.Sampler; +import org.apache.accumulo.trace.instrument.Span; +import org.apache.accumulo.trace.instrument.Trace; +import org.apache.accumulo.trace.thrift.TInfo; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; + +import com.beust.jcommander.Parameter; +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.net.HostAndPort; + +public class SimpleGarbageCollector implements Iface { + private static final Text EMPTY_TEXT = new Text(); + + /** + * Options for the garbage collector. + */ + static class Opts extends ServerOpts { + @Parameter(names = {"-v", "--verbose"}, description = "extra information will get printed to stdout also") + boolean verbose = false; + @Parameter(names = {"-s", "--safemode"}, description = "safe mode will not delete files") + boolean safeMode = false; + } + + /** + * A fraction representing how much of the JVM's available memory should be used for gathering candidates. + */ + static final float CANDIDATE_MEMORY_PERCENTAGE = 0.50f; + + private static final Logger log = Logger.getLogger(SimpleGarbageCollector.class); + + private Credentials credentials; + private VolumeManager fs; + private AccumuloConfiguration config; + private Opts opts = new Opts(); + private ZooLock lock; + + private GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats()); + + private Instance instance; + + public static void main(String[] args) throws UnknownHostException, IOException { + SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration()); + final String app = "gc"; + Accumulo.setupLogging(app); + Instance instance = HdfsZooInstance.getInstance(); + ServerConfiguration conf = new ServerConfiguration(instance); + final VolumeManager fs = VolumeManagerImpl.get(); + Accumulo.init(fs, conf, app); + Opts opts = new Opts(); + opts.parseArgs(app, args); + SimpleGarbageCollector gc = new SimpleGarbageCollector(opts); + AccumuloConfiguration config = conf.getConfiguration(); + + gc.init(fs, instance, SystemCredentials.get(), config); + Accumulo.enableTracing(opts.getAddress(), app); + gc.run(); + } + + /** + * Creates a new garbage collector. + * + * @param opts + * options + */ + public SimpleGarbageCollector(Opts opts) { + this.opts = opts; + } + + /** + * Gets the credentials used by this GC. + * + * @return credentials + */ + Credentials getCredentials() { + return credentials; + } + + /** + * Gets the delay before the first collection. + * + * @return start delay, in milliseconds + */ + long getStartDelay() { + return config.getTimeInMillis(Property.GC_CYCLE_START); + } + + /** + * Gets the volume manager used by this GC. + * + * @return volume manager + */ + VolumeManager getVolumeManager() { + return fs; + } + + /** + * Checks if the volume manager should move files to the trash rather than delete them. 
+ * + * @return true if trash is used + */ + boolean isUsingTrash() { + return !config.getBoolean(Property.GC_TRASH_IGNORE); + } + + /** + * Gets the options for this garbage collector. + */ + Opts getOpts() { + return opts; + } + + /** + * Gets the number of threads used for deleting files. + * + * @return number of delete threads + */ + int getNumDeleteThreads() { + return config.getCount(Property.GC_DELETE_THREADS); + } + + /** + * Gets the instance used by this GC. + * + * @return instance + */ + Instance getInstance() { + return instance; + } + + /** + * Should files be archived (as opposed to preserved in trash) + * + * @return True if files should be archived, false otherwise + */ + boolean shouldArchiveFiles() { + return config.getBoolean(Property.GC_FILE_ARCHIVE); + } + + /** + * Initializes this garbage collector. + * + * @param fs + * volume manager + * @param instance + * instance + * @param credentials + * credentials + * @param config + * system configuration + */ + public void init(VolumeManager fs, Instance instance, Credentials credentials, AccumuloConfiguration config) { + this.fs = fs; + this.credentials = credentials; + this.instance = instance; + this.config = config; + long gcDelay = config.getTimeInMillis(Property.GC_CYCLE_DELAY); + log.info("start delay: " + getStartDelay() + " milliseconds"); + log.info("time delay: " + gcDelay + " milliseconds"); + log.info("safemode: " + opts.safeMode); + log.info("verbose: " + opts.verbose); + log.info("memory threshold: " + CANDIDATE_MEMORY_PERCENTAGE + " of " + Runtime.getRuntime().maxMemory() + " bytes"); + log.info("delete threads: " + getNumDeleteThreads()); + } + + private class GCEnv implements GarbageCollectionEnvironment { + + private String tableName; + + GCEnv(String tableName) { + this.tableName = tableName; + } + + @Override + public List<String> getCandidates(String continuePoint) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + // want to ensure GC makes progress... if the 1st N deletes are stable and we keep processing them, + // then will never inspect deletes after N + Range range = MetadataSchema.DeletesSection.getRange(); + if (continuePoint != null && !continuePoint.isEmpty()) { + String continueRow = MetadataSchema.DeletesSection.getRowPrefix() + continuePoint; + range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive()); + } + + Scanner scanner = instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName, Authorizations.EMPTY); + scanner.setRange(range); + List<String> result = new ArrayList<String>(); + // find candidates for deletion; chop off the prefix + for (Entry<Key,Value> entry : scanner) { + String cand = entry.getKey().getRow().toString().substring(MetadataSchema.DeletesSection.getRowPrefix().length()); + result.add(cand); + if (almostOutOfMemory(Runtime.getRuntime())) { + log.info("List of delete candidates has exceeded the memory threshold. 
Attempting to delete what has been gathered so far."); + break; + } + } + + return result; + + } + + @Override + public Iterator<String> getBlipIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + IsolatedScanner scanner = new IsolatedScanner(instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName, + Authorizations.EMPTY)); + + scanner.setRange(MetadataSchema.BlipSection.getRange()); + + return Iterators.transform(scanner.iterator(), new Function<Entry<Key,Value>,String>() { + @Override + public String apply(Entry<Key,Value> entry) { + return entry.getKey().getRow().toString().substring(MetadataSchema.BlipSection.getRowPrefix().length()); + } + }); + } + + @Override + public Iterator<Entry<Key,Value>> getReferenceIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + IsolatedScanner scanner = new IsolatedScanner(instance.getConnector(credentials.getPrincipal(), credentials.getToken()).createScanner(tableName, + Authorizations.EMPTY)); + scanner.fetchColumnFamily(DataFileColumnFamily.NAME); + scanner.fetchColumnFamily(ScanFileColumnFamily.NAME); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); + TabletIterator tabletIterator = new TabletIterator(scanner, MetadataSchema.TabletsSection.getRange(), false, true); + + return Iterators.concat(Iterators.transform(tabletIterator, new Function<Map<Key,Value>,Iterator<Entry<Key,Value>>>() { + @Override + public Iterator<Entry<Key,Value>> apply(Map<Key,Value> input) { + return input.entrySet().iterator(); + } + })); + } + + @Override + public Set<String> getTableIDs() { + return Tables.getIdToNameMap(instance).keySet(); + } + + @Override + public void delete(SortedMap<String,String> confirmedDeletes) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { + + if (opts.safeMode) { + if (opts.verbose) + System.out.println("SAFEMODE: There are " + confirmedDeletes.size() + " data file candidates marked for deletion.%n" + + " Examine the log files to identify them.%n"); + log.info("SAFEMODE: Listing all data file candidates for deletion"); + for (String s : confirmedDeletes.values()) + log.info("SAFEMODE: " + s); + log.info("SAFEMODE: End candidates for deletion"); + return; + } + + Connector c = instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken()); + BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig()); + + // when deleting a dir and all files in that dir, only need to delete the dir + // the dir will sort right before the files... 
so remove the files in this case + // to minimize namenode ops + Iterator<Entry<String,String>> cdIter = confirmedDeletes.entrySet().iterator(); + + String lastDir = null; + while (cdIter.hasNext()) { + Entry<String,String> entry = cdIter.next(); + String relPath = entry.getKey(); + String absPath = fs.getFullPath(FileType.TABLE, entry.getValue()).toString(); + + if (isDir(relPath)) { + lastDir = absPath; + } else if (lastDir != null) { + if (absPath.startsWith(lastDir)) { + log.debug("Ignoring " + entry.getValue() + " because " + lastDir + " exists"); + try { + putMarkerDeleteMutation(entry.getValue(), writer); + } catch (MutationsRejectedException e) { + throw new RuntimeException(e); + } + cdIter.remove(); + } else { + lastDir = null; + } + } + } + + final BatchWriter finalWriter = writer; + + ExecutorService deleteThreadPool = Executors.newFixedThreadPool(getNumDeleteThreads(), new NamingThreadFactory("deleting")); + + final List<Pair<Path,Path>> replacements = ServerConstants.getVolumeReplacements(); + + for (final String delete : confirmedDeletes.values()) { + + Runnable deleteTask = new Runnable() { + @Override + public void run() { + boolean removeFlag; + + try { + Path fullPath; + String switchedDelete = VolumeUtil.switchVolume(delete, FileType.TABLE, replacements); + if (switchedDelete != null) { + // actually replacing the volumes in the metadata table would be tricky because the entries would be different rows. So it could not be done + // atomically in one mutation and extreme care would need to be taken that the delete entry was not lost. Instead of doing that, just deal with + // volume switching when something needs to be deleted. Since the rest of the code uses suffixes to compare delete entries, there is no danger + // of deleting something that should not be deleted. Must not change the value of the delete variable because that's what's stored in the metadata table. 
+ log.debug("Volume replaced " + delete + " -> " + switchedDelete); + fullPath = fs.getFullPath(FileType.TABLE, switchedDelete); + } else { + fullPath = fs.getFullPath(FileType.TABLE, delete); + } + + log.debug("Deleting " + fullPath); + + if (archiveOrMoveToTrash(fullPath) || fs.deleteRecursively(fullPath)) { + // delete succeeded, still want to delete + removeFlag = true; + synchronized (SimpleGarbageCollector.this) { + ++status.current.deleted; + } + } else if (fs.exists(fullPath)) { + // leave the entry in the metadata; we'll try again later + removeFlag = false; + synchronized (SimpleGarbageCollector.this) { + ++status.current.errors; + } + log.warn("File exists, but was not deleted for an unknown reason: " + fullPath); + } else { + // this failure, we still want to remove the metadata entry + removeFlag = true; + synchronized (SimpleGarbageCollector.this) { + ++status.current.errors; + } + String parts[] = fullPath.toString().split(Constants.ZTABLES)[1].split("/"); + if (parts.length > 2) { + String tableId = parts[1]; + String tabletDir = parts[2]; + TableManager.getInstance().updateTableStateCache(tableId); + TableState tableState = TableManager.getInstance().getTableState(tableId); + if (tableState != null && tableState != TableState.DELETING) { + // clone directories don't always exist + if (!tabletDir.startsWith(Constants.CLONE_PREFIX)) + log.debug("File doesn't exist: " + fullPath); + } + } else { + log.warn("Very strange path name: " + delete); + } + } + + // proceed to clearing out the flags for successful deletes and + // non-existent files + if (removeFlag && finalWriter != null) { + putMarkerDeleteMutation(delete, finalWriter); + } + } catch (Exception e) { + log.error(e, e); + } + + } + + }; + + deleteThreadPool.execute(deleteTask); + } + + deleteThreadPool.shutdown(); + + try { + while (!deleteThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS)) {} + } catch (InterruptedException e1) { + log.error(e1, e1); + } + + if (writer != null) { + try { + writer.close(); + } catch (MutationsRejectedException e) { + log.error("Problem removing entries from the metadata table: ", e); + } + } + } + + @Override + public void deleteTableDirIfEmpty(String tableID) throws IOException { + // if dir exist and is empty, then empty list is returned... 
+ // hadoop 1.0 will return null if the file doesn't exist + // hadoop 2.0 will throw an exception if the file does not exist + for (String dir : ServerConstants.getTablesDirs()) { + FileStatus[] tabletDirs = null; + try { + tabletDirs = fs.listStatus(new Path(dir + "/" + tableID)); + } catch (FileNotFoundException ex) { + // ignored + } + if (tabletDirs == null) + continue; + + if (tabletDirs.length == 0) { + Path p = new Path(dir + "/" + tableID); + log.debug("Removing table dir " + p); + if (!archiveOrMoveToTrash(p)) + fs.delete(p); + } + } + } + + @Override + public void incrementCandidatesStat(long i) { + status.current.candidates += i; + } + + @Override + public void incrementInUseStat(long i) { + status.current.inUse += i; + } + + } + + private void run() { + long tStart, tStop; + + // Sleep for an initial period, giving the master time to start up and + // old data files to be unused + log.info("Trying to acquire ZooKeeper lock for garbage collector"); + + try { + getZooLock(startStatsService()); + } catch (Exception ex) { + log.error(ex, ex); + System.exit(1); + } + + try { + long delay = getStartDelay(); + log.debug("Sleeping for " + delay + " milliseconds before beginning garbage collection cycles"); + Thread.sleep(delay); + } catch (InterruptedException e) { + log.warn(e, e); + return; + } + + Sampler sampler = new CountSampler(100); + + while (true) { + if (sampler.next()) + Trace.on("gc"); + + Span gcSpan = Trace.start("loop"); + tStart = System.currentTimeMillis(); + try { + System.gc(); // make room + + status.current.started = System.currentTimeMillis(); + + new GarbageCollectionAlgorithm().collect(new GCEnv(RootTable.NAME)); + new GarbageCollectionAlgorithm().collect(new GCEnv(MetadataTable.NAME)); + + log.info("Number of data file candidates for deletion: " + status.current.candidates); + log.info("Number of data file candidates still in use: " + status.current.inUse); + log.info("Number of successfully deleted data files: " + status.current.deleted); + log.info("Number of data files delete failures: " + status.current.errors); + + status.current.finished = System.currentTimeMillis(); + status.last = status.current; + status.current = new GcCycleStats(); + + } catch (Exception e) { + log.error(e, e); + } + + tStop = System.currentTimeMillis(); + log.info(String.format("Collect cycle took %.2f seconds", ((tStop - tStart) / 1000.0))); + + // Clean up any unused write-ahead logs + Span waLogs = Trace.start("walogs"); + try { + GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs, isUsingTrash()); + log.info("Beginning garbage collection of write-ahead logs"); + walogCollector.collect(status); + } catch (Exception e) { + log.error(e, e); + } finally { + waLogs.stop(); + } + gcSpan.stop(); + + // we just made a lot of metadata changes: flush them out + try { + Connector connector = instance.getConnector(credentials.getPrincipal(), credentials.getToken()); + connector.tableOperations().compact(MetadataTable.NAME, null, null, true, true); + connector.tableOperations().compact(RootTable.NAME, null, null, true, true); + } catch (Exception e) { + log.warn(e, e); + } + + Trace.offNoFlush(); + try { + long gcDelay = config.getTimeInMillis(Property.GC_CYCLE_DELAY); + log.debug("Sleeping for " + gcDelay + " milliseconds"); + Thread.sleep(gcDelay); + } catch (InterruptedException e) { + log.warn(e, e); + return; + } + } + } + + /** + * Moves a file to trash. 
If this garbage collector is not using trash, this method returns false and leaves the file alone. If the file is missing, this + * method returns false as opposed to throwing an exception. + * + * @return true if the file was moved to trash + * @throws IOException + * if the volume manager encountered a problem + */ + boolean archiveOrMoveToTrash(Path path) throws IOException { + if (shouldArchiveFiles()) { + return archiveFile(path); + } else { + if (!isUsingTrash()) + return false; + try { + return fs.moveToTrash(path); + } catch (FileNotFoundException ex) { + return false; + } + } + } + + /** + * Move a file that would otherwise be deleted to the archive directory for files + * + * @param fileToArchive + * Path to file that is to be archived + * @return True if the file was successfully moved to the file archive directory, false otherwise + */ + boolean archiveFile(Path fileToArchive) throws IOException { + // Figure out what base path this volume uses on this FileSystem + Volume sourceVolume = fs.getVolumeByPath(fileToArchive); + String sourceVolumeBasePath = sourceVolume.getBasePath(); + + log.debug("Base path for volume: " + sourceVolumeBasePath); + + // Get the path for the file we want to archive + String sourcePathBasePath = fileToArchive.toUri().getPath(); + + // Strip off the common base path for the file to archive + String relativeVolumePath = sourcePathBasePath.substring(sourceVolumeBasePath.length()); + if (Path.SEPARATOR_CHAR == relativeVolumePath.charAt(0)) { + if (relativeVolumePath.length() > 1) { + relativeVolumePath = relativeVolumePath.substring(1); + } else { + relativeVolumePath = ""; + } + } + + log.debug("Computed relative path for file to archive: " + relativeVolumePath); + + // The file archive path on this volume (we can't archive this file to a different volume) + Path archivePath = new Path(sourceVolumeBasePath, ServerConstants.FILE_ARCHIVE_DIR); + + log.debug("File archive path: " + archivePath); + + fs.mkdirs(archivePath); + + // Preserve the path beneath the Volume's base directory (e.g. tables/1/A_0000001.rf) + Path fileArchivePath = new Path(archivePath, relativeVolumePath); + + log.debug("Create full path of " + fileArchivePath + " from " + archivePath + " and " + relativeVolumePath); + + // Make sure that it doesn't already exist; if it does, something is wrong. 
+ if (fs.exists(fileArchivePath)) { + log.warn("Tried to archive file, but it already exists: " + fileArchivePath); + return false; + } + + log.debug("Moving " + fileToArchive + " to " + fileArchivePath); + return fs.rename(fileToArchive, fileArchivePath); + } + + private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException { + String path = ZooUtil.getRoot(instance) + Constants.ZGC_LOCK; + + LockWatcher lockWatcher = new LockWatcher() { + @Override + public void lostLock(LockLossReason reason) { + Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!"); + } + + @Override + public void unableToMonitorLockNode(final Throwable e) { + Halt.halt(-1, new Runnable() { + + @Override + public void run() { + log.fatal("No longer able to monitor lock node ", e); + } + }); + + } + }; + + while (true) { + lock = new ZooLock(path); + if (lock.tryLock(lockWatcher, new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes())) { + break; + } + UtilWaitThread.sleep(1000); + } + } + + private HostAndPort startStatsService() throws UnknownHostException { + Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(this)); + int port = config.getPort(Property.GC_PORT); + long maxMessageSize = config.getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE); + HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port); + log.debug("Starting garbage collector listening on " + result); + try { + return TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize, + SslConnectionParams.forServer(config), 0).address; + } catch (Exception ex) { + log.fatal(ex, ex); + throw new RuntimeException(ex); + } + } + + /** + * Checks if the system is almost out of memory. + * + * @param runtime + * Java runtime + * @return true if system is almost out of memory + * @see #CANDIDATE_MEMORY_PERCENTAGE + */ + static boolean almostOutOfMemory(Runtime runtime) { + return runtime.totalMemory() - runtime.freeMemory() > CANDIDATE_MEMORY_PERCENTAGE * runtime.maxMemory(); + } + + final static String METADATA_TABLE_DIR = "/" + MetadataTable.ID; + + private static void putMarkerDeleteMutation(final String delete, final BatchWriter writer) throws MutationsRejectedException { + Mutation m = new Mutation(MetadataSchema.DeletesSection.getRowPrefix() + delete); + m.putDelete(EMPTY_TEXT, EMPTY_TEXT); + writer.addMutation(m); + } + + /** + * Checks if the given string is a directory. + * + * @param delete + * possible directory + * @return true if string is a directory + */ + static boolean isDir(String delete) { + if (delete == null) { + return false; + } + int slashCount = 0; + for (int i = 0; i < delete.length(); i++) + if (delete.charAt(i) == '/') + slashCount++; + return slashCount == 1; + } + + @Override + public GCStatus getStatus(TInfo info, TCredentials credentials) { + return status; + } +}
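For reference, the consumption pattern for the relocated class is identical in every daemon touched by this change: perform the Kerberos server login before any HDFS or ZooKeeper access, as SimpleGarbageCollector.main and Initialize.main both do above. A minimal sketch of that startup shape, using a hypothetical ExampleServer class (only the SecurityUtil and ServerConfiguration calls below are taken from the code above; the rest is placeholder):

import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.security.SecurityUtil;

public class ExampleServer {
  public static void main(String[] args) {
    // serverLogin is a no-op when general.kerberos.keytab is unset; when it is set,
    // it logs the process in from the keytab, leaves UserGroupInformation's renewal
    // thread running, and throws a RuntimeException if the login fails.
    SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());

    // Placeholder for the daemon's real startup (logging setup, VolumeManager,
    // ZooKeeper lock acquisition), which should only proceed after the login.
  }
}

Since the class now lives in server/base rather than core, this server-side-only usage is enforced by module layout as well as by the javadoc's warning about running as the keytab's owner.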