Updated Branches:
  refs/heads/master 260b6bf9f -> 322ee056c

ACCUMULO-2072 fixed generation of wal recovery paths


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0b95612f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0b95612f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0b95612f

Branch: refs/heads/master
Commit: 0b95612faf0ea6f340a1da3f8f04dc6b6e2bb2b0
Parents: 0d34428
Author: Keith Turner <ktur...@apache.org>
Authored: Thu Dec 19 22:57:08 2013 -0500
Committer: Keith Turner <ktur...@apache.org>
Committed: Thu Dec 19 23:00:02 2013 -0500

----------------------------------------------------------------------
 .../server/master/recovery/RecoveryPath.java    | 56 ++++++++++++++++++++
 .../master/recovery/RecoveryManager.java        | 14 +++--
 .../apache/accumulo/tserver/TabletServer.java   |  5 +-
 3 files changed, 65 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b95612f/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
new file mode 100644
index 0000000..1da945d
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * 
+ */
+public class RecoveryPath {
+
+  // given a wal path, transform it to a recovery path
+  public static Path getRecoveryPath(VolumeManager fs, Path walPath) throws IOException {
+    if (walPath.depth() >= 3 && walPath.toUri().getScheme() != null) {
+      // its a fully qualified path
+      String uuid = walPath.getName();
+      // drop uuid
+      walPath = walPath.getParent();
+      // drop server
+      walPath = walPath.getParent();
+  
+      if (!walPath.getName().equals(FileType.WAL.getDirectory()))
+        throw new IllegalArgumentException("Bad path " + walPath);
+  
+      // drop wal
+      walPath = walPath.getParent();
+  
+      walPath = new Path(walPath, FileType.RECOVERY.getDirectory());
+      walPath = new Path(walPath, uuid);
+
+      return walPath;
+    }
+  
+    throw new IllegalArgumentException("Bad path " + walPath);
+  
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b95612f/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
index dbe73ac..789d482 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java
@@ -35,10 +35,10 @@ import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.master.Master;
-import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.master.recovery.HadoopLogCloser;
 import org.apache.accumulo.server.master.recovery.LogCloser;
+import org.apache.accumulo.server.master.recovery.RecoveryPath;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.fs.Path;
@@ -126,13 +126,11 @@ public class RecoveryManager {
     ;
     for (Collection<String> logs : walogs) {
       for (String walog : logs) {
-        String hostFilename[] = walog.split("/", 2);
-        String host = hostFilename[0];
-        String filename = hostFilename[1];
-        String parts[] = filename.split("/");
+
+        String parts[] = walog.split("/");
         String sortId = parts[parts.length - 1];
-        String dest = master.getFileSystem().choose(ServerConstants.getRecoveryDirs()) + "/" + sortId;
-        filename = master.getFileSystem().getFullPath(FileType.WAL, walog).toString();
+        String filename = master.getFileSystem().getFullPath(FileType.WAL, walog).toString();
+        String dest = RecoveryPath.getRecoveryPath(master.getFileSystem(), new Path(filename)).toString();
         log.debug("Recovering " + filename + " to " + dest);
 
         boolean sortQueued;
@@ -167,7 +165,7 @@ public class RecoveryManager {
               delay = Math.min(2 * delay, 1000 * 60 * 5l);
             }
 
-            log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference");
+            log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s), tablet " + extent + " holds a reference");
 
             executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
             closeTasksQueued.add(sortId);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b95612f/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 9dabee7..e3f508f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -157,6 +157,7 @@ import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.master.recovery.RecoveryPath;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
 import org.apache.accumulo.server.master.state.TServerInstance;
@@ -3661,8 +3662,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     for (LogEntry entry : sorted) {
       Path recovery = null;
       for (String log : entry.logSet) {
-        String[] parts = log.split("/"); // "host:port/filename"
-        Path finished = new Path(fs.getFullPath(FileType.RECOVERY, parts[parts.length - 1]), "finished");
+        Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log));
+        finished = new Path(finished, "finished");
         TabletServer.log.info("Looking for " + finished);
         if (fs.exists(finished)) {
           recovery = finished.getParent();

Reply via email to