Updated Branches: refs/heads/master 2505f7a8c -> 8730d5cef
ACCUMULO-1814 Fixes WALog reading issue from 1.5. WALogs from 1.5.0 use a slightly different format than WALogs from 1.6.0. Consequently, the code to read WALogs from 1.6.0 doesn't work on files from 1.5.0. I made changes to DFSLogger to correctly interpret 1.5.0-based log files, and incremented the log version number to 3, so that the WALog reading code can correctly identify what type of file we're dealing with. This may still require more testing, especially with encryption turned on, but for now this will do. Signed-off-by: Eric Newton <eric.new...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8730d5ce Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8730d5ce Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8730d5ce Branch: refs/heads/master Commit: 8730d5cef37f301fc3f204a48e6af63c32a4f207 Parents: 2505f7a Author: Michael Allen <mich...@sqrrl.com> Authored: Mon Oct 28 00:12:11 2013 -0400 Committer: Eric Newton <eric.new...@gmail.com> Committed: Mon Oct 28 09:44:01 2013 -0400 ---------------------------------------------------------------------- server/pom.xml | 10 ++ .../accumulo/server/logger/LogReader.java | 18 ++- .../server/tabletserver/log/DfsLogger.java | 123 ++++++++++++--- .../server/tabletserver/log/LogSorter.java | 47 +----- .../log/TestUpgradePathForWALogs.java | 154 +++++++++++++++++++ server/src/test/resources/walog-from-15.walog | Bin 0 -> 25608 bytes server/src/test/resources/walog-from-16.walog | Bin 0 -> 8170 bytes 7 files changed, 282 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index ff846b4..f7da3bb 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -128,6 +128,16 @@ <testResource> 
<filtering>true</filtering> <directory>src/test/resources</directory> + <excludes> + <exclude>**/*.walog</exclude> + </excludes> + </testResource> + <testResource> + <filtering>false</filtering> + <directory>src/test/resources</directory> + <includes> + <include>**/*.walog</include> + </includes> </testResource> </testResources> <pluginManagement> http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java b/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java index 719b85c..9368819 100644 --- a/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java +++ b/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java @@ -16,25 +16,25 @@ */ package org.apache.accumulo.server.logger; +import java.io.DataInputStream; import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.tabletserver.log.DfsLogger; +import org.apache.accumulo.server.tabletserver.log.DfsLogger.DFSLoggerInputStreams; import org.apache.accumulo.server.tabletserver.log.MultiReader; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -90,26 +90,28 @@ public class LogReader { for (String file : opts.files) { - Map<String, String> meta = new 
HashMap<String, String>(); Path path = new Path(file); LogFileKey key = new LogFileKey(); LogFileValue value = new LogFileValue(); if (fs.isFile(path)) { // read log entries from a simple hdfs file - FSDataInputStream f = DfsLogger.readHeader(fs, path, meta); + @SuppressWarnings("deprecation") + DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getSiteConfiguration()); + DataInputStream input = streams.getDecryptingInputStream(); + try { while (true) { try { - key.readFields(f); - value.readFields(f); + key.readFields(input); + value.readFields(input); } catch (EOFException ex) { break; } printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations); } } finally { - f.close(); + input.close(); } } else { // read the log entries sorted in a map file http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java index 0b64d42..de3d012 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java @@ -22,6 +22,7 @@ import static org.apache.accumulo.server.logger.LogEvents.DEFINE_TABLET; import static org.apache.accumulo.server.logger.LogEvents.MANY_MUTATIONS; import static org.apache.accumulo.server.logger.LogEvents.OPEN; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -30,6 +31,7 @@ import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import 
java.util.Set; @@ -43,6 +45,7 @@ import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.security.crypto.CryptoModuleFactory; import org.apache.accumulo.core.security.crypto.CryptoModuleParameters; +import org.apache.accumulo.core.security.crypto.DefaultCryptoModule; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.StringUtil; import org.apache.accumulo.server.ServerConstants; @@ -63,6 +66,7 @@ import org.apache.log4j.Logger; public class DfsLogger { // Package private so that LogSorter can find this static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---"; + static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---"; private static Logger log = Logger.getLogger(DfsLogger.class); @@ -74,6 +78,34 @@ public class DfsLogger { } } + public static class DFSLoggerInputStreams { + + private FSDataInputStream originalInput; + private DataInputStream decryptingInputStream; + + public DFSLoggerInputStreams(FSDataInputStream originalInput, DataInputStream decryptingInputStream) { + this.originalInput = originalInput; + this.decryptingInputStream = decryptingInputStream; + } + + public FSDataInputStream getOriginalInput() { + return originalInput; + } + + public void setOriginalInput(FSDataInputStream originalInput) { + this.originalInput = originalInput; + } + + public DataInputStream getDecryptingInputStream() { + return decryptingInputStream; + } + + public void setDecryptingInputStream(DataInputStream decryptingInputStream) { + this.decryptingInputStream = decryptingInputStream; + } + } + + public interface ServerResources { AccumuloConfiguration getConfiguration(); @@ -210,28 +242,83 @@ public class DfsLogger { this.logPath = filename; } - public static FSDataInputStream readHeader(VolumeManager fs, Path path, Map<String,String> opts) throws IOException { - FSDataInputStream file = fs.open(path); - try { - byte[] magic = 
LOG_FILE_HEADER_V2.getBytes(); - byte[] buffer = new byte[magic.length]; - file.readFully(buffer); - if (Arrays.equals(buffer, magic)) { - int count = file.readInt(); + public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException { + FSDataInputStream input = fs.open(path); + DataInputStream decryptingInput = null; + + byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes(); + byte[] magicBuffer = new byte[magic.length]; + input.readFully(magicBuffer); + if (Arrays.equals(magicBuffer, magic)) { + // additional parameters it needs from the underlying stream. + String cryptoModuleClassname = input.readUTF(); + org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory + .getCryptoModule(cryptoModuleClassname); + + // Create the parameters and set the input stream into those parameters + CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf); + params.setEncryptedInputStream(input); + + // Create the plaintext input stream from the encrypted one + params = cryptoModule.getDecryptingInputStream(params); + + if (params.getPlaintextInputStream() instanceof DataInputStream) { + decryptingInput = (DataInputStream) params.getPlaintextInputStream(); + } else { + decryptingInput = new DataInputStream(params.getPlaintextInputStream()); + } + } else { + input.seek(0); + byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes(); + byte[] magicBufferV2 = new byte[magic.length]; + input.readFully(magicBufferV2); + + if (Arrays.equals(magicBufferV2, magicV2)) { + // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class + // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be + // the NullCryptoModule (no crypto) or the DefaultCryptoModule. + + // If it's null, we won't have any parameters whatsoever. 
First, let's attempt to read + // parameters + Map<String,String> opts = new HashMap<String,String>(); + int count = input.readInt(); for (int i = 0; i < count; i++) { - String key = file.readUTF(); - String value = file.readUTF(); + String key = input.readUTF(); + String value = input.readUTF(); opts.put(key, value); } + + if (opts.size() == 0) { + // NullCryptoModule, we're done + decryptingInput = input; + } else { + + // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot. + org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory + .getCryptoModule(DefaultCryptoModule.class.getName()); + + CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf); + + input.seek(0); + input.readFully(magicBuffer); + params.setEncryptedInputStream(input); + + params = cryptoModule.getDecryptingInputStream(params); + if (params.getPlaintextInputStream() instanceof DataInputStream) { + decryptingInput = (DataInputStream) params.getPlaintextInputStream(); + } else { + decryptingInput = new DataInputStream(params.getPlaintextInputStream()); + } + } + } else { - file.seek(0); - return file; + + input.seek(0); + decryptingInput = input; } - return file; - } catch (IOException ex) { - file.seek(0); - return file; + } + return new DFSLoggerInputStreams(input, decryptingInput); } public synchronized void open(String address) throws IOException { @@ -278,7 +365,7 @@ public class DfsLogger { .getConfiguration().get(Property.CRYPTO_MODULE_CLASS)); // Initialize the log file with a header and the crypto params used to set up this log file. 
- logFile.write(LOG_FILE_HEADER_V2.getBytes()); + logFile.write(LOG_FILE_HEADER_V3.getBytes()); CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration()); @@ -290,8 +377,6 @@ public class DfsLogger { logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS)); - //@SuppressWarnings("deprecation") - //OutputStream encipheringOutputStream = cryptoModule.getEncryptingOutputStream(logFile, cryptoOpts); params = cryptoModule.getEncryptingOutputStream(params); OutputStream encipheringOutputStream = params.getEncryptedOutputStream(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java index abd9c6f..a630e5a 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java @@ -20,7 +20,6 @@ import java.io.DataInputStream; import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -34,14 +33,13 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.master.thrift.RecoveryStatus; -import org.apache.accumulo.core.security.crypto.CryptoModuleFactory; -import org.apache.accumulo.core.security.crypto.CryptoModuleParameters; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.zookeeper.ZooUtil; import 
org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.logger.LogFileKey; import org.apache.accumulo.server.logger.LogFileValue; +import org.apache.accumulo.server.tabletserver.log.DfsLogger.DFSLoggerInputStreams; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor; import org.apache.hadoop.fs.FSDataInputStream; @@ -112,46 +110,9 @@ public class LogSorter { // the following call does not throw an exception if the file/dir does not exist fs.deleteRecursively(new Path(destPath)); - Map<String, String> opts = new HashMap<String,String>(); - FSDataInputStream tmpInput = DfsLogger.readHeader(fs, srcPath, opts); - - byte[] magic = DfsLogger.LOG_FILE_HEADER_V2.getBytes(); - byte[] magicBuffer = new byte[magic.length]; - tmpInput.readFully(magicBuffer); - if (!Arrays.equals(magicBuffer, magic)) { - tmpInput.seek(0); - synchronized (this) { - this.input = tmpInput; - this.decryptingInput = tmpInput; - } - } else { - // We read the crypto module class name here because we need to boot strap the class. The class itself will read any - // additional parameters it needs from the underlying stream. 
- String cryptoModuleClassname = tmpInput.readUTF(); - org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory - .getCryptoModule(cryptoModuleClassname); - - // Create the parameters and set the input stream into those parameters - CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf); - params.setEncryptedInputStream(tmpInput); - - // Create the plaintext input stream from the encrypted one - params = cryptoModule.getDecryptingInputStream(params); - - // Store the plaintext input stream into member variables - synchronized (this) { - this.input = tmpInput; - - if (params.getPlaintextInputStream() instanceof DataInputStream) { - this.decryptingInput = (DataInputStream)params.getPlaintextInputStream(); - } else { - this.decryptingInput = new DataInputStream(params.getPlaintextInputStream()); - } - - } - - } - + DFSLoggerInputStreams inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf); + this.input = inputStreams.getOriginalInput(); + this.decryptingInput = inputStreams.getDecryptingInputStream(); final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE); Thread.currentThread().setName("Sorting " + name + " for recovery"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/src/test/java/org/apache/accumulo/server/tabletserver/log/TestUpgradePathForWALogs.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/log/TestUpgradePathForWALogs.java b/server/src/test/java/org/apache/accumulo/server/tabletserver/log/TestUpgradePathForWALogs.java new file mode 100644 index 0000000..ef9439f --- /dev/null +++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/log/TestUpgradePathForWALogs.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.server.tabletserver.log; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestUpgradePathForWALogs { + + private static final String WALOG_FROM_15 = "/walog-from-15.walog"; + private static final String WALOG_FROM_16 = "/walog-from-16.walog"; + + VolumeManager fs; + + TemporaryFolder root; + + @Before + public void setUp() throws Exception { + File tempFile = File.createTempFile("TestUpgradePathForWALogs", ""); + String tempDirName = tempFile.getAbsolutePath() + "Dir"; + tempFile.delete(); + + File tempDir = new File(tempDirName); + tempDir.mkdirs(); + + root = new TemporaryFolder(new File(tempDirName)); + + // quiet log messages about compress.CodecPool + 
Logger.getRootLogger().setLevel(Level.ERROR); + fs = VolumeManagerImpl.getLocal(); + root.create(); + String path = root.getRoot().getAbsolutePath(); + Path manyMapsPath = new Path("file://" + path + "/manyMaps"); + fs.mkdirs(manyMapsPath); + fs.create(new Path(manyMapsPath, "finished")).close(); + // FileSystem ns = fs.getDefaultVolume(); + // Writer writer = new Writer(ns.getConf(), ns, new Path(root, "odd").toString(), IntWritable.class, BytesWritable.class); + // BytesWritable value = new BytesWritable("someValue".getBytes()); + // for (int i = 1; i < 1000; i += 2) { + // writer.append(new IntWritable(i), value); + // } + // writer.close(); + // writer = new Writer(ns.getConf(), ns, new Path(root, "even").toString(), IntWritable.class, BytesWritable.class); + // for (int i = 0; i < 1000; i += 2) { + // if (i == 10) + // continue; + // writer.append(new IntWritable(i), value); + // } + // writer.close(); + } + + @Test + public void testUpgradeOf15WALog() throws IOException { + InputStream walogStream = null; + OutputStream walogInHDFStream = null; + + try { + + walogStream = getClass().getResourceAsStream(WALOG_FROM_15); + walogInHDFStream = new FileOutputStream(new File(root.getRoot().getAbsolutePath() + WALOG_FROM_15)); + + IOUtils.copyLarge(walogStream, walogInHDFStream); + walogInHDFStream.flush(); + walogInHDFStream.close(); + walogInHDFStream = null; + + @SuppressWarnings("deprecation") + LogSorter logSorter = new LogSorter(null, fs, DefaultConfiguration.getSiteConfiguration()); + LogSorter.LogProcessor logProcessor = logSorter.new LogProcessor(); + + logProcessor.sort(WALOG_FROM_15, new Path("file://" + root.getRoot().getAbsolutePath() + WALOG_FROM_15), "file://" + root.getRoot().getAbsolutePath() + + "/manyMaps"); + + } finally { + if (walogStream != null) { + walogStream.close(); + } + + if (walogInHDFStream != null) { + walogInHDFStream.close(); + } + } + } + + @Test + public void testBasic16WALogRead() throws IOException { + String walogToTest = 
WALOG_FROM_16; + + InputStream walogStream = null; + OutputStream walogInHDFStream = null; + + try { + + walogStream = getClass().getResourceAsStream(walogToTest); + walogInHDFStream = new FileOutputStream(new File(root.getRoot().getAbsolutePath() + walogToTest)); + + IOUtils.copyLarge(walogStream, walogInHDFStream); + walogInHDFStream.flush(); + walogInHDFStream.close(); + walogInHDFStream = null; + + @SuppressWarnings("deprecation") + LogSorter logSorter = new LogSorter(null, fs, DefaultConfiguration.getSiteConfiguration()); + LogSorter.LogProcessor logProcessor = logSorter.new LogProcessor(); + + logProcessor.sort(walogToTest, new Path("file://" + root.getRoot().getAbsolutePath() + walogToTest), "file://" + root.getRoot().getAbsolutePath() + + "/manyMaps"); + + } finally { + if (walogStream != null) { + walogStream.close(); + } + + if (walogInHDFStream != null) { + walogInHDFStream.close(); + } + } + } + + @After + public void tearDown() throws Exception { + // root.delete(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/src/test/resources/walog-from-15.walog ---------------------------------------------------------------------- diff --git a/server/src/test/resources/walog-from-15.walog b/server/src/test/resources/walog-from-15.walog new file mode 100644 index 0000000..1922dea Binary files /dev/null and b/server/src/test/resources/walog-from-15.walog differ http://git-wip-us.apache.org/repos/asf/accumulo/blob/8730d5ce/server/src/test/resources/walog-from-16.walog ---------------------------------------------------------------------- diff --git a/server/src/test/resources/walog-from-16.walog b/server/src/test/resources/walog-from-16.walog new file mode 100644 index 0000000..4654f00 Binary files /dev/null and b/server/src/test/resources/walog-from-16.walog differ