ACCUMULO-4165 Added a user-facing API for RFile Squashed commit of the following:
commit c96cda97507fc611bfdf1699ea022769072adc55 Author: Keith Turner <ktur...@apache.org> Date: Tue May 31 17:53:20 2016 -0400 Multiple improvements based on code review. * Made AccumuloFileOutputFormat use new RFile public API. * Moved checking of visibility validity to within new RFile API impl * Added test for error conditions and documented expected exceptions. commit 005ebb2c0e47e4cf818c955f292c71696a4fff41 Author: Keith Turner <ktur...@apache.org> Date: Tue May 31 14:35:18 2016 -0400 updates based on code review comments commit a5c6ece070fc44923758a1da4aa50849f872fdf4 Author: Keith Turner <ktur...@apache.org> Date: Fri May 27 15:41:06 2016 -0400 added a test commit 911c64cd714364707e1258dcf627b151630a18bf Author: Keith Turner <ktur...@apache.org> Date: Fri May 27 14:38:28 2016 -0400 ACCUMULO-4165 Added a user-facing API for RFile Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/61a7de4a Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/61a7de4a Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/61a7de4a Branch: refs/heads/master Commit: 61a7de4a167571bea16f2f130ddaaa8768562148 Parents: c8621bb Author: Keith Turner <ke...@deenlo.com> Authored: Thu Jun 2 11:03:37 2016 -0400 Committer: Keith Turner <ke...@deenlo.com> Committed: Thu Jun 2 11:03:37 2016 -0400 ---------------------------------------------------------------------- .../client/admin/NewTableConfiguration.java | 14 +- .../core/client/admin/TableOperations.java | 4 +- .../core/client/impl/OfflineIterator.java | 14 +- .../client/mapred/AccumuloFileOutputFormat.java | 25 +- .../mapreduce/AccumuloFileOutputFormat.java | 27 +- .../accumulo/core/client/rfile/FSConfArgs.java | 47 ++ .../accumulo/core/client/rfile/RFile.java | 275 ++++++++ .../core/client/rfile/RFileScanner.java | 330 ++++++++++ .../core/client/rfile/RFileScannerBuilder.java | 148 +++++ .../accumulo/core/client/rfile/RFileSource.java | 44 ++ .../accumulo/core/client/rfile/RFileWriter.java | 236 +++++++ .../core/client/rfile/RFileWriterBuilder.java | 148 +++++ .../accumulo/core/client/sample/Sampler.java | 1 - .../accumulo/core/file/FileOperations.java | 59 +- .../apache/accumulo/core/file/rfile/RFile.java | 2 +- .../core/file/rfile/RFileOperations.java | 45 +- .../accumulo/core/iterators/IteratorUtil.java | 15 + .../sample/impl/SamplerConfigurationImpl.java | 12 + .../accumulo/core/util/LocalityGroupUtil.java | 2 + .../accumulo/core/client/rfile/RFileTest.java | 647 +++++++++++++++++++ .../accumulo/core/file/rfile/RFileTest.java | 10 +- .../accumulo/tserver/tablet/ScanDataSource.java | 13 +- 22 files changed, 2012 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java index 994b653..e7dc898 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java @@ -88,7 +88,7 @@ public class NewTableConfiguration { */ public NewTableConfiguration setProperties(Map<String,String> prop) { checkArgument(prop != null, 
"properties is null"); - checkDisjoint(prop, samplerConfiguration); + SamplerConfigurationImpl.checkDisjoint(prop, samplerConfiguration); this.properties = new HashMap<String,String>(prop); return this; @@ -114,16 +114,6 @@ public class NewTableConfiguration { return Collections.unmodifiableMap(propertyMap); } - private void checkDisjoint(Map<String,String> props, SamplerConfiguration samplerConfiguration) { - if (props.isEmpty() || samplerConfiguration == null) { - return; - } - - Map<String,String> sampleProps = new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap(); - - checkArgument(Collections.disjoint(props.keySet(), sampleProps.keySet()), "Properties and derived sampler properties are not disjoint"); - } - /** * Enable building a sample data set on the new table using the given sampler configuration. * @@ -131,7 +121,7 @@ public class NewTableConfiguration { */ public NewTableConfiguration enableSampling(SamplerConfiguration samplerConfiguration) { requireNonNull(samplerConfiguration); - checkDisjoint(properties, samplerConfiguration); + SamplerConfigurationImpl.checkDisjoint(properties, samplerConfiguration); this.samplerConfiguration = samplerConfiguration; return this; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java index f292902..3e56736 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java @@ -31,6 +31,8 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat; +import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; @@ -538,7 +540,7 @@ public interface TableOperations { Set<Range> splitRangeByTablets(String tableName, Range range, int maxSplits) throws AccumuloException, AccumuloSecurityException, TableNotFoundException; /** - * Bulk import all the files in a directory into a table. + * Bulk import all the files in a directory into a table. 
Files can be created using {@link AccumuloFileOutputFormat} and {@link RFile#newWriter()} * * @param tableName * the name of the table http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java index c5017c3..54fe4ff 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java @@ -52,11 +52,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; -import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter; -import org.apache.accumulo.core.iterators.system.DeletingIterator; import org.apache.accumulo.core.iterators.system.MultiIterator; -import org.apache.accumulo.core.iterators.system.VisibilityFilter; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; @@ -359,18 +355,12 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> { OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(authorizations, acuTableConf, false, samplerConfImpl == null ? null : samplerConfImpl.toSamplerConfiguration()); - DeletingIterator delIter = new DeletingIterator(multiIter, false); - - ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter); - - ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, new HashSet<Column>(options.fetchedColumns)); - byte[] defaultSecurityLabel; - ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY)); defaultSecurityLabel = cv.getExpression(); - VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel); + SortedKeyValueIterator<Key,Value> visFilter = IteratorUtil.setupSystemScanIterators(multiIter, new HashSet<Column>(options.fetchedColumns), authorizations, + defaultSecurityLabel); return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList, options.serverSideIteratorOptions, iterEnv, false)); http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java index 1e90e27..f2bc4cd 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java @@ -17,20 +17,16 @@ package org.apache.accumulo.core.client.mapred; import java.io.IOException; -import java.util.Arrays; import 
org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator; +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.file.FileSKVWriter; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.commons.collections.map.LRUMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -163,11 +159,10 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { final String extension = acuConf.get(Property.TABLE_FILE_TYPE); final Path file = new Path(getWorkOutputPath(job), getUniqueName(job, "part") + "." + extension); - - final LRUMap validVisibilities = new LRUMap(ConfiguratorBase.getVisibilityCacheSize(conf)); + final int visCacheSize = ConfiguratorBase.getVisibilityCacheSize(conf); return new RecordWriter<Key,Value>() { - FileSKVWriter out = null; + RFileWriter out = null; @Override public void close(Reporter reporter) throws IOException { @@ -177,17 +172,9 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { @Override public void write(Key key, Value value) throws IOException { - - Boolean wasChecked = (Boolean) validVisibilities.get(key.getColumnVisibilityData()); - if (wasChecked == null) { - byte[] cv = key.getColumnVisibilityData().toArray(); - new ColumnVisibility(cv); - validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE); - } - if (out == null) { - out = FileOperations.getInstance().newWriterBuilder().forFile(file.toString(), file.getFileSystem(conf), conf).withTableConfiguration(acuConf) - .build(); + out = RFile.newWriter().to(file.toString()).withFileSystem(file.getFileSystem(conf)).withTableProperties(acuConf) + .withVisibilityCacheSize(visCacheSize).build(); out.startDefaultLocalityGroup(); } out.append(key, value); http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java index b337f56..75afe2b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java @@ -17,19 +17,16 @@ package org.apache.accumulo.core.client.mapreduce; import java.io.IOException; -import java.util.Arrays; +import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator; +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; import 
org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.file.FileSKVWriter; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.commons.collections.map.LRUMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; @@ -161,11 +158,10 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { final String extension = acuConf.get(Property.TABLE_FILE_TYPE); final Path file = this.getDefaultWorkFile(context, "." + extension); - - final LRUMap validVisibilities = new LRUMap(1000); + final int visCacheSize = ConfiguratorBase.getVisibilityCacheSize(conf); return new RecordWriter<Key,Value>() { - FileSKVWriter out = null; + RFileWriter out = null; @Override public void close(TaskAttemptContext context) throws IOException { @@ -175,22 +171,13 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> { @Override public void write(Key key, Value value) throws IOException { - - Boolean wasChecked = (Boolean) validVisibilities.get(key.getColumnVisibilityData()); - if (wasChecked == null) { - byte[] cv = key.getColumnVisibilityData().toArray(); - new ColumnVisibility(cv); - validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE); - } - if (out == null) { - out = FileOperations.getInstance().newWriterBuilder().forFile(file.toString(), file.getFileSystem(conf), conf).withTableConfiguration(acuConf) - .build(); + out = RFile.newWriter().to(file.toString()).withFileSystem(file.getFileSystem(conf)).withTableProperties(acuConf) + .withVisibilityCacheSize(visCacheSize).build(); out.startDefaultLocalityGroup(); } out.append(key, value); } }; } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java new file mode 100644 index 0000000..1679e43 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.core.client.rfile; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + +class FSConfArgs { + + FileSystem fs; + Configuration conf; + + FileSystem getFileSystem() throws IOException { + if (fs == null) { + fs = FileSystem.get(getConf()); + } + return fs; + } + + Configuration getConf() throws IOException { + if (fs != null) { + return fs.getConf(); + } + + if (conf == null) { + conf = new Configuration(); + } + return conf; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java new file mode 100644 index 0000000..bc5995e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.client.rfile; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; + +/** + * RFile is Accumulo's internal storage format for Key Value pairs. This class is a factory that enables creating a {@link Scanner} for reading and a + * {@link RFileWriter} for writing RFiles. + * + * <p> + * The {@link Scanner} created by this class makes it easy to experiment with real data from a live system on a developer's workstation. Also the {@link Scanner} + * can be used to write tools to analyze Accumulo's raw data. + * + * @since 1.8.0 + */ +public class RFile { + + /** + * This is an intermediate interface in a larger builder pattern. Supports setting the required input sources for reading an RFile. + * + * @since 1.8.0 + */ + public static interface InputArguments { + /** + * Specify RFiles to read from. When multiple inputs are specified the {@link Scanner} constructed will present a merged view. + * + * @param inputs + * one or more RFiles to read. + * @return this + */ + ScannerOptions from(RFileSource... inputs); + + /** + * Specify RFiles to read from. When multiple files are specified the {@link Scanner} constructed will present a merged view. 
+ * + * @param files + * one or more RFiles to read. + * @return this + */ + ScannerFSOptions from(String... files); + } + + /** + * This is an intermediate interface in a larger builder pattern. Enables optionally setting a FileSystem to read RFile(s) from. + * + * @since 1.8.0 + */ + public static interface ScannerFSOptions extends ScannerOptions { + /** + * Optionally provide a FileSystem to open RFiles. If not specified, the FileSystem will be constructed using configuration on the classpath. + * + * @param fs + * use this FileSystem to open files. + * @return this + */ + ScannerOptions withFileSystem(FileSystem fs); + } + + /** + * This is an intermediate interface in a larger builder pattern. Supports setting optional parameters for reading RFile(s) and building a scanner over + * RFile(s). + * + * @since 1.8.0 + */ + public static interface ScannerOptions { + + /** + * By default the {@link Scanner} created will set up the default Accumulo system iterators. The iterators do things like the following: + * + * <ul> + * <li>Suppress deleted data</li> + * <li>Filter based on {@link Authorizations}</li> + * <li>Filter columns specified by functions like {@link Scanner#fetchColumn(Text, Text)} and {@link Scanner#fetchColumnFamily(Text)}</li> + * </ul> + * + * <p> + * Calling this method will turn off these system iterators and allow reading the raw data in an RFile. When reading the raw data, deleted data and delete + * markers may be seen. Delete markers are {@link Key}s with the delete flag set. + * + * <p> + * Disabling system iterators will cause {@link #withAuthorizations(Authorizations)}, {@link Scanner#fetchColumn(Text, Text)}, and + * {@link Scanner#fetchColumnFamily(Text)} to throw runtime exceptions. + * + * @return this + */ + public ScannerOptions withoutSystemIterators(); + + /** + * The authorizations passed here will be used to filter Keys from the {@link Scanner}, based on the content of the column visibility field. + * + * @param auths + * scan with these authorizations + * @return this + */ + public ScannerOptions withAuthorizations(Authorizations auths); + + /** + * Enabling this option will cache RFile data in memory. This option is useful when doing lots of random accesses. + * + * @param cacheSize + * the size of the data cache in bytes. + * @return this + */ + public ScannerOptions withDataCache(long cacheSize); + + /** + * Enabling this option will cache RFile indexes in memory. Index data within an RFile is used to find data when seeking to a {@link Key}. This option is + * useful when doing lots of random accesses. + * + * @param cacheSize + * the size of the index cache in bytes. + * @return this + */ + public ScannerOptions withIndexCache(long cacheSize); + + /** + * This option prevents the {@link Scanner} from reading data outside of a given range. A scanner will not see any data outside of this range even if + * the RFile(s) have data outside the range. + * + * @return this + */ + public ScannerOptions withBounds(Range range); + + /** + * Construct the {@link Scanner} with iterators specified in a table's properties. Properties for a table can be obtained by calling + * {@link TableOperations#getProperties(String)} + * + * @param props + * iterable over Accumulo table key value properties. 
+ * @return this + */ + public ScannerOptions withTableProperties(Iterable<Entry<String,String>> props); + + /** + * @see #withTableProperties(Iterable) + * @param props + * a map instead of an Iterable + * @return this + */ + public ScannerOptions withTableProperties(Map<String,String> props); + + /** + * @return a Scanner over RFile using the specified options. + */ + public Scanner build(); + } + + /** + * Entry point for building a new {@link Scanner} over one or more RFiles. + */ + public static InputArguments newScanner() { + return new RFileScannerBuilder(); + } + + /** + * This is an intermediate interface in a larger builder pattern. Supports setting the required output sink to write an RFile to. + * + * @since 1.8.0 + */ + public static interface OutputArguments { + /** + * @param filename + * name of file to write RFile data + * @return this + */ + public WriterFSOptions to(String filename); + + /** + * @param out + * output stream to write RFile data + * @return this + */ + public WriterOptions to(OutputStream out); + } + + /** + * This is an intermediate interface in a larger builder pattern. Enables optionally setting a FileSystem to write to. + * + * @since 1.8.0 + */ + public static interface WriterFSOptions extends WriterOptions { + /** + * Optionally provide a FileSystem to open a file to write an RFile. If not specified, the FileSystem will be constructed using configuration on the + * classpath. + * + * @param fs + * use this FileSystem to open files. + * @return this + */ + WriterOptions withFileSystem(FileSystem fs); + } + + /** + * This is an intermediate interface in a larger builder pattern. Supports setting optional parameters for creating an RFile and building an RFileWriter. + * + * @since 1.8.0 + */ + public static interface WriterOptions { + /** + * An option to store sample data in the generated RFile. + * + * @param samplerConf + * configuration to use when generating sample data. + * @throws IllegalArgumentException + * if table properties were previously specified and the table properties also specify a sampler. + * @return this + */ + public WriterOptions withSampler(SamplerConfiguration samplerConf); + + /** + * Create an RFile using the same configuration as an Accumulo table. Properties for a table can be obtained by calling + * {@link TableOperations#getProperties(String)} + * + * @param props + * iterable over Accumulo table key value properties. + * @throws IllegalArgumentException + * if a sampler was previously specified and the table properties also specify a sampler. + * @return this + */ + public WriterOptions withTableProperties(Iterable<Entry<String,String>> props); + + /** + * @see #withTableProperties(Iterable) + */ + public WriterOptions withTableProperties(Map<String,String> props); + + /** + * @param maxSize + * As keys are added to an RFile the visibility field is validated. Validating the visibility field requires parsing it. In order to make + * validation faster, previously seen visibilities are cached. This option allows setting the maximum size of this cache. + * @return this + */ + public WriterOptions withVisibilityCacheSize(int maxSize); + + /** + * @return a new RFileWriter created with the options previously specified. + */ + public RFileWriter build() throws IOException; + } + + /** + * Entry point for creating a new RFile writer. 
+ */ + public static OutputArguments newWriter() { + return new RFileWriterBuilder(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java new file mode 100644 index 0000000..4dfba68 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.client.rfile; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment; +import org.apache.accumulo.core.client.impl.ScannerOptions; +import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs; +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.file.blockfile.cache.CacheEntry; +import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache; +import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.file.rfile.RFile.Reader; +import org.apache.accumulo.core.iterators.IteratorAdapter; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.system.MultiIterator; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.io.Text; + +import 
com.google.common.base.Preconditions; + +class RFileScanner extends ScannerOptions implements Scanner { + + private static final byte[] EMPTY_BYTES = new byte[0]; + private static final Range EMPTY_RANGE = new Range(); + + private Range range; + private BlockCache dataCache = null; + private BlockCache indexCache = null; + private Opts opts; + private int batchSize = 1000; + private long readaheadThreshold = 3; + + private static final long CACHE_BLOCK_SIZE = AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE); + + static class Opts { + InputArgs in; + Authorizations auths = Authorizations.EMPTY; + long dataCacheSize; + long indexCacheSize; + boolean useSystemIterators = true; + public HashMap<String,String> tableConfig; + Range bounds; + } + + // This cache exists as a hack to avoid leaking decompressors. When the RFile code is not given a + // cache it reads blocks directly from the decompressor. However, if a user does not read all data + // for a scan this can leave a BCFile block open and a decompressor allocated. + // + // By providing a cache to the RFile code it forces each block to be read into memory. When a + // block is accessed the entire thing is read into memory immediately allocating and deallocating + // a decompressor. If the user does not read all data, no decompressors are left allocated. + private static class NoopCache implements BlockCache { + @Override + public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) { + return null; + } + + @Override + public CacheEntry cacheBlock(String blockName, byte[] buf) { + return null; + } + + @Override + public CacheEntry getBlock(String blockName) { + return null; + } + + @Override + public long getMaxSize() { + return Integer.MAX_VALUE; + } + } + + RFileScanner(Opts opts) { + if (!opts.auths.equals(Authorizations.EMPTY) && !opts.useSystemIterators) { + throw new IllegalArgumentException("Set authorizations and specified not to use system iterators"); + } + + this.opts = opts; + if (opts.indexCacheSize > 0) { + this.indexCache = new LruBlockCache(opts.indexCacheSize, CACHE_BLOCK_SIZE); + } else { + this.indexCache = new NoopCache(); + } + + if (opts.dataCacheSize > 0) { + this.dataCache = new LruBlockCache(opts.dataCacheSize, CACHE_BLOCK_SIZE); + } else { + this.dataCache = new NoopCache(); + } + } + + @Override + public synchronized void fetchColumnFamily(Text col) { + Preconditions.checkArgument(opts.useSystemIterators, "Can only fetch columns when using system iterators"); + super.fetchColumnFamily(col); + } + + @Override + public synchronized void fetchColumn(Text colFam, Text colQual) { + Preconditions.checkArgument(opts.useSystemIterators, "Can only fetch columns when using system iterators"); + super.fetchColumn(colFam, colQual); + } + + @Override + public void fetchColumn(IteratorSetting.Column column) { + Preconditions.checkArgument(opts.useSystemIterators, "Can only fetch columns when using system iterators"); + super.fetchColumn(column); + } + + @Override + public void setClassLoaderContext(String classLoaderContext) { + throw new UnsupportedOperationException(); + } + + @Deprecated + @Override + public void setTimeOut(int timeOut) { + if (timeOut == Integer.MAX_VALUE) + setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + else + setTimeout(timeOut, TimeUnit.SECONDS); + } + + @Deprecated + @Override + public int getTimeOut() { + long timeout = getTimeout(TimeUnit.SECONDS); + if (timeout >= Integer.MAX_VALUE) + return Integer.MAX_VALUE; + return (int) 
timeout; + } + + @Override + public void setRange(Range range) { + this.range = range; + } + + @Override + public Range getRange() { + return range; + } + + @Override + public void setBatchSize(int size) { + this.batchSize = size; + } + + @Override + public int getBatchSize() { + return batchSize; + } + + @Override + public void enableIsolation() {} + + @Override + public void disableIsolation() {} + + @Override + public synchronized void setReadaheadThreshold(long batches) { + Preconditions.checkArgument(batches > 0); + readaheadThreshold = batches; + } + + @Override + public synchronized long getReadaheadThreshold() { + return readaheadThreshold; + } + + @Override + public Authorizations getAuthorizations() { + return opts.auths; + } + + @Override + public void addScanIterator(IteratorSetting cfg) { + super.addScanIterator(cfg); + } + + @Override + public void removeScanIterator(String iteratorName) { + super.removeScanIterator(iteratorName); + } + + @Override + public void updateScanIteratorOption(String iteratorName, String key, String value) { + super.updateScanIteratorOption(iteratorName, key, value); + } + + private class IterEnv extends BaseIteratorEnvironment { + @Override + public IteratorScope getIteratorScope() { + return IteratorScope.scan; + } + + @Override + public boolean isFullMajorCompaction() { + return false; + } + + @Override + public Authorizations getAuthorizations() { + return opts.auths; + } + + @Override + public boolean isSamplingEnabled() { + return RFileScanner.this.getSamplerConfiguration() != null; + } + + @Override + public SamplerConfiguration getSamplerConfiguration() { + return RFileScanner.this.getSamplerConfiguration(); + } + } + + @Override + public Iterator<Entry<Key,Value>> iterator() { + try { + RFileSource[] sources = opts.in.getSources(); + List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(sources.length); + for (int i = 0; i < sources.length; i++) { + FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream(); + readers.add(new RFile.Reader(new CachableBlockFile.Reader(inputStream, sources[i].getLength(), opts.in.getConf(), dataCache, indexCache, + AccumuloConfiguration.getDefaultConfiguration()))); + } + + if (getSamplerConfiguration() != null) { + for (int i = 0; i < readers.size(); i++) { + readers.set(i, ((Reader) readers.get(i)).getSample(new SamplerConfigurationImpl(getSamplerConfiguration()))); + } + } + + SortedKeyValueIterator<Key,Value> iterator; + if (opts.bounds != null) { + iterator = new MultiIterator(readers, opts.bounds); + } else { + iterator = new MultiIterator(readers, false); + } + + Set<ByteSequence> families = Collections.emptySet(); + + if (opts.useSystemIterators) { + SortedSet<Column> cols = this.getFetchedColumns(); + families = LocalityGroupUtil.families(cols); + iterator = IteratorUtil.setupSystemScanIterators(iterator, cols, getAuthorizations(), EMPTY_BYTES); + } + + try { + if (opts.tableConfig != null && opts.tableConfig.size() > 0) { + ConfigurationCopy conf = new ConfigurationCopy(opts.tableConfig); + iterator = IteratorUtil.loadIterators(IteratorScope.scan, iterator, null, conf, serverSideIteratorList, serverSideIteratorOptions, new IterEnv()); + } else { + iterator = IteratorUtil.loadIterators(iterator, serverSideIteratorList, serverSideIteratorOptions, new IterEnv(), false, null); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + iterator.seek(getRange() == null ? EMPTY_RANGE : getRange(), families, families.size() == 0 ? 
false : true); + return new IteratorAdapter(iterator); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + if (dataCache instanceof LruBlockCache) { + ((LruBlockCache) dataCache).shutdown(); + } + + if (indexCache instanceof LruBlockCache) { + ((LruBlockCache) indexCache).shutdown(); + } + + try { + for (RFileSource source : opts.in.getSources()) { + source.getInputStream().close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java new file mode 100644 index 0000000..92e07b4 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.client.rfile; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.rfile.RFile.ScannerFSOptions; +import org.apache.accumulo.core.client.rfile.RFile.ScannerOptions; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Preconditions; + +class RFileScannerBuilder implements RFile.InputArguments, RFile.ScannerFSOptions, RFile.ScannerOptions { + + static class InputArgs extends FSConfArgs { + private Path[] paths; + private RFileSource[] sources; + + InputArgs(String... files) { + this.paths = new Path[files.length]; + for (int i = 0; i < files.length; i++) { + this.paths[i] = new Path(files[i]); + } + } + + InputArgs(RFileSource... 
sources) { + this.sources = sources; + } + + RFileSource[] getSources() throws IOException { + if (sources == null) { + sources = new RFileSource[paths.length]; + for (int i = 0; i < paths.length; i++) { + sources[i] = new RFileSource(getFileSystem().open(paths[i]), getFileSystem().getFileStatus(paths[i]).getLen()); + } + } else { + for (int i = 0; i < sources.length; i++) { + if (!(sources[i].getInputStream() instanceof FSDataInputStream)) { + sources[i] = new RFileSource(new FSDataInputStream(sources[i].getInputStream()), sources[i].getLength()); + } + } + } + + return sources; + } + } + + private RFileScanner.Opts opts = new RFileScanner.Opts(); + + @Override + public ScannerOptions withoutSystemIterators() { + opts.useSystemIterators = false; + return this; + } + + @Override + public ScannerOptions withAuthorizations(Authorizations auths) { + Objects.requireNonNull(auths); + opts.auths = auths; + return this; + } + + @Override + public ScannerOptions withDataCache(long cacheSize) { + Preconditions.checkArgument(cacheSize > 0); + opts.dataCacheSize = cacheSize; + return this; + } + + @Override + public ScannerOptions withIndexCache(long cacheSize) { + Preconditions.checkArgument(cacheSize > 0); + opts.indexCacheSize = cacheSize; + return this; + } + + @Override + public Scanner build() { + return new RFileScanner(opts); + } + + @Override + public ScannerOptions withFileSystem(FileSystem fs) { + Objects.requireNonNull(fs); + opts.in.fs = fs; + return this; + } + + @Override + public ScannerOptions from(RFileSource... inputs) { + opts.in = new InputArgs(inputs); + return this; + } + + @Override + public ScannerFSOptions from(String... files) { + opts.in = new InputArgs(files); + return this; + } + + @Override + public ScannerOptions withTableProperties(Iterable<Entry<String,String>> tableConfig) { + Objects.requireNonNull(tableConfig); + this.opts.tableConfig = new HashMap<>(); + for (Entry<String,String> entry : tableConfig) { + this.opts.tableConfig.put(entry.getKey(), entry.getValue()); + } + return this; + } + + @Override + public ScannerOptions withTableProperties(Map<String,String> tableConfig) { + Objects.requireNonNull(tableConfig); + this.opts.tableConfig = new HashMap<>(tableConfig); + return this; + } + + @Override + public ScannerOptions withBounds(Range range) { + Objects.requireNonNull(range); + this.opts.bounds = range; + return this; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java new file mode 100644 index 0000000..21298c3 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.client.rfile; + +import java.io.InputStream; + +/** + * RFile metadata is stored at the end of the file. In order to read an RFile, its length must be known. This provides a way to pass an InputStream and length + * for reading an RFile. + * + * @since 1.8.0 + */ +public class RFileSource { + private final InputStream in; + private final long len; + + public RFileSource(InputStream in, long len) { + this.in = in; + this.len = len; + } + + public InputStream getInputStream() { + return in; + } + + public long getLength() { + return len; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java new file mode 100644 index 0000000..aad4908 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.client.rfile; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.commons.collections.map.LRUMap; + +import com.google.common.base.Preconditions; + +//formatter was adding spaces that checkstyle did not like, so turned off formatter +//@formatter:off +/** + * This class provides an API for writing RFiles. It can be used to create files for bulk import into Accumulo using + * {@link TableOperations#importDirectory(String, String, String, boolean)} + * + * <p> + * An RFileWriter has the following constraints. Violating these constraints will result in runtime exceptions. 
+ * + * <ul> + * <li>Before appending any keys, a locality group must be started by calling one of the startNewLocalityGroup functions or startDefaultLocalityGroup.</li> + * <li>Keys must be appended in sorted order within a locality group.</li> + * <li>Locality groups must have a mutually exclusive set of column families.</li> + * <li>The default locality group must be started last.</li> + * </ul> + * + * + * <p> + * Below is an example of using RFileWriter. + * + * <p> + * + * <pre> + * {@code + * Iterable<Entry<Key, Value>> localityGroup1Data = ... + * Iterable<Entry<Key, Value>> localityGroup2Data = ... + * Iterable<Entry<Key, Value>> defaultGroupData = ... + * + * try(RFileWriter writer = RFile.newWriter().to(file).build()){ + * + * //Start a locality group before appending data. + * writer.startNewLocalityGroup("groupA", "columnFam1", "columnFam2"); + * //Append data to the locality group that was started above. Must append in sorted order. + * writer.append(localityGroup1Data); + * + * //Add another locality group. + * writer.startNewLocalityGroup("groupB", "columnFam3", "columnFam4"); + * writer.append(localityGroup2Data); + * + * //The default locality group must be started last. The column families for the default group do not need to be specified. + * writer.startDefaultLocalityGroup(); + * //Data appended here cannot contain any column families specified in previous locality groups. + * writer.append(defaultGroupData); + * + * //This is a try-with-resources so the writer is closed here at the end of the code block. + * } + * } + * </pre> + * + * <p> + * Create instances by calling {@link RFile#newWriter()} + * + * @since 1.8.0 + */ +// @formatter:on +public class RFileWriter implements AutoCloseable { + + private FileSKVWriter writer; + private final LRUMap validVisibilities; + private boolean startedLG; + private boolean startedDefaultLG; + + RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize) { + this.writer = fileSKVWriter; + this.validVisibilities = new LRUMap(visCacheSize); + } + + private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException { + Preconditions.checkState(!startedDefaultLG, "Cannot start a locality group after starting the default locality group"); + writer.startNewLocalityGroup(name, columnFamilies); + startedLG = true; + } + + /** + * Before appending any data, a locality group must be started. The default locality group must be started last. + * + * @param name + * locality group name, used for informational purposes + * @param families + * the column families the locality group can contain + * + * @throws IllegalStateException + * When the default locality group has already been started. + */ + public void startNewLocalityGroup(String name, List<byte[]> families) throws IOException { + HashSet<ByteSequence> fams = new HashSet<ByteSequence>(); + for (byte[] family : families) { + fams.add(new ArrayByteSequence(family)); + } + _startNewLocalityGroup(name, fams); + } + + /** + * See the javadoc for {@link #startNewLocalityGroup(String, List)} + */ + public void startNewLocalityGroup(String name, byte[]... families) throws IOException { + startNewLocalityGroup(name, Arrays.asList(families)); + } + + /** + * See the javadoc for {@link #startNewLocalityGroup(String, List)}. + * + * @param families + * will be encoded using UTF-8 + * + * @throws IllegalStateException + * When the default locality group has already been started. 
+ */ + public void startNewLocalityGroup(String name, Set<String> families) throws IOException { + HashSet<ByteSequence> fams = new HashSet<ByteSequence>(); + for (String family : families) { + fams.add(new ArrayByteSequence(family)); + } + _startNewLocalityGroup(name, fams); + } + + /** + * See the javadoc for {@link #startNewLocalityGroup(String, List)}. + * + * @param families + * will be encoded using UTF-8 + * + * @throws IllegalStateException + * When the default locality group has already been started. + */ + public void startNewLocalityGroup(String name, String... families) throws IOException { + HashSet<ByteSequence> fams = new HashSet<ByteSequence>(); + for (String family : families) { + fams.add(new ArrayByteSequence(family)); + } + _startNewLocalityGroup(name, fams); + } + + /** + * Starts a locality group in which the column families do not need to be specified. This locality group must be started after all other locality groups. Cannot append + * column families that were in a previous locality group. + * + * @throws IllegalStateException + * When the default locality group has already been started. + */ + + public void startDefaultLocalityGroup() throws IOException { + Preconditions.checkState(!startedDefaultLG); + writer.startDefaultLocalityGroup(); + startedDefaultLG = true; + startedLG = true; + } + + /** + * Append the key and value to the last locality group that was started. + * + * @param key + * This key must be greater than or equal to the last key appended. For non-default locality groups, the key's column family must be one of the column + * families specified when calling startNewLocalityGroup(). Must be non-null. + * @param val + * value to append, must be non-null. + * + * @throws IllegalArgumentException + * This is thrown when data is appended out of order OR when the key contains an invalid visibility OR when a column family is not valid for a + * locality group. + * @throws IllegalStateException + * Thrown when no locality group was started. + */ + public void append(Key key, Value val) throws IOException { + Preconditions.checkState(startedLG, "No locality group was started"); + Boolean wasChecked = (Boolean) validVisibilities.get(key.getColumnVisibilityData()); + if (wasChecked == null) { + byte[] cv = key.getColumnVisibilityData().toArray(); + new ColumnVisibility(cv); + validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE); + } + writer.append(key, val); + } + + /** + * Append the keys and values to the last locality group that was started. + * + * @param keyValues + * The keys must be in sorted order. The first key returned by the iterable must be greater than or equal to the last key appended. For non-default + * locality groups, the key's column family must be one of the column families specified when calling startNewLocalityGroup(). Must be non-null. + * + * @throws IllegalArgumentException + * This is thrown when data is appended out of order OR when the key contains an invalid visibility OR when a column family is not valid for a + * locality group. + * @throws IllegalStateException + * When no locality group was started. 
+ */ + public void append(Iterable<Entry<Key,Value>> keyValues) throws IOException { + Preconditions.checkState(startedLG, "No locality group was started"); + for (Entry<Key,Value> entry : keyValues) { + append(entry.getKey(), entry.getValue()); + } + } + + @Override + public void close() throws IOException { + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java new file mode 100644 index 0000000..e4a141c --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.client.rfile; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +import org.apache.accumulo.core.client.rfile.RFile.WriterFSOptions; +import org.apache.accumulo.core.client.rfile.RFile.WriterOptions; +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + +class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions, RFile.WriterOptions { + + private static class OutputArgs extends FSConfArgs { + private Path path; + private OutputStream out; + + OutputArgs(String filename) { + this.path = new Path(filename); + } + + OutputArgs(OutputStream out) { + this.out = out; + } + + OutputStream getOutputStream() { + return out; + } + } + + private OutputArgs out; + private SamplerConfiguration sampler = null; + private Map<String,String> tableConfig = Collections.emptyMap(); + private int visCacheSize = 1000; + + @Override + public WriterOptions withSampler(SamplerConfiguration samplerConf) { + Objects.requireNonNull(samplerConf); + SamplerConfigurationImpl.checkDisjoint(tableConfig, samplerConf); + this.sampler = samplerConf; + return this; + } + + @Override + public RFileWriter build() throws IOException { + FileOperations fileops = 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
new file mode 100644
index 0000000..e4a141c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import org.apache.accumulo.core.client.rfile.RFile.WriterFSOptions;
+import org.apache.accumulo.core.client.rfile.RFile.WriterOptions;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions, RFile.WriterOptions {
+
+  private static class OutputArgs extends FSConfArgs {
+    private Path path;
+    private OutputStream out;
+
+    OutputArgs(String filename) {
+      this.path = new Path(filename);
+    }
+
+    OutputArgs(OutputStream out) {
+      this.out = out;
+    }
+
+    OutputStream getOutputStream() {
+      return out;
+    }
+  }
+
+  private OutputArgs out;
+  private SamplerConfiguration sampler = null;
+  private Map<String,String> tableConfig = Collections.emptyMap();
+  private int visCacheSize = 1000;
+
+  @Override
+  public WriterOptions withSampler(SamplerConfiguration samplerConf) {
+    Objects.requireNonNull(samplerConf);
+    SamplerConfigurationImpl.checkDisjoint(tableConfig, samplerConf);
+    this.sampler = samplerConf;
+    return this;
+  }
+
+  @Override
+  public RFileWriter build() throws IOException {
+    FileOperations fileops = FileOperations.getInstance();
+    AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
+    HashMap<String,String> userProps = new HashMap<>();
+    if (sampler != null) {
+      userProps.putAll(new SamplerConfigurationImpl(sampler).toTablePropertiesMap());
+    }
+    userProps.putAll(tableConfig);
+
+    if (userProps.size() > 0) {
+      acuconf = new ConfigurationCopy(Iterables.concat(acuconf, userProps.entrySet()));
+    }
+
+    if (out.getOutputStream() != null) {
+      FSDataOutputStream fsdo;
+      if (out.getOutputStream() instanceof FSDataOutputStream) {
+        fsdo = (FSDataOutputStream) out.getOutputStream();
+      } else {
+        fsdo = new FSDataOutputStream(out.getOutputStream(), new FileSystem.Statistics("foo"));
+      }
+      return new RFileWriter(fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf()).withTableConfiguration(acuconf).build(), visCacheSize);
+    } else {
+      return new RFileWriter(fileops.newWriterBuilder().forFile(out.path.toString(), out.getFileSystem(), out.getConf()).withTableConfiguration(acuconf)
+          .build(), visCacheSize);
+    }
+  }
+
+  @Override
+  public WriterOptions withFileSystem(FileSystem fs) {
+    Objects.requireNonNull(fs);
+    out.fs = fs;
+    return this;
+  }
+
+  @Override
+  public WriterFSOptions to(String filename) {
+    Objects.requireNonNull(filename);
+    this.out = new OutputArgs(filename);
+    return this;
+  }
+
+  @Override
+  public WriterOptions to(OutputStream out) {
+    Objects.requireNonNull(out);
+    this.out = new OutputArgs(out);
+    return this;
+  }
+
+  @Override
+  public WriterOptions withTableProperties(Iterable<Entry<String,String>> tableConfig) {
+    Objects.requireNonNull(tableConfig);
+    HashMap<String,String> cfg = new HashMap<>();
+    for (Entry<String,String> entry : tableConfig) {
+      cfg.put(entry.getKey(), entry.getValue());
+    }
+
+    SamplerConfigurationImpl.checkDisjoint(cfg, sampler);
+    this.tableConfig = cfg;
+    return this;
+  }
+
+  @Override
+  public WriterOptions withTableProperties(Map<String,String> tableConfig) {
+    Objects.requireNonNull(tableConfig);
+    return withTableProperties(tableConfig.entrySet());
+  }
+
+  @Override
+  public WriterOptions withVisibilityCacheSize(int maxSize) {
+    Preconditions.checkArgument(maxSize > 0);
+    this.visCacheSize = maxSize;
+    return this;
+  }
+}
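build() above takes one of two paths: an arbitrary OutputStream is wrapped in an FSDataOutputStream, while a file name goes through a FileSystem. A hedged sketch of the stream-backed path, with an in-memory stream standing in for real output (stream and cache size are illustrative):

import java.io.ByteArrayOutputStream;

import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;

public class StreamWriterExample {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();

    // A larger cache of already-validated visibilities; the builder default is 1000.
    RFileWriter writer = RFile.newWriter().to(baos).withVisibilityCacheSize(10000).build();
    try {
      writer.startDefaultLocalityGroup();
      // ... append sorted key/values ...
    } finally {
      writer.close();
    }

    byte[] rfileBytes = baos.toByteArray(); // the serialized RFile
  }
}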
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java b/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java
index 03bd9d7..8b4db95 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java
@@ -36,7 +36,6 @@ import org.apache.accumulo.core.data.Key;
  *
  * @since 1.8.0
  */
-
 public interface Sampler {

   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 314bbae..4724bbe 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;

 public abstract class FileOperations {

@@ -76,7 +77,7 @@ public abstract class FileOperations {
    * </pre>
    */
   public NeedsFile<GetFileSizeOperationBuilder> getFileSize() {
-    return (NeedsFile<GetFileSizeOperationBuilder>) new GetFileSizeOperation();
+    return new GetFileSizeOperation();
   }

   /**
@@ -92,8 +93,8 @@ public abstract class FileOperations {
    * .build();
    * </pre>
    */
-  public NeedsFile<OpenWriterOperationBuilder> newWriterBuilder() {
-    return (NeedsFile<OpenWriterOperationBuilder>) new OpenWriterOperation();
+  public NeedsFileOrOutputStream<OpenWriterOperationBuilder> newWriterBuilder() {
+    return new OpenWriterOperation();
   }

   /**
@@ -110,7 +111,7 @@ public abstract class FileOperations {
    * </pre>
    */
   public NeedsFile<OpenIndexOperationBuilder> newIndexReaderBuilder() {
-    return (NeedsFile<OpenIndexOperationBuilder>) new OpenIndexOperation();
+    return new OpenIndexOperation();
   }

   /**
@@ -150,7 +151,7 @@ public abstract class FileOperations {
    * </pre>
    */
   public NeedsFile<OpenReaderOperationBuilder> newReaderBuilder() {
-    return (NeedsFile<OpenReaderOperationBuilder>) new OpenReaderOperation();
+    return new OpenReaderOperation();
   }

   //
@@ -203,6 +204,10 @@ public abstract class FileOperations {
       return (SubclassType) this;
     }

+    protected void setFilename(String filename) {
+      this.filename = filename;
+    }
+
     public String getFilename() {
       return filename;
     }
@@ -211,6 +216,10 @@ public abstract class FileOperations {
       return fs;
     }

+    protected void setConfiguration(Configuration fsConf) {
+      this.fsConf = fsConf;
+    }
+
     public Configuration getConfiguration() {
       return fsConf;
     }
@@ -239,6 +248,7 @@ public abstract class FileOperations {
    */
   protected class GetFileSizeOperation extends FileAccessOperation<GetFileSizeOperation> implements GetFileSizeOperationBuilder {
     /** Return the size of the file. */
+    @Override
     public long execute() throws IOException {
       validate();
       return getFileSize(this);
@@ -278,9 +288,20 @@ public abstract class FileOperations {

   /**
    * Operation object for constructing a writer.
    */
-  protected class OpenWriterOperation extends FileIOOperation<OpenWriterOperation> implements OpenWriterOperationBuilder {
+  protected class OpenWriterOperation extends FileIOOperation<OpenWriterOperation> implements OpenWriterOperationBuilder,
+      NeedsFileOrOutputStream<OpenWriterOperationBuilder> {
     private String compression;
+    private FSDataOutputStream outputStream;

+    @Override
+    public NeedsTableConfiguration<OpenWriterOperationBuilder> forOutputStream(String extension, FSDataOutputStream outputStream, Configuration fsConf) {
+      this.outputStream = outputStream;
+      setConfiguration(fsConf);
+      setFilename("foo" + extension);
+      return this;
+    }
+
+    @Override
     public OpenWriterOperation withCompression(String compression) {
       this.compression = compression;
       return this;
@@ -290,6 +311,21 @@ public abstract class FileOperations {
       return compression;
     }

+    public FSDataOutputStream getOutputStream() {
+      return outputStream;
+    }
+
+    @Override
+    protected void validate() {
+      if (outputStream == null) {
+        super.validate();
+      } else {
+        Objects.requireNonNull(getConfiguration());
+        Objects.requireNonNull(getTableConfiguration());
+      }
+    }
+
+    @Override
     public FileSKVWriter build() throws IOException {
       validate();
       return openWriter(this);
@@ -359,6 +395,7 @@ public abstract class FileOperations {
    * Operation object for opening an index.
    */
   protected class OpenIndexOperation extends FileReaderOperation<OpenIndexOperation> implements OpenIndexOperationBuilder {
+    @Override
     public FileSKVIterator build() throws IOException {
       validate();
       return openIndex(this);
@@ -378,6 +415,7 @@ public abstract class FileOperations {
     private boolean inclusive;

     /** Set the range over which the constructed iterator will search. */
+    @Override
     public OpenScanReaderOperation overRange(Range range, Set<ByteSequence> columnFamilies, boolean inclusive) {
       this.range = range;
       this.columnFamilies = columnFamilies;
@@ -407,6 +445,7 @@ public abstract class FileOperations {
     }

     /** Execute the operation, constructing a scan iterator. */
+    @Override
     public FileSKVIterator build() throws IOException {
       validate();
       return openScanReader(this);
@@ -427,11 +466,13 @@ public abstract class FileOperations {
     /**
      * Seek the constructed iterator to the beginning of its domain before returning. Equivalent to {@code seekToBeginning(true)}.
      */
+    @Override
     public OpenReaderOperation seekToBeginning() {
       return seekToBeginning(true);
     }

     /** If true, seek the constructed iterator to the beginning of its domain before returning. */
+    @Override
     public OpenReaderOperation seekToBeginning(boolean seekToBeginning) {
       this.seekToBeginning = seekToBeginning;
       return this;
@@ -442,6 +483,7 @@ public abstract class FileOperations {
     }

     /** Execute the operation, constructing the specified file reader. */
+    @Override
     public FileSKVIterator build() throws IOException {
       validate();
       return openReader(this);
@@ -473,6 +515,11 @@ public abstract class FileOperations {
     public NeedsFileSystem<ReturnType> forFile(String filename);
   }

+  public static interface NeedsFileOrOutputStream<ReturnType> extends NeedsFile<ReturnType> {
+    /** Specify the output stream this operation should write to. */
+    public NeedsTableConfiguration<ReturnType> forOutputStream(String extension, FSDataOutputStream out, Configuration fsConf);
+  }
+
   /**
    * Type wrapper to ensure that {@code inFileSystem(...)} is called before other methods.
    */
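For contributors working below the public API, the new forOutputStream(...) entry point can be exercised directly; the calls below mirror what RFileWriterBuilder.build() does in this diff. This is an internal API and subject to change; the in-memory stream, extension, and configurations are placeholders:

import java.io.ByteArrayOutputStream;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;

public class ForOutputStreamExample {
  public static void main(String[] args) throws Exception {
    FSDataOutputStream out = new FSDataOutputStream(new ByteArrayOutputStream(), new FileSystem.Statistics("example"));

    // forOutputStream(...) requires a table configuration before build() will validate.
    FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder()
        .forOutputStream(".rf", out, new Configuration())
        .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration())
        .build();
    writer.startDefaultLocalityGroup();
    writer.close();
  }
}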
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 981a2e6..f6269e7 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -412,7 +412,7 @@ public class RFile {
     public void append(Key key, Value value) throws IOException {

       if (key.compareTo(prevKey) < 0) {
-        throw new IllegalStateException("Keys appended out-of-order. New key " + key + ", previous key " + prevKey);
+        throw new IllegalArgumentException("Keys appended out-of-order. New key " + key + ", previous key " + prevKey);
       }

       currentLocalityGroup.updateColumnCount(key);
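With this change, an out-of-order append surfaces to API users as IllegalArgumentException rather than IllegalStateException, matching the RFileWriter javadoc above. A sketch of the observable behavior, assuming a writer from the earlier example with a locality group already started (keys and values illustrative):

// Illustrative fragment: 'writer' is an RFileWriter with a group started.
static void appendOutOfOrder(RFileWriter writer) throws java.io.IOException {
  writer.append(new Key(new Text("row2")), new Value(new byte[0]));
  try {
    // "row1" sorts before "row2", so this append is rejected.
    writer.append(new Key(new Text("row1")), new Value(new byte[0]));
  } catch (IllegalArgumentException expected) {
    // message: "Keys appended out-of-order. New key ..., previous key ..."
  }
}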
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 5d15973..96d31ce 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.sample.impl.SamplerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

@@ -77,21 +78,8 @@ public class RFileOperations extends FileOperations {

   @Override
   protected FileSKVWriter openWriter(OpenWriterOperation options) throws IOException {
-    Configuration conf = options.getConfiguration();
-    AccumuloConfiguration acuconf = options.getTableConfiguration();
-
-    int hrep = conf.getInt("dfs.replication", -1);
-    int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
-    int rep = hrep;
-    if (trep > 0 && trep != hrep) {
-      rep = trep;
-    }
-    long hblock = conf.getLong("dfs.block.size", 1 << 26);
-    long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
-    long block = hblock;
-    if (tblock > 0)
-      block = tblock;
-    int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+    AccumuloConfiguration acuconf = options.getTableConfiguration();

     long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
     long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
@@ -106,11 +94,32 @@ public class RFileOperations extends FileOperations {

     String compression = options.getCompression();
     compression = compression == null ? options.getTableConfiguration().get(Property.TABLE_FILE_COMPRESSION_TYPE) : compression;

-    String file = options.getFilename();
-    FileSystem fs = options.getFileSystem();
+    FSDataOutputStream outputStream = options.getOutputStream();
+
+    Configuration conf = options.getConfiguration();
+
+    if (outputStream == null) {
+      int hrep = conf.getInt("dfs.replication", -1);
+      int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
+      int rep = hrep;
+      if (trep > 0 && trep != hrep) {
+        rep = trep;
+      }
+      long hblock = conf.getLong("dfs.block.size", 1 << 26);
+      long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
+      long block = hblock;
+      if (tblock > 0)
+        block = tblock;
+      int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+
+      String file = options.getFilename();
+      FileSystem fs = options.getFileSystem();
+
+      outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
+    }

-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(new RateLimitedOutputStream(fs.create(new Path(file), false, bufferSize, (short) rep, block),
-        options.getRateLimiter()), compression, conf, acuconf);
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(new RateLimitedOutputStream(outputStream, options.getRateLimiter()), compression, conf,
+        acuconf);
     RFile.Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);

     return writer;
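Because openWriter() reads block size and compression from the table configuration, properties passed to the public builder's withTableProperties(...) flow straight into this code. An assumed example of tuning those knobs (the path and values are illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.conf.Property;

public class TunedWriterExample {
  public static void main(String[] args) throws Exception {
    Map<String,String> props = new HashMap<>();
    props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64K"); // table.file.compress.blocksize
    props.put(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "gz");       // table.file.compress.type

    RFileWriter writer = RFile.newWriter().to("/tmp/tuned.rf").withTableProperties(props).build();
    try {
      writer.startDefaultLocalityGroup();
      // ... append sorted key/values ...
    } finally {
      writer.close();
    }
  }
}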
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
index 97e4f5c..8188ba3 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
@@ -35,12 +35,19 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.constraints.DefaultKeySizeConstraint;
+import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
 import org.apache.accumulo.core.iterators.system.SynchronizedIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 import org.apache.accumulo.core.tabletserver.thrift.TIteratorSetting;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
@@ -384,4 +391,12 @@ public class IteratorUtil {
     }
     return toIteratorSettings(ic);
   }
+
+  public static SortedKeyValueIterator<Key,Value> setupSystemScanIterators(SortedKeyValueIterator<Key,Value> source, Set<Column> cols, Authorizations auths,
+      byte[] defaultVisibility) throws IOException {
+    DeletingIterator delIter = new DeletingIterator(source, false);
+    ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+    ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, cols);
+    return new VisibilityFilter(colFilter, auths, defaultVisibility);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
index f0bd528..8abf4e7 100644
--- a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
@@ -17,6 +17,8 @@

 package org.apache.accumulo.core.sample.impl;

+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -169,6 +171,16 @@ public class SamplerConfigurationImpl implements Writable {
     return className + " " + options;
   }

+  public static void checkDisjoint(Map<String,String> props, SamplerConfiguration samplerConfiguration) {
+    if (props.isEmpty() || samplerConfiguration == null) {
+      return;
+    }
+
+    Map<String,String> sampleProps = new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap();
+
+    checkArgument(Collections.disjoint(props.keySet(), sampleProps.keySet()), "Properties and derived sampler properties are not disjoint");
+  }
+
   public static TSamplerConfiguration toThrift(SamplerConfiguration samplerConfig) {
     if (samplerConfig == null)
       return null;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
index 07757a6..a5255c7 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
@@ -49,6 +49,8 @@ public class LocalityGroupUtil {
   public static final Set<ByteSequence> EMPTY_CF_SET = Collections.emptySet();

   public static Set<ByteSequence> families(Collection<Column> columns) {
+    if (columns.size() == 0)
+      return EMPTY_CF_SET;
     Set<ByteSequence> result = new HashSet<ByteSequence>(columns.size());
     for (Column col : columns) {
       result.add(new ArrayByteSequence(col.getColumnFamily()));