Updated Branches:
  refs/heads/master 8f6f67018 -> b64149d78

ACCUMULO-1830 fix WAL to use consistent METADATA entries


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b64149d7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b64149d7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b64149d7

Branch: refs/heads/master
Commit: b64149d782ef521347fb16bfeecc5c7522a6ee7d
Parents: 8f6f670
Author: Eric Newton <eric.new...@gmail.com>
Authored: Fri Nov 1 15:10:02 2013 -0400
Committer: Eric Newton <eric.new...@gmail.com>
Committed: Fri Nov 1 15:10:18 2013 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/tserver/Tablet.java     |   4 +-
 .../apache/accumulo/tserver/log/DfsLogger.java  |  27 +++--
 .../org/apache/accumulo/test/CleanWalIT.java    | 108 +++++++++++++++++++
 3 files changed, 122 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/b64149d7/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index 0ef6c6b..818f978 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -1405,9 +1405,7 @@ public class Tablet {
       currentLogs = new HashSet<DfsLogger>();
       for (LogEntry logEntry : logEntries) {
         for (String log : logEntry.logSet) {
-          String[] parts = log.split("/", 2);
-          Path file = fs.getFullPath(FileType.WAL, parts[1]);
-          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, file));
+          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log));
         }
       }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b64149d7/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 92c828e..612678a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -229,16 +229,14 @@ public class DfsLogger {
   private FSDataOutputStream logFile;
   private DataOutputStream encryptingLogFile = null;
   private Method sync;
-  private Path logPath;
-  private String logger;
+  private String logPath;
   
   public DfsLogger(ServerResources conf) throws IOException {
     this.conf = conf;
   }
   
-  public DfsLogger(ServerResources conf, String logger, Path filename) throws IOException {
+  public DfsLogger(ServerResources conf, String filename) throws IOException {
     this.conf = conf;
-    this.logger = logger;
     this.logPath = filename;
   }
   
@@ -323,23 +321,23 @@ public class DfsLogger {
   
   public synchronized void open(String address) throws IOException {
     String filename = UUID.randomUUID().toString();
-    logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
-    
+    String logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
+
     log.debug("DfsLogger.open() begin");
     VolumeManager fs = conf.getFileSystem();
     
-    logPath = new Path(fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename);
+    logPath = fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename;
     try {
       short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
       if (replication == 0)
-        replication = fs.getDefaultReplication(logPath);
+        replication = fs.getDefaultReplication(new Path(logPath));
       long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
       if (blockSize == 0)
         blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
       if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
-        logFile = fs.createSyncable(logPath, 0, replication, blockSize);
+        logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize);
       else
-        logFile = fs.create(logPath, true, 0, replication, blockSize);
+        logFile = fs.create(new Path(logPath), true, 0, replication, blockSize);
       
       try {
         NoSuchMethodException e = null;
@@ -412,10 +410,6 @@ public class DfsLogger {
     return getLogger() + "/" + getFileName();
   }
   
-  public String getLogger() {
-    return logger;
-  }
-  
   public String getFileName() {
     return logPath.toString();
   }
@@ -539,5 +533,10 @@ public class DfsLogger {
       throw ex;
     }
   }
+
+  public String getLogger() {
+    String parts[] = logPath.split("/");
+    return StringUtil.join(Arrays.asList(parts[parts.length - 2].split("[+]")), ":");
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b64149d7/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
new file mode 100644
index 0000000..3c07547
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.accumulo.minicluster.ProcessReference;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.junit.Test;
+
+public class CleanWalIT extends ConfigurableMacIT {
+  
+  @Override
+  public void configure(MiniAccumuloConfig cfg) {
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
+    cfg.setNumTservers(1);
+    cfg.useMiniDFS(true);
+  }
+
+  // test for ACCUMULO-1830
+  @Test(timeout= 4 * 60 * 1000)
+  public void test() throws Exception {
+    Connector conn = getConnector();
+    String tableName = getTableNames(1)[0];
+    conn.tableOperations().create(tableName);
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("row");
+    m.put("cf", "cq", "value");
+    bw.addMutation(m);
+    bw.close();
+    for (ProcessReference tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER))
+      getCluster().killProcess(ServerType.TABLET_SERVER, tserver);
+    // all 3 tables should do recovery, but the bug doesn't really remove the log file references
+    getCluster().start();
+    for (String table : new String[]{MetadataTable.NAME, RootTable.NAME})
+      conn.tableOperations().flush(table, null, null, true);
+    assertEquals(1, count(tableName, conn));
+    for (String table : new String[]{MetadataTable.NAME, RootTable.NAME})
+      assertEquals(0, countLogs(table, conn));
+    
+    bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    m = new Mutation("row");
+    m.putDelete("cf", "cq");
+    bw.addMutation(m);
+    bw.close();
+    assertEquals(0, count(tableName, conn));
+    conn.tableOperations().flush(tableName, null, null, true);
+    conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+    conn.tableOperations().flush(RootTable.NAME, null, null, true);
+    for (ProcessReference tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER))
+      getCluster().killProcess(ServerType.TABLET_SERVER, tserver);
+    UtilWaitThread.sleep(3 * 1000);
+    getCluster().start();
+    assertEquals(0, count(tableName, conn));
+  }
+
+  private int countLogs(String tableName, Connector conn) throws TableNotFoundException {
+    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+    int count = 0;
+    for (@SuppressWarnings("unused") Entry<Key,Value> entry : scanner) {
+      count++;
+    }
+    return count;
+  }
+  
+  int count(String tableName, Connector conn) throws Exception {
+    Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+    int result = 0;
+    for (@SuppressWarnings("unused") Entry<Key,Value> entry : scanner) {
+      result++;
+    }
+    return result;
+  }
+  
+}

Reply via email to