http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e5064e3/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java
index fc3b938,0000000..21138e6
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LocalWALRecovery.java
@@@ -1,178 -1,0 +1,178 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
- import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
++import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.log4j.Logger;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class will attempt to rewrite any local WALs to HDFS.
+ */
+public class LocalWALRecovery implements Runnable {
+ private static final Logger log = Logger.getLogger(LocalWALRecovery.class);
+
+   /**
+    * Command-line entry point: builds the tool from the site configuration, applies the command-line arguments and runs the recovery.
+    *
+    * @param args
+    *          command-line arguments, handed to {@link #parseArgs(String...)}
+    * @throws IOException
+    *           retained in the signature for callers
+    */
+   public static void main(String[] args) throws IOException {
+     AccumuloConfiguration siteConfig = SiteConfiguration.getInstance(SiteConfiguration.getDefaultConfiguration());
+
+     LocalWALRecovery recovery = new LocalWALRecovery(siteConfig);
+     recovery.parseArgs(args);
+     recovery.run();
+   }
+
+   /**
+    * Command-line options understood by this tool. An instance is registered with JCommander in {@link #parseArgs(String...)}, which fills in the annotated
+    * fields.
+    */
+   public final class Options {
+     /** When true, a local WAL is deleted once its copy has been written. */
+     @Parameter(names = "--delete-local",
+         description = "Specify whether to delete the local WAL files after they have been re-written in HDFS.")
+     public boolean deleteLocal = false;
+
+     /** Local directories to scan; defaults to the comma-separated value of {@link Property#LOGGER_DIR}. */
+     @Parameter(names = "--local-wal-directories",
+         description = "Comma separated list of local directories containing WALs, default is set according to the logger.dir.walog property.")
+     public List<String> directories = getDefaultDirectories();
+
+     /** Destination directory for the copies; left null here and resolved later when it is first needed. */
+     @Parameter(names = "--dfs-wal-directory",
+         description = "The directory that WALs will be copied into. Will default to the first configured base dir + '/wal'")
+     public String destination = null;
+
+     /** Reads {@link Property#LOGGER_DIR} from the enclosing tool's configuration and splits it on commas. */
+     private List<String> getDefaultDirectories() {
+       return Arrays.asList(configuration.get(Property.LOGGER_DIR).split(","));
+     }
+   }
+
+   /** Configuration from which the default local WAL directories are read. */
+   private final AccumuloConfiguration configuration;
+   /** Options for this run; holds defaults until {@link #parseArgs(String...)} is called. */
+   private final Options options;
+
+   /**
+    * Creates a WAL recovery tool bound to the supplied configuration.
+    *
+    * @param configuration
+    *          configuration consulted for the default local WAL directories
+    */
+   public LocalWALRecovery(AccumuloConfiguration configuration) {
+     // Order matters: Options reads this.configuration while initializing its default directories.
+     this.configuration = configuration;
+     this.options = new Options();
+   }
+
+   /**
+    * Parses command-line arguments into this tool's options.
+    *
+    * Invalid arguments do not raise an exception: the parse error is logged, usage is printed, and the method returns normally.
+    *
+    * @param args
+    *          the raw command-line arguments
+    */
+   @VisibleForTesting
+   public void parseArgs(String... args) {
+     JCommander jcommander = new JCommander();
+     jcommander.addObject(options);
+
+     try {
+       jcommander.parse(args);
+     } catch (ParameterException e) {
+       // Report why parsing failed; usage alone does not tell the user which argument was rejected.
+       log.error("Invalid arguments: " + e.getMessage());
+       jcommander.usage();
+     }
+   }
+
+ @Override
+ public void run() {
+ SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
+
+ try {
+
recoverLocalWriteAheadLogs(VolumeManagerImpl.get().getDefaultVolume().getFileSystem());
+ } catch (IOException e) {
+ log.error("Error while recovering WAL files.", e);
+ }
+ }
+
+ public void recoverLocalWriteAheadLogs(FileSystem fs) throws IOException {
+ for (String directory : options.directories) {
+ File localDirectory = new File(directory);
+ if (!localDirectory.isAbsolute()) {
+ localDirectory = new File(System.getenv("ACCUMULO_HOME"), directory);
+ }
+
+ if (!localDirectory.isDirectory()) {
+ log.warn("Local walog dir " + localDirectory.getAbsolutePath() + "
does not exist or is not a directory.");
+ continue;
+ }
+
+ if (options.destination == null) {
+ // Defer loading the default value until now because it might require
talking to zookeeper.
+ options.destination = ServerConstants.getWalDirs()[0];
+ }
+ log.info("Copying WALs to " + options.destination);
+
+ for (File file : localDirectory.listFiles()) {
+ String name = file.getName();
+ try {
+ UUID.fromString(name);
+ } catch (IllegalArgumentException ex) {
+ log.info("Ignoring non-log file " + file.getAbsolutePath());
+ continue;
+ }
+
+ @SuppressWarnings("deprecation")
+ org.apache.accumulo.server.logger.LogFileKey key = new
org.apache.accumulo.server.logger.LogFileKey();
+ @SuppressWarnings("deprecation")
+ org.apache.accumulo.server.logger.LogFileValue value = new
org.apache.accumulo.server.logger.LogFileValue();
+
+ log.info("Openning local log " + file.getAbsolutePath());
+
+ Path localWal = new Path(file.toURI());
+ FileSystem localFs = FileSystem.getLocal(fs.getConf());
+
+ @SuppressWarnings("deprecation")
+ Reader reader = new SequenceFile.Reader(localFs, localWal,
localFs.getConf());
+ // Reader reader = new SequenceFile.Reader(localFs.getConf(),
SequenceFile.Reader.file(localWal));
+ Path tmp = new Path(options.destination + "/" + name + ".copy");
+ FSDataOutputStream writer = fs.create(tmp);
+ while (reader.next(key, value)) {
+ try {
+ key.write(writer);
+ value.write(writer);
+ } catch (EOFException ex) {
+ break;
+ }
+ }
+ writer.close();
+ reader.close();
+ fs.rename(tmp, new Path(tmp.getParent(), name));
+
+ if (options.deleteLocal) {
+ if (file.delete()) {
+ log.info("Copied and deleted: " + name);
+ } else {
+ log.info("Failed to delete: " + name + " (but it is safe for you
to delete it manually).");
+ }
+ } else {
+ log.info("Safe to delete: " + name);
+ }
+ }
+ }
+ }
+
+}