ACCUMULO-2654 Adds utility for creating empty rfile.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5b32fd22 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5b32fd22 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5b32fd22 Branch: refs/heads/master Commit: 5b32fd22b928caa44ea1535851b57cede5f34c5f Parents: 3648c1b Author: Sean Busbey <bus...@cloudera.com> Authored: Wed Apr 9 11:34:56 2014 -0500 Committer: Sean Busbey <bus...@cloudera.com> Committed: Tue Apr 22 12:25:44 2014 -0500 ---------------------------------------------------------------------- .../accumulo/core/file/rfile/CreateEmpty.java | 72 +++++++++++++ .../core/file/rfile/RFileOperations.java | 6 +- test/system/auto/simple/recoverWithEmpty.py | 104 +++++++++++++++++++ 3 files changed, 180 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5b32fd22/src/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/src/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java new file mode 100644 index 0000000..7663b2d --- /dev/null +++ b/src/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.rfile; + +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFile.Writer; +import org.apache.accumulo.core.file.rfile.bcfile.TFile; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Create an empty RFile for use in recovering from data loss where Accumulo still refers internally to a path. + */ +public class CreateEmpty { + + public static void main(String[] args) throws Exception { + Configuration conf = CachedConfiguration.getInstance(); + + Options opts = new Options(); + Option codecOption = new Option("c", "codec", true, "the compression codec to use. one of " + Arrays.toString(TFile.getSupportedCompressionAlgorithms()) + ". 
defaults to none."); + opts.addOption(codecOption); + Option help = new Option( "?", "help", false, "print this message" ); + opts.addOption(help); + + CommandLine commandLine = new BasicParser().parse(opts, args); + if (commandLine.hasOption(help.getOpt()) || 0 == commandLine.getArgs().length) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(120, "$ACCUMULO_HOME/bin/accumulo " + CreateEmpty.class.getName() + " [options] path [path ...]", + "", opts, + "each path given is a filesystem URL. Relative paths are resolved according to the default filesystem defined in your Hadoop configuration, which is usually an HDFS instance."); + } + String codec = commandLine.getOptionValue(codecOption.getOpt(), TFile.COMPRESSION_NONE); + + for (String arg : commandLine.getArgs()) { + if (!arg.endsWith(".rf")) { + throw new IllegalArgumentException("File must end with .rf and '" + arg + "' does not."); + } + } + + for (String arg : commandLine.getArgs()) { + Path path = new Path(arg); + FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), codec); + writer.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5b32fd22/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java ---------------------------------------------------------------------- diff --git a/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java index 5374332..c906522 100644 --- a/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java +++ b/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java @@ -103,6 +103,10 @@ public class RFileOperations extends FileOperations { @Override public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { + 
return openWriter(file, fs, conf, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)); + } + + FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, String compression) throws IOException { int hrep = conf.getInt("dfs.replication", -1); int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION); int rep = hrep; @@ -119,8 +123,6 @@ public class RFileOperations extends FileOperations { long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE); long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX); - String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE); - CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf); Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize); return writer; http://git-wip-us.apache.org/repos/asf/accumulo/blob/5b32fd22/test/system/auto/simple/recoverWithEmpty.py ---------------------------------------------------------------------- diff --git a/test/system/auto/simple/recoverWithEmpty.py b/test/system/auto/simple/recoverWithEmpty.py new file mode 100755 index 0000000..18ac055 --- /dev/null +++ b/test/system/auto/simple/recoverWithEmpty.py @@ -0,0 +1,104 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import logging +import unittest +import time +import sys + +from TestUtils import TestUtilsMixin, ROOT, ROOT_PASSWORD, SITE_PATH + +log = logging.getLogger('test.auto') + +# XXX As a part of verifying lossy recovery via inserting an empty rfile, +# this test deletes test table tablets within HDFS. +# It will require write access to the backing files of the test Accumulo +# instance in HDFS. +# +# This test should read instance.dfs.dir properly from the test harness, but +# if you want to take a paranoid approach just make sure the test user +# doesn't have write access to the HDFS files of any colocated live +# Accumulo instance. +class RecoverWithEmptyTest(unittest.TestCase, TestUtilsMixin): + "Ingest some data, verify it was stored properly, replace an underlying rfile with an empty one and verify we can scan." 
+ order = 95 + + def add_options(self, parser): + if not parser.has_option('-c'): + parser.add_option('-c', '--rows', dest='rows', + default=20000, type=int, + help="The number of rows to write " + "when testing (%default)") + if not parser.has_option('-n'): + parser.add_option('-n', '--size', dest='size', + default=50, type=int, + help="The size of values to write " + "when testing (%default)") + def setUp(self): + TestUtilsMixin.setUp(self); + # initialize the database + self.createTable('test_ingest') + # start test ingestion + log.info("Starting Test Ingester") + self.ingester = self.ingest(self.masterHost(), + self.options.rows, + size=self.options.size) + + def tearDown(self): + TestUtilsMixin.tearDown(self) + self.pkill(self.masterHost(), 'TestIngest') + + def waitTime(self): + return 1000*120 * self.options.rows / 1e6 + 30 + + def runTest(self): + waitTime = self.waitTime() + + self.waitForStop(self.ingester, waitTime) + + log.info("Verifying Ingestion") + self.waitForStop(self.verify(self.masterHost(), + self.options.rows, + size=self.options.size), + waitTime) + log.info("Replacing rfile with empty") + out,err,code = self.shell(self.masterHost(), 'flush -t test_ingest\n') + self.processResult(out,err,code) + out,err = self.waitForStop(self.runOn(self.masterHost(), [self.accumulo_sh(), 'shell', '-u', ROOT, '-p', ROOT_PASSWORD, '-e', 'scan -t !METADATA']), waitTime) + self.failUnless(out.find("%s< file:/default_tablet/F0000000.rf" % self.getTableId('test_ingest')) >= 0, + "Test assumptions about the rfiles backing our test table are wrong. 
please file a bug.") + out,err,code = self.shell(self.masterHost(), 'offline -t test_ingest\n') + self.processResult(out,err,code) + import config + rfile = "%s/tables/%s/default_tablet/F0000000.rf" % (config.parse(SITE_PATH)['instance.dfs.dir'], self.getTableId('test_ingest')) + log.info("Removing rfile '%s'" % rfile) + self.waitForStop(self.runOn(self.masterHost(), ['hadoop', 'fs', '-rm', rfile]), waitTime) + self.waitForStop(self.runClassOn(self.masterHost(), + "org.apache.accumulo.core.file.rfile.CreateEmpty", + [rfile]), + waitTime) + log.info("Make sure we can still scan") + out,err,code = self.shell(self.masterHost(), 'online -t test_ingest\n') + self.processResult(out,err,code); + out,err = self.waitForStop(self.runOn(self.masterHost(), [self.accumulo_sh(), 'shell', '-u', ROOT, '-p', ROOT_PASSWORD, '-e', 'scan -t test_ingest']), waitTime) + self.failUnless(len(out) == 0) + self.shutdown_accumulo() + +def suite(): + result = unittest.TestSuite() + result.addTest(RecoverWithEmptyTest()) + return result