Updated Branches: refs/heads/master 8f6f67018 -> b64149d78
ACCUMULO-1830 fix WAL to use consistent METADATA entries Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b64149d7 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b64149d7 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b64149d7 Branch: refs/heads/master Commit: b64149d782ef521347fb16bfeecc5c7522a6ee7d Parents: 8f6f670 Author: Eric Newton <eric.new...@gmail.com> Authored: Fri Nov 1 15:10:02 2013 -0400 Committer: Eric Newton <eric.new...@gmail.com> Committed: Fri Nov 1 15:10:18 2013 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/tserver/Tablet.java | 4 +- .../apache/accumulo/tserver/log/DfsLogger.java | 27 +++-- .../org/apache/accumulo/test/CleanWalIT.java | 108 +++++++++++++++++++ 3 files changed, 122 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/b64149d7/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java index 0ef6c6b..818f978 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java @@ -1405,9 +1405,7 @@ public class Tablet { currentLogs = new HashSet<DfsLogger>(); for (LogEntry logEntry : logEntries) { for (String log : logEntry.logSet) { - String[] parts = log.split("/", 2); - Path file = fs.getFullPath(FileType.WAL, parts[1]); - currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, file)); + currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log)); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b64149d7/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 92c828e..612678a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -229,16 +229,14 @@ public class DfsLogger { private FSDataOutputStream logFile; private DataOutputStream encryptingLogFile = null; private Method sync; - private Path logPath; - private String logger; + private String logPath; public DfsLogger(ServerResources conf) throws IOException { this.conf = conf; } - public DfsLogger(ServerResources conf, String logger, Path filename) throws IOException { + public DfsLogger(ServerResources conf, String filename) throws IOException { this.conf = conf; - this.logger = logger; this.logPath = filename; } @@ -323,23 +321,23 @@ public class DfsLogger { public synchronized void open(String address) throws IOException { String filename = UUID.randomUUID().toString(); - logger = StringUtil.join(Arrays.asList(address.split(":")), "+"); - + String logger = StringUtil.join(Arrays.asList(address.split(":")), "+"); + log.debug("DfsLogger.open() begin"); VolumeManager fs = conf.getFileSystem(); - logPath = new Path(fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename); + logPath = fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename; try { short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION); if (replication == 0) - replication = fs.getDefaultReplication(logPath); + replication = fs.getDefaultReplication(new Path(logPath)); long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE); if (blockSize == 0) blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1); if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC)) - logFile = fs.createSyncable(logPath, 0, replication, blockSize); + logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize); else - logFile = fs.create(logPath, true, 0, replication, blockSize); + logFile = fs.create(new Path(logPath), true, 0, replication, blockSize); try { NoSuchMethodException e = null; @@ -412,10 +410,6 @@ public class DfsLogger { return getLogger() + "/" + getFileName(); } - public String getLogger() { - return logger; - } - public String getFileName() { return logPath.toString(); } @@ -539,5 +533,10 @@ public class DfsLogger { throw ex; } } + + public String getLogger() { + String parts[] = logPath.split("/"); + return StringUtil.join(Arrays.asList(parts[parts.length - 2].split("[+]")), ":"); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b64149d7/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java new file mode 100644 index 0000000..3c07547 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertEquals; + +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.minicluster.MiniAccumuloConfig; +import org.apache.accumulo.minicluster.ProcessReference; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.junit.Test; + +public class CleanWalIT extends ConfigurableMacIT { + + @Override + public void configure(MiniAccumuloConfig cfg) { + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s"); + cfg.setNumTservers(1); + cfg.useMiniDFS(true); + } + + // test for ACCUMULO-1830 + @Test(timeout= 4 * 60 * 1000) + public void test() throws Exception { + Connector conn = getConnector(); + String tableName = getTableNames(1)[0]; + conn.tableOperations().create(tableName); + BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig()); + Mutation m = new Mutation("row"); + m.put("cf", "cq", "value"); + bw.addMutation(m); + bw.close(); + for (ProcessReference tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) + getCluster().killProcess(ServerType.TABLET_SERVER, tserver); + // all 3 tables should do recovery, but the bug doesn't really remove the log file references + getCluster().start(); + for (String table : new String[]{MetadataTable.NAME, RootTable.NAME}) + conn.tableOperations().flush(table, null, null, true); + assertEquals(1, count(tableName, conn)); + for (String table : new String[]{MetadataTable.NAME, RootTable.NAME}) + assertEquals(0, countLogs(table, conn)); + + bw = conn.createBatchWriter(tableName, new BatchWriterConfig()); + m = new Mutation("row"); + m.putDelete("cf", "cq"); + bw.addMutation(m); + bw.close(); + assertEquals(0, count(tableName, conn)); + conn.tableOperations().flush(tableName, null, null, true); + conn.tableOperations().flush(MetadataTable.NAME, null, null, true); + conn.tableOperations().flush(RootTable.NAME, null, null, true); + for (ProcessReference tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) + getCluster().killProcess(ServerType.TABLET_SERVER, tserver); + UtilWaitThread.sleep(3 * 1000); + getCluster().start(); + assertEquals(0, count(tableName, conn)); + } + + private int countLogs(String tableName, Connector conn) throws TableNotFoundException { + Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); + int count = 0; + for (@SuppressWarnings("unused") Entry<Key,Value> entry : scanner) { + count++; + } + return count; + } + + int count(String tableName, Connector conn) throws Exception { + Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY); + int result = 0; + for (@SuppressWarnings("unused") Entry<Key,Value> entry : scanner) { + result++; + } + return result; + } + +}