Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/101cd1fa
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/101cd1fa
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/101cd1fa

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 101cd1faad447c17191b203fb55096b8bd083ad0
Parents: 0aa1d03 5b32fd2
Author: Sean Busbey <bus...@cloudera.com>
Authored: Tue Apr 15 15:55:59 2014 -0500
Committer: Sean Busbey <bus...@cloudera.com>
Committed: Tue Apr 22 14:36:24 2014 -0500

----------------------------------------------------------------------
 .../accumulo/core/file/rfile/CreateEmpty.java | 72 +++++++++++++
 .../core/file/rfile/RFileOperations.java | 6 +-
 test/system/auto/simple/recoverWithEmpty.py | 104 +++++++++++++++++++
 3 files changed, 180 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/101cd1fa/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
index 0000000,0000000..7663b2d
new file mode 100644
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@@ -1,0 -1,0 +1,72 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.
You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.core.file.rfile;
++
++import java.util.ArrayList;
++import java.util.Arrays;
++
++import org.apache.accumulo.core.conf.DefaultConfiguration;
++import org.apache.accumulo.core.file.FileSKVWriter;
++import org.apache.accumulo.core.file.rfile.RFile.Writer;
++import org.apache.accumulo.core.file.rfile.bcfile.TFile;
++import org.apache.accumulo.core.util.CachedConfiguration;
++import org.apache.commons.cli.BasicParser;
++import org.apache.commons.cli.CommandLine;
++import org.apache.commons.cli.HelpFormatter;
++import org.apache.commons.cli.Option;
++import org.apache.commons.cli.Options;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++
++/**
++ * Create an empty RFile for use in recovering from data loss where Accumulo still refers internally to a path.
++ */
++public class CreateEmpty {
++
++  /**
++   * Command-line entry point: writes an empty RFile at every path argument.
++   *
++   * <p>Options: {@code -c/--codec} selects the compression codec (default {@link TFile#COMPRESSION_NONE}); {@code -?/--help} prints usage. Usage is also
++   * printed, and nothing is written, when no path is given. Every path must end in {@code .rf} and the codec must be one of
++   * {@link TFile#getSupportedCompressionAlgorithms()}; both are checked before any file is created.
++   *
++   * @param args options followed by one or more paths; relative paths are resolved against the default filesystem of the Hadoop configuration
++   * @throws IllegalArgumentException if a path does not end in {@code .rf} or the codec is not a supported one
++   * @throws Exception on command-line parse errors or filesystem failures
++   */
++  public static void main(String[] args) throws Exception {
++    Configuration conf = CachedConfiguration.getInstance();
++    String[] supportedCodecs = TFile.getSupportedCompressionAlgorithms();
++
++    Options opts = new Options();
++    Option codecOption = new Option("c", "codec", true, "the compression codec to use. one of " + Arrays.toString(supportedCodecs) + ". defaults to none.");
++    opts.addOption(codecOption);
++    Option help = new Option("?", "help", false, "print this message");
++    opts.addOption(help);
++
++    CommandLine commandLine = new BasicParser().parse(opts, args);
++    if (commandLine.hasOption(help.getOpt()) || 0 == commandLine.getArgs().length) {
++      HelpFormatter formatter = new HelpFormatter();
++      formatter.printHelp(120, "$ACCUMULO_HOME/bin/accumulo " + CreateEmpty.class.getName() + " [options] path [path ...]",
++          "", opts,
++          "each path given is a filesystem URL. Relative paths are resolved according to the default filesystem defined in your Hadoop configuration, which is usually an HDFS instance.");
++      // Asking for help must not also create files, so stop once usage has been printed.
++      return;
++    }
++    String codec = commandLine.getOptionValue(codecOption.getOpt(), TFile.COMPRESSION_NONE);
++    // Validate the codec up front: RFileOperations.openWriter calls fs.create() before the codec name reaches the block-file writer,
++    // so a bad name could otherwise only be rejected after the target file has already been opened.
++    // NOTE(review): relies on the default "none" always appearing in the supported list -- confirm.
++    if (!Arrays.asList(supportedCodecs).contains(codec)) {
++      throw new IllegalArgumentException("Compression codec '" + codec + "' is not one of " + Arrays.toString(supportedCodecs) + ".");
++    }
++
++    // Check every path before writing anything, so one bad argument cannot leave only some of the files created.
++    for (String arg : commandLine.getArgs()) {
++      if (!arg.endsWith(".rf")) {
++        throw new IllegalArgumentException("File must end with .rf and '" + arg + "' does not.");
++      }
++    }
++
++    for (String arg : commandLine.getArgs()) {
++      Path path = new Path(arg);
++      // Opening a writer and closing it without appending any entries yields the empty RFile.
++      FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), codec);
++      writer.close();
++    }
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/101cd1fa/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 5374332,0000000..c906522
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@@ -1,128 -1,0 +1,130 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.file.rfile.RFile.Writer;
+import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
++/**
++ * {@link FileOperations} implementation for the RFile format: opens RFile readers, index iterators and writers on a Hadoop {@link FileSystem}.
++ */
+public class RFileOperations extends FileOperations {
+
+  private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
+
+  @Override
+  public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    return fs.getFileStatus(new Path(file)).getLen();
+  }
+
+  @Override
+  public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+
+    return openIndex(file, fs, conf, acuconf, null, null);
+  }
+
+  @Override
+  public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache)
+      throws IOException {
+    Path path = new Path(file);
+    // long len = fs.getFileStatus(path).getLen();
+    // FSDataInputStream in = fs.open(path);
+    // Reader reader = new RFile.Reader(in, len , conf);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache);
+    final Reader reader = new RFile.Reader(_cbr);
+
+    return reader.getIndex();
+  }
+
+  @Override
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    return openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
+  }
+
+  @Override
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
+      BlockCache dataCache, BlockCache indexCache) throws IOException {
+    Path path = new Path(file);
+
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache);
+    Reader iter = new RFile.Reader(_cbr);
+
+    if (seekToBeginning) {
+      iter.seek(new Range((Key) null, null), EMPTY_CF_SET, false);
+    }
+
+    return iter;
+  }
+
+  @Override
+  public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
+      AccumuloConfiguration tableConf) throws IOException {
+    FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, null, null);
+    iter.seek(range, columnFamilies, inclusive);
+    return iter;
+  }
+
+  @Override
+  public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
+      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+    FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, dataCache, indexCache);
+    iter.seek(range, columnFamilies, inclusive);
+    return iter;
+  }
+
++  /**
++   * Opens a writer for a new RFile, compressing with the codec configured by {@link Property#TABLE_FILE_COMPRESSION_TYPE} in {@code acuconf}.
++   */
+  @Override
+  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
++    return openWriter(file, fs, conf, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
++  }
++
++  /**
++   * Opens a writer for a new RFile using an explicit compression codec instead of the one configured in {@code acuconf}.
++   *
++   * <p>Replication is {@link Property#TABLE_FILE_REPLICATION} when that is positive, otherwise Hadoop's {@code dfs.replication}. The filesystem block size is
++   * {@link Property#TABLE_FILE_BLOCK_SIZE} when positive, otherwise {@code dfs.block.size} (default {@code 1 << 26}). The file is created with overwrite
++   * disabled.
++   *
++   * @param compression name of the compression codec handed to the underlying block-file writer
++   * @return the RFile writer wrapping the newly created output stream
++   * @throws IOException if the file cannot be created or the writer cannot be initialized
++   */
++  FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, String compression) throws IOException {
+    int hrep = conf.getInt("dfs.replication", -1);
+    int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
+    int rep = hrep;
+    if (trep > 0 && trep != hrep) {
+      rep = trep;
+    }
++    // NOTE(review): if dfs.replication is unset and the table replication is not positive, rep stays -1 and reaches fs.create() as (short) -1;
++    // confirm the target filesystem tolerates that.
+    long hblock = conf.getLong("dfs.block.size", 1 << 26);
+    long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
+    long block = hblock;
+    if (tblock > 0)
+      block = tblock;
+    int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+
+    long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
+    long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
+
-    String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE);
-
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf);
-    Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);
-    return writer;
++    FSDataOutputStream out = fs.create(new Path(file), false, bufferSize, (short) rep, block);
++    boolean constructed = false;
++    try {
++      CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(out, compression, conf);
++      Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);
++      constructed = true;
++      return writer;
++    } finally {
++      if (!constructed) {
++        // A writer constructor threw, so no writer holds the stream; close it here instead of leaking an open file handle.
++        try {
++          out.close();
++        } catch (IOException ignored) {
++          // Let the constructor's failure propagate rather than this secondary close failure.
++        }
++      }
++    }
+  }
+}