Updated Branches: refs/heads/master 260b6bf9f -> 322ee056c
ACCUMULO-2072 fixed generation of wal recovery paths Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0b95612f Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0b95612f Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0b95612f Branch: refs/heads/master Commit: 0b95612faf0ea6f340a1da3f8f04dc6b6e2bb2b0 Parents: 0d34428 Author: Keith Turner <ktur...@apache.org> Authored: Thu Dec 19 22:57:08 2013 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Thu Dec 19 23:00:02 2013 -0500 ---------------------------------------------------------------------- .../server/master/recovery/RecoveryPath.java | 56 ++++++++++++++++++++ .../master/recovery/RecoveryManager.java | 14 +++-- .../apache/accumulo/tserver/TabletServer.java | 5 +- 3 files changed, 65 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b95612f/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java new file mode 100644 index 0000000..1da945d --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.master.recovery; + +import java.io.IOException; + +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManager.FileType; +import org.apache.hadoop.fs.Path; + +/** + * + */ +public class RecoveryPath { + + // given a wal path, transform it to a recovery path + public static Path getRecoveryPath(VolumeManager fs, Path walPath) throws IOException { + if (walPath.depth() >= 3 && walPath.toUri().getScheme() != null) { + // its a fully qualified path + String uuid = walPath.getName(); + // drop uuid + walPath = walPath.getParent(); + // drop server + walPath = walPath.getParent(); + + if (!walPath.getName().equals(FileType.WAL.getDirectory())) + throw new IllegalArgumentException("Bad path " + walPath); + + // drop wal + walPath = walPath.getParent(); + + walPath = new Path(walPath, FileType.RECOVERY.getDirectory()); + walPath = new Path(walPath, uuid); + + return walPath; + } + + throw new IllegalArgumentException("Bad path " + walPath); + + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b95612f/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java index dbe73ac..789d482 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java +++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java @@ -35,10 +35,10 @@ import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.master.Master; -import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.fs.VolumeManager.FileType; import org.apache.accumulo.server.master.recovery.HadoopLogCloser; import org.apache.accumulo.server.master.recovery.LogCloser; +import org.apache.accumulo.server.master.recovery.RecoveryPath; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.ZooCache; import org.apache.hadoop.fs.Path; @@ -126,13 +126,11 @@ public class RecoveryManager { ; for (Collection<String> logs : walogs) { for (String walog : logs) { - String hostFilename[] = walog.split("/", 2); - String host = hostFilename[0]; - String filename = hostFilename[1]; - String parts[] = filename.split("/"); + + String parts[] = walog.split("/"); String sortId = parts[parts.length - 1]; - String dest = master.getFileSystem().choose(ServerConstants.getRecoveryDirs()) + "/" + sortId; - filename = master.getFileSystem().getFullPath(FileType.WAL, walog).toString(); + String filename = master.getFileSystem().getFullPath(FileType.WAL, walog).toString(); + String dest = RecoveryPath.getRecoveryPath(master.getFileSystem(), new Path(filename)).toString(); log.debug("Recovering " + filename + " to " + dest); boolean sortQueued; @@ -167,7 +165,7 @@ public class RecoveryManager { delay = Math.min(2 * delay, 1000 * 60 * 5l); } - log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference"); + log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s), tablet " + extent + " holds a reference"); executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS); closeTasksQueued.add(sortId); http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b95612f/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 9dabee7..e3f508f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -157,6 +157,7 @@ import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManager.FileType; import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.master.recovery.RecoveryPath; import org.apache.accumulo.server.master.state.Assignment; import org.apache.accumulo.server.master.state.DistributedStoreException; import org.apache.accumulo.server.master.state.TServerInstance; @@ -3661,8 +3662,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu for (LogEntry entry : sorted) { Path recovery = null; for (String log : entry.logSet) { - String[] parts = log.split("/"); // "host:port/filename" - Path finished = new Path(fs.getFullPath(FileType.RECOVERY, parts[parts.length - 1]), "finished"); + Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log)); + finished = new Path(finished, "finished"); TabletServer.log.info("Looking for " + finished); if (fs.exists(finished)) { recovery = finished.getParent();