ACCUMULO-3759 Fix Java 8 compiler warnings * Add missing hashCode in class with equals * Enforce one type per file
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6e2e6780 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6e2e6780 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6e2e6780 Branch: refs/heads/1.7 Commit: 6e2e6780fc59c86112fba30a5211081bb6e77979 Parents: f996387 Author: Christopher Tubbs <ctubb...@apache.org> Authored: Tue Apr 28 20:30:22 2015 -0400 Committer: Christopher Tubbs <ctubb...@apache.org> Committed: Tue Apr 28 20:30:22 2015 -0400 ---------------------------------------------------------------------- .../core/client/impl/OfflineIterator.java | 340 ++++++++++++ .../core/client/impl/OfflineScanner.java | 314 ----------- .../core/compaction/CompactionSettings.java | 42 -- .../accumulo/core/compaction/PatternType.java | 28 + .../accumulo/core/compaction/SizeType.java | 30 ++ .../accumulo/core/compaction/StringType.java | 24 + .../apache/accumulo/core/compaction/Type.java | 21 + .../accumulo/core/compaction/UIntType.java | 27 + .../core/file/DispatchingFileFactory.java | 136 +++++ .../accumulo/core/file/FileOperations.java | 106 ---- .../accumulo/core/cli/TestClientOpts.java | 5 + .../client/CountingVerifyingReceiver.java | 64 +++ .../simple/client/RandomBatchScanner.java | 38 -- pom.xml | 1 + .../accumulo/master/tableOps/BulkImport.java | 363 ------------- .../master/tableOps/CancelCompactions.java | 23 - .../accumulo/master/tableOps/ChooseDir.java | 53 ++ .../accumulo/master/tableOps/CleanUp.java | 287 ++++++++++ .../master/tableOps/CleanUpBulkImport.java | 64 +++ .../accumulo/master/tableOps/CloneInfo.java | 36 ++ .../accumulo/master/tableOps/CloneMetadata.java | 54 ++ .../master/tableOps/ClonePermissions.java | 73 +++ .../accumulo/master/tableOps/CloneTable.java | 195 ------- .../master/tableOps/CloneZookeeper.java | 76 +++ .../accumulo/master/tableOps/CompactRange.java | 159 ------ .../master/tableOps/CompactionDriver.java | 188 +++++++ 
.../master/tableOps/CompleteBulkImport.java | 45 ++ .../accumulo/master/tableOps/CopyFailed.java | 158 ++++++ .../accumulo/master/tableOps/CreateDir.java | 51 ++ .../master/tableOps/CreateImportDir.java | 61 +++ .../master/tableOps/CreateNamespace.java | 137 ----- .../accumulo/master/tableOps/CreateTable.java | 251 --------- .../master/tableOps/DeleteNamespace.java | 55 -- .../accumulo/master/tableOps/DeleteTable.java | 265 ---------- .../accumulo/master/tableOps/ExportInfo.java | 29 ++ .../accumulo/master/tableOps/ExportTable.java | 257 --------- .../master/tableOps/FinishCancelCompaction.java | 40 ++ .../master/tableOps/FinishCloneTable.java | 64 +++ .../master/tableOps/FinishCreateNamespace.java | 58 +++ .../master/tableOps/FinishCreateTable.java | 62 +++ .../master/tableOps/FinishImportTable.java | 68 +++ .../tableOps/ImportPopulateZookeeper.java | 104 ++++ .../master/tableOps/ImportSetupPermissions.java | 65 +++ .../accumulo/master/tableOps/ImportTable.java | 521 ------------------- .../master/tableOps/ImportedTableInfo.java | 31 ++ .../accumulo/master/tableOps/LoadFiles.java | 209 ++++++++ .../master/tableOps/MapImportFileNames.java | 111 ++++ .../master/tableOps/MoveExportedFiles.java | 71 +++ .../master/tableOps/NamespaceCleanUp.java | 75 +++ .../accumulo/master/tableOps/NamespaceInfo.java | 31 ++ .../master/tableOps/PopulateMetadata.java | 54 ++ .../master/tableOps/PopulateMetadataTable.java | 217 ++++++++ .../master/tableOps/PopulateZookeeper.java | 77 +++ .../PopulateZookeeperWithNamespace.java | 74 +++ .../tableOps/SetupNamespacePermissions.java | 55 ++ .../master/tableOps/SetupPermissions.java | 63 +++ .../accumulo/master/tableOps/TableInfo.java | 35 ++ .../accumulo/master/tableOps/TableRangeOp.java | 45 -- .../master/tableOps/TableRangeOpWait.java | 69 +++ .../master/tableOps/WriteExportFiles.java | 268 ++++++++++ .../apache/accumulo/tserver/InMemoryMap.java | 119 ----- .../accumulo/tserver/MemKeyComparator.java | 44 ++ 
.../tserver/MemKeyConversionIterator.java | 96 ++++ .../PartialMutationSkippingIterator.java | 54 ++ .../accumulo/test/EstimateInMemMapOverhead.java | 317 ----------- .../test/InMemoryMapMemoryUsageTest.java | 102 ++++ .../accumulo/test/IntObjectMemoryUsageTest.java | 65 +++ .../apache/accumulo/test/MemoryUsageTest.java | 64 +++ .../accumulo/test/MutationMemoryUsageTest.java | 98 ++++ .../accumulo/test/TextMemoryUsageTest.java | 82 +++ .../accumulo/test/continuous/HistData.java | 49 ++ .../accumulo/test/continuous/Histogram.java | 30 -- 72 files changed, 4406 insertions(+), 3237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java new file mode 100644 index 0000000..b035e3e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyValue; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; +import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter; +import org.apache.accumulo.core.iterators.system.DeletingIterator; +import org.apache.accumulo.core.iterators.system.MultiIterator; +import org.apache.accumulo.core.iterators.system.VisibilityFilter; +import 
org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.volume.VolumeConfiguration; +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; + +class OfflineIterator implements Iterator<Entry<Key,Value>> { + + static class OfflineIteratorEnvironment implements IteratorEnvironment { + + private final Authorizations authorizations; + + public OfflineIteratorEnvironment(Authorizations auths) { + this.authorizations = auths; + } + + @Override + public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException { + throw new NotImplementedException(); + } + + @Override + public AccumuloConfiguration getConfig() { + return AccumuloConfiguration.getDefaultConfiguration(); + } + + @Override + public IteratorScope getIteratorScope() { + return IteratorScope.scan; + } + + @Override + public boolean isFullMajorCompaction() { + return false; + } + + private ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>(); + + @Override + public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) { + topLevelIterators.add(iter); + } + + @Override + public Authorizations getAuthorizations() { + return authorizations; + } + + SortedKeyValueIterator<Key,Value> 
getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) { + if (topLevelIterators.isEmpty()) + return iter; + ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new ArrayList<SortedKeyValueIterator<Key,Value>>(topLevelIterators); + allIters.add(iter); + return new MultiIterator(allIters, false); + } + } + + private SortedKeyValueIterator<Key,Value> iter; + private Range range; + private KeyExtent currentExtent; + private Connector conn; + private String tableId; + private Authorizations authorizations; + private Instance instance; + private ScannerOptions options; + private ArrayList<SortedKeyValueIterator<Key,Value>> readers; + private AccumuloConfiguration config; + + public OfflineIterator(ScannerOptions options, Instance instance, Credentials credentials, Authorizations authorizations, Text table, Range range) { + this.options = new ScannerOptions(options); + this.instance = instance; + this.range = range; + + if (this.options.fetchedColumns.size() > 0) { + this.range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last()); + } + + this.tableId = table.toString(); + this.authorizations = authorizations; + this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>(); + + try { + conn = instance.getConnector(credentials.getPrincipal(), credentials.getToken()); + config = new ConfigurationCopy(conn.instanceOperations().getSiteConfiguration()); + nextTablet(); + + while (iter != null && !iter.hasTop()) + nextTablet(); + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean hasNext() { + return iter != null && iter.hasTop(); + } + + @Override + public Entry<Key,Value> next() { + try { + byte[] v = iter.getTopValue().get(); + // copy just like tablet server does, do this before calling next + KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length)); + + iter.next(); + + while (iter != null && !iter.hasTop()) + nextTablet(); + + return ret; + } catch 
(Exception e) { + throw new RuntimeException(e); + } + } + + private void nextTablet() throws TableNotFoundException, AccumuloException, IOException { + + Range nextRange = null; + + if (currentExtent == null) { + Text startRow; + + if (range.getStartKey() != null) + startRow = range.getStartKey().getRow(); + else + startRow = new Text(); + + nextRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false); + } else { + + if (currentExtent.getEndRow() == null) { + iter = null; + return; + } + + if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW))) { + iter = null; + return; + } + + nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false); + } + + List<String> relFiles = new ArrayList<String>(); + + Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles); + + while (eloc.getSecond() != null) { + if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { + Tables.clearCache(instance); + if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { + throw new AccumuloException("Table is online " + tableId + " cannot scan tablet in offline mode " + eloc.getFirst()); + } + } + + UtilWaitThread.sleep(250); + + eloc = getTabletFiles(nextRange, relFiles); + } + + KeyExtent extent = eloc.getFirst(); + + if (!extent.getTableId().toString().equals(tableId)) { + throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent); + } + + if (currentExtent != null && !extent.isPreviousExtent(currentExtent)) + throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent); + + // Old property is only used to resolve relative paths into absolute paths. 
For systems upgraded + // with relative paths, it's assumed that correct instance.dfs.{uri,dir} is still correct in the configuration + @SuppressWarnings("deprecation") + String tablesDir = config.get(Property.INSTANCE_DFS_DIR) + Constants.HDFS_TABLES_DIR; + + List<String> absFiles = new ArrayList<String>(); + for (String relPath : relFiles) { + if (relPath.contains(":")) { + absFiles.add(relPath); + } else { + // handle old-style relative paths + if (relPath.startsWith("..")) { + absFiles.add(tablesDir + relPath.substring(2)); + } else { + absFiles.add(tablesDir + "/" + tableId + relPath); + } + } + } + + iter = createIterator(extent, absFiles); + iter.seek(range, LocalityGroupUtil.families(options.fetchedColumns), options.fetchedColumns.size() == 0 ? false : true); + currentExtent = extent; + + } + + private Pair<KeyExtent,String> getTabletFiles(Range nextRange, List<String> relFiles) throws TableNotFoundException { + Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.setBatchSize(100); + scanner.setRange(nextRange); + + RowIterator rowIter = new RowIterator(scanner); + Iterator<Entry<Key,Value>> row = rowIter.next(); + + KeyExtent extent = null; + String location = null; + + while (row.hasNext()) { + Entry<Key,Value> entry = row.next(); + Key key = entry.getKey(); + + if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { + relFiles.add(key.getColumnQualifier().toString()); + } + + if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME) + || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) { + location = entry.getValue().toString(); + } + + if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { + extent = new KeyExtent(key.getRow(), entry.getValue()); + } + + } + return new Pair<KeyExtent,String>(extent, location); + } + + private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent, List<String> absFiles) throws 
TableNotFoundException, AccumuloException, + IOException { + + // TODO share code w/ tablet - ACCUMULO-1303 + AccumuloConfiguration acuTableConf = AccumuloConfiguration.getTableConfiguration(conn, tableId); + + Configuration conf = CachedConfiguration.getInstance(); + + for (SortedKeyValueIterator<Key,Value> reader : readers) { + ((FileSKVIterator) reader).close(); + } + + readers.clear(); + + // TODO need to close files - ACCUMULO-1303 + for (String file : absFiles) { + FileSystem fs = VolumeConfiguration.getVolume(file, conf, config).getFileSystem(); + FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null); + readers.add(reader); + } + + MultiIterator multiIter = new MultiIterator(readers, extent); + + OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(authorizations); + + DeletingIterator delIter = new DeletingIterator(multiIter, false); + + ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter); + + ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, new HashSet<Column>(options.fetchedColumns)); + + byte[] defaultSecurityLabel; + + ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY)); + defaultSecurityLabel = cv.getExpression(); + + VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel); + + return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList, + options.serverSideIteratorOptions, iterEnv, false)); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java index 2f31319..427a7cc 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java @@ -18,332 +18,18 @@ package org.apache.accumulo.core.client.impl; import static com.google.common.base.Preconditions.checkArgument; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map.Entry; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.ConfigurationCopy; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyValue; -import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.IteratorUtil; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; -import 
org.apache.accumulo.core.iterators.system.ColumnQualifierFilter; -import org.apache.accumulo.core.iterators.system.DeletingIterator; -import org.apache.accumulo.core.iterators.system.MultiIterator; -import org.apache.accumulo.core.iterators.system.VisibilityFilter; -import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.core.util.LocalityGroupUtil; -import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.core.volume.VolumeConfiguration; -import org.apache.commons.lang.NotImplementedException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; -class OfflineIterator implements Iterator<Entry<Key,Value>> { - - static class OfflineIteratorEnvironment implements IteratorEnvironment { - - private final Authorizations authorizations; - - public OfflineIteratorEnvironment(Authorizations auths) { - this.authorizations = auths; - } - - @Override - public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException { - throw new NotImplementedException(); - } - - @Override - public AccumuloConfiguration getConfig() { - return AccumuloConfiguration.getDefaultConfiguration(); - } - - @Override - public IteratorScope getIteratorScope() { - return IteratorScope.scan; - } - - @Override - public boolean isFullMajorCompaction() { - return false; - } - - private ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new 
ArrayList<SortedKeyValueIterator<Key,Value>>(); - - @Override - public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) { - topLevelIterators.add(iter); - } - - @Override - public Authorizations getAuthorizations() { - return authorizations; - } - - SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) { - if (topLevelIterators.isEmpty()) - return iter; - ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new ArrayList<SortedKeyValueIterator<Key,Value>>(topLevelIterators); - allIters.add(iter); - return new MultiIterator(allIters, false); - } - } - - private SortedKeyValueIterator<Key,Value> iter; - private Range range; - private KeyExtent currentExtent; - private Connector conn; - private String tableId; - private Authorizations authorizations; - private Instance instance; - private ScannerOptions options; - private ArrayList<SortedKeyValueIterator<Key,Value>> readers; - private AccumuloConfiguration config; - - public OfflineIterator(ScannerOptions options, Instance instance, Credentials credentials, Authorizations authorizations, Text table, Range range) { - this.options = new ScannerOptions(options); - this.instance = instance; - this.range = range; - - if (this.options.fetchedColumns.size() > 0) { - this.range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last()); - } - - this.tableId = table.toString(); - this.authorizations = authorizations; - this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>(); - - try { - conn = instance.getConnector(credentials.getPrincipal(), credentials.getToken()); - config = new ConfigurationCopy(conn.instanceOperations().getSiteConfiguration()); - nextTablet(); - - while (iter != null && !iter.hasTop()) - nextTablet(); - - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean hasNext() { - return iter != null && iter.hasTop(); - } - - @Override - public Entry<Key,Value> next() { - try { 
- byte[] v = iter.getTopValue().get(); - // copy just like tablet server does, do this before calling next - KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length)); - - iter.next(); - - while (iter != null && !iter.hasTop()) - nextTablet(); - - return ret; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private void nextTablet() throws TableNotFoundException, AccumuloException, IOException { - - Range nextRange = null; - - if (currentExtent == null) { - Text startRow; - - if (range.getStartKey() != null) - startRow = range.getStartKey().getRow(); - else - startRow = new Text(); - - nextRange = new Range(new KeyExtent(new Text(tableId), startRow, null).getMetadataEntry(), true, null, false); - } else { - - if (currentExtent.getEndRow() == null) { - iter = null; - return; - } - - if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW))) { - iter = null; - return; - } - - nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false); - } - - List<String> relFiles = new ArrayList<String>(); - - Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles); - - while (eloc.getSecond() != null) { - if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { - Tables.clearCache(instance); - if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) { - throw new AccumuloException("Table is online " + tableId + " cannot scan tablet in offline mode " + eloc.getFirst()); - } - } - - UtilWaitThread.sleep(250); - - eloc = getTabletFiles(nextRange, relFiles); - } - - KeyExtent extent = eloc.getFirst(); - - if (!extent.getTableId().toString().equals(tableId)) { - throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent); - } - - if (currentExtent != null && !extent.isPreviousExtent(currentExtent)) - throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent); - - // Old property is only used to resolve 
relative paths into absolute paths. For systems upgraded - // with relative paths, it's assumed that correct instance.dfs.{uri,dir} is still correct in the configuration - @SuppressWarnings("deprecation") - String tablesDir = config.get(Property.INSTANCE_DFS_DIR) + Constants.HDFS_TABLES_DIR; - - List<String> absFiles = new ArrayList<String>(); - for (String relPath : relFiles) { - if (relPath.contains(":")) { - absFiles.add(relPath); - } else { - // handle old-style relative paths - if (relPath.startsWith("..")) { - absFiles.add(tablesDir + relPath.substring(2)); - } else { - absFiles.add(tablesDir + "/" + tableId + relPath); - } - } - } - - iter = createIterator(extent, absFiles); - iter.seek(range, LocalityGroupUtil.families(options.fetchedColumns), options.fetchedColumns.size() == 0 ? false : true); - currentExtent = extent; - - } - - private Pair<KeyExtent,String> getTabletFiles(Range nextRange, List<String> relFiles) throws TableNotFoundException { - Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - scanner.setBatchSize(100); - scanner.setRange(nextRange); - - RowIterator rowIter = new RowIterator(scanner); - Iterator<Entry<Key,Value>> row = rowIter.next(); - - KeyExtent extent = null; - String location = null; - - while (row.hasNext()) { - Entry<Key,Value> entry = row.next(); - Key key = entry.getKey(); - - if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { - relFiles.add(key.getColumnQualifier().toString()); - } - - if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME) - || key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) { - location = entry.getValue().toString(); - } - - if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { - extent = new KeyExtent(key.getRow(), entry.getValue()); - } - - } - return new Pair<KeyExtent,String>(extent, location); - } - - private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent, List<String> 
absFiles) throws TableNotFoundException, AccumuloException, - IOException { - - // TODO share code w/ tablet - ACCUMULO-1303 - AccumuloConfiguration acuTableConf = AccumuloConfiguration.getTableConfiguration(conn, tableId); - - Configuration conf = CachedConfiguration.getInstance(); - - for (SortedKeyValueIterator<Key,Value> reader : readers) { - ((FileSKVIterator) reader).close(); - } - - readers.clear(); - - // TODO need to close files - ACCUMULO-1303 - for (String file : absFiles) { - FileSystem fs = VolumeConfiguration.getVolume(file, conf, config).getFileSystem(); - FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null); - readers.add(reader); - } - - MultiIterator multiIter = new MultiIterator(readers, extent); - - OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(authorizations); - - DeletingIterator delIter = new DeletingIterator(multiIter, false); - - ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter); - - ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, new HashSet<Column>(options.fetchedColumns)); - - byte[] defaultSecurityLabel; - - ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY)); - defaultSecurityLabel = cv.getExpression(); - - VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel); - - return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList, - options.serverSideIteratorOptions, iterEnv, false)); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - -} - -/** - * - */ public class OfflineScanner extends ScannerOptions implements Scanner { private int batchSize; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java b/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java index a45a692..43f8c0f 100644 --- a/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java +++ b/core/src/main/java/org/apache/accumulo/core/compaction/CompactionSettings.java @@ -18,48 +18,6 @@ package org.apache.accumulo.core.compaction; import java.util.Map; -import java.util.regex.Pattern; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; - -import com.google.common.base.Preconditions; - -interface Type { - String convert(String str); -} - -class SizeType implements Type { - @Override - public String convert(String str) { - long size = AccumuloConfiguration.getMemoryInBytes(str); - Preconditions.checkArgument(size > 0); - return Long.toString(size); - } -} - -class PatternType implements Type { - @Override - public String convert(String str) { - // ensure it compiles - Pattern.compile(str); - return str; - } -} - -class UIntType implements Type { - @Override - public String convert(String str) { - Preconditions.checkArgument(Integer.parseInt(str) > 0); - return str; - } -} - -class StringType implements Type { - @Override - public String convert(String str) { - return str; - } -} public enum CompactionSettings { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/PatternType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/PatternType.java b/core/src/main/java/org/apache/accumulo/core/compaction/PatternType.java new file mode 100644 index 0000000..c52dcb4 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/compaction/PatternType.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.compaction; + +import java.util.regex.Pattern; + +class PatternType implements Type { + @Override + public String convert(String str) { + // ensure it compiles + Pattern.compile(str); + return str; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/SizeType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/SizeType.java b/core/src/main/java/org/apache/accumulo/core/compaction/SizeType.java new file mode 100644 index 0000000..c2af401 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/compaction/SizeType.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.compaction; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; + +import com.google.common.base.Preconditions; + +class SizeType implements Type { + @Override + public String convert(String str) { + long size = AccumuloConfiguration.getMemoryInBytes(str); + Preconditions.checkArgument(size > 0); + return Long.toString(size); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/StringType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/StringType.java b/core/src/main/java/org/apache/accumulo/core/compaction/StringType.java new file mode 100644 index 0000000..7098a5c --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/compaction/StringType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.compaction; + +class StringType implements Type { + @Override + public String convert(String str) { + return str; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/Type.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/Type.java b/core/src/main/java/org/apache/accumulo/core/compaction/Type.java new file mode 100644 index 0000000..d8f81a6 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/compaction/Type.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.compaction; + +interface Type { + String convert(String str); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/compaction/UIntType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/UIntType.java b/core/src/main/java/org/apache/accumulo/core/compaction/UIntType.java new file mode 100644 index 0000000..c8880fc --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/compaction/UIntType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.compaction; + +import com.google.common.base.Preconditions; + +class UIntType implements Type { + @Override + public String convert(String str) { + Preconditions.checkArgument(Integer.parseInt(str) > 0); + return str; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java new file mode 100644 index 0000000..128a931 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.file; + +import java.io.IOException; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.file.map.MapFileOperations; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.file.rfile.RFileOperations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +class DispatchingFileFactory extends FileOperations { + + private FileOperations findFileFactory(String file) { + + Path p = new Path(file); + String name = p.getName(); + + if (name.startsWith(Constants.MAPFILE_EXTENSION + "_")) { + return new MapFileOperations(); + } + String[] sp = name.split("\\."); + + if (sp.length < 2) { + throw new IllegalArgumentException("File name " + name + " has no extension"); + } + + String extension = sp[sp.length - 1]; + + if (extension.equals(Constants.MAPFILE_EXTENSION) || extension.equals(Constants.MAPFILE_EXTENSION + "_tmp")) { + return new MapFileOperations(); + } else if (extension.equals(RFile.EXTENSION) || extension.equals(RFile.EXTENSION + "_tmp")) { + return new RFileOperations(); + } else { + throw new IllegalArgumentException("File type " + extension + " not supported"); + } + } + + @Override + public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { + return findFileFactory(file).openIndex(file, fs, conf, acuconf, null, null); + } + + @Override + public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { + FileSKVIterator iter = 
findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, null, null); + if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) { + return new BloomFilterLayer.Reader(iter, acuconf); + } + return iter; + } + + @Override + public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { + FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, acuconf); + if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) { + return new BloomFilterLayer.Writer(writer, acuconf); + } + return writer; + } + + @Override + public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { + return findFileFactory(file).getFileSize(file, fs, conf, acuconf); + } + + @Override + public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, + AccumuloConfiguration tableConf) throws IOException { + return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, null, null); + } + + @Override + public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, + AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException { + + if (!tableConf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) + indexCache = null; + if (!tableConf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) + dataCache = null; + + return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, dataCache, indexCache); + } + + @Override + public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, + BlockCache dataCache, BlockCache indexCache) throws IOException { + + if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) 
+ indexCache = null; + if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) + dataCache = null; + + FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, dataCache, indexCache); + if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) { + return new BloomFilterLayer.Reader(iter, acuconf); + } + return iter; + } + + @Override + public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache) + throws IOException { + + if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) + iCache = null; + if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) + dCache = null; + + return findFileFactory(file).openIndex(file, fs, conf, acuconf, dCache, iCache); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java index 78d0407..3798453 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java @@ -27,115 +27,9 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.file.blockfile.cache.BlockCache; -import org.apache.accumulo.core.file.map.MapFileOperations; import org.apache.accumulo.core.file.rfile.RFile; -import org.apache.accumulo.core.file.rfile.RFileOperations; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -class DispatchingFileFactory extends FileOperations { - - private FileOperations findFileFactory(String 
file) { - - Path p = new Path(file); - String name = p.getName(); - - if (name.startsWith(Constants.MAPFILE_EXTENSION + "_")) { - return new MapFileOperations(); - } - String[] sp = name.split("\\."); - - if (sp.length < 2) { - throw new IllegalArgumentException("File name " + name + " has no extension"); - } - - String extension = sp[sp.length - 1]; - - if (extension.equals(Constants.MAPFILE_EXTENSION) || extension.equals(Constants.MAPFILE_EXTENSION + "_tmp")) { - return new MapFileOperations(); - } else if (extension.equals(RFile.EXTENSION) || extension.equals(RFile.EXTENSION + "_tmp")) { - return new RFileOperations(); - } else { - throw new IllegalArgumentException("File type " + extension + " not supported"); - } - } - - @Override - public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { - return findFileFactory(file).openIndex(file, fs, conf, acuconf, null, null); - } - - @Override - public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { - FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, null, null); - if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) { - return new BloomFilterLayer.Reader(iter, acuconf); - } - return iter; - } - - @Override - public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { - FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, acuconf); - if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) { - return new BloomFilterLayer.Writer(writer, acuconf); - } - return writer; - } - - @Override - public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { - return findFileFactory(file).getFileSize(file, fs, conf, acuconf); - } - - @Override - public 
FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, - AccumuloConfiguration tableConf) throws IOException { - return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, null, null); - } - - @Override - public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, - AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException { - - if (!tableConf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) - indexCache = null; - if (!tableConf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) - dataCache = null; - - return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, dataCache, indexCache); - } - - @Override - public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, - BlockCache dataCache, BlockCache indexCache) throws IOException { - - if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) - indexCache = null; - if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) - dataCache = null; - - FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, dataCache, indexCache); - if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) { - return new BloomFilterLayer.Reader(iter, acuconf); - } - return iter; - } - - @Override - public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache) - throws IOException { - - if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) - iCache = null; - if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) - dCache = null; - - return findFileFactory(file).openIndex(file, fs, conf, acuconf, dCache, iCache); - } - -} public abstract class 
FileOperations { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java index f0fdcca..65df5c9 100644 --- a/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java +++ b/core/src/test/java/org/apache/accumulo/core/cli/TestClientOpts.java @@ -263,5 +263,10 @@ public class TestClientOpts { public boolean equals(Object o) { return o instanceof EmptyToken; } + + @Override + public int hashCode() { + return 0; + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/CountingVerifyingReceiver.java ---------------------------------------------------------------------- diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/CountingVerifyingReceiver.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/CountingVerifyingReceiver.java new file mode 100644 index 0000000..873f886 --- /dev/null +++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/CountingVerifyingReceiver.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.examples.simple.client; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Arrays; +import java.util.HashMap; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Internal class used to verify validity of data read. + */ +class CountingVerifyingReceiver { + private static final Logger log = LoggerFactory.getLogger(CountingVerifyingReceiver.class); + + long count = 0; + int expectedValueSize = 0; + HashMap<Text,Boolean> expectedRows; + + CountingVerifyingReceiver(HashMap<Text,Boolean> expectedRows, int expectedValueSize) { + this.expectedRows = expectedRows; + this.expectedValueSize = expectedValueSize; + } + + public void receive(Key key, Value value) { + + String row = key.getRow().toString(); + long rowid = Integer.parseInt(row.split("_")[1]); + + byte expectedValue[] = RandomBatchWriter.createValue(rowid, expectedValueSize); + + if (!Arrays.equals(expectedValue, value.get())) { + log.error("Got unexpected value for " + key + " expected : " + new String(expectedValue, UTF_8) + " got : " + new String(value.get(), UTF_8)); + } + + if (!expectedRows.containsKey(key.getRow())) { + log.error("Got unexpected key " + key); + } else { + expectedRows.put(key.getRow(), true); + } + + count++; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java ---------------------------------------------------------------------- diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java index 6f8b485..a43b97d 100644 --- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java +++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchScanner.java @@ -16,10 +16,8 @@ */ package org.apache.accumulo.examples.simple.client; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.examples.simple.client.RandomBatchWriter.abs; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map.Entry; @@ -43,42 +41,6 @@ import org.slf4j.LoggerFactory; import com.beust.jcommander.Parameter; /** - * Internal class used to verify validity of data read. 
- */ -class CountingVerifyingReceiver { - private static final Logger log = LoggerFactory.getLogger(CountingVerifyingReceiver.class); - - long count = 0; - int expectedValueSize = 0; - HashMap<Text,Boolean> expectedRows; - - CountingVerifyingReceiver(HashMap<Text,Boolean> expectedRows, int expectedValueSize) { - this.expectedRows = expectedRows; - this.expectedValueSize = expectedValueSize; - } - - public void receive(Key key, Value value) { - - String row = key.getRow().toString(); - long rowid = Integer.parseInt(row.split("_")[1]); - - byte expectedValue[] = RandomBatchWriter.createValue(rowid, expectedValueSize); - - if (!Arrays.equals(expectedValue, value.get())) { - log.error("Got unexpected value for " + key + " expected : " + new String(expectedValue, UTF_8) + " got : " + new String(value.get(), UTF_8)); - } - - if (!expectedRows.containsKey(key.getRow())) { - log.error("Got unexpected key " + key); - } else { - expectedRows.put(key.getRow(), true); - } - - count++; - } -} - -/** * Simple example for reading random batches of data from Accumulo. See docs/examples/README.batch for instructions. */ public class RandomBatchScanner { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0bcc689..f680f84 100644 --- a/pom.xml +++ b/pom.xml @@ -946,6 +946,7 @@ <property name="eachLine" value="true" /> </module> <module name="TreeWalker"> + <module name="OneTopLevelClass" /> <module name="RegexpSinglelineJava"> <property name="format" value="\s+$" /> <property name="message" value="Line has trailing whitespace." 
/> http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java index 7f83988..031a80c 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java @@ -16,71 +16,34 @@ */ package org.apache.accumulo.master.tableOps; -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map.Entry; -import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IsolatedScanner; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.impl.ServerClient; import org.apache.accumulo.core.client.impl.Tables; -import org.apache.accumulo.core.client.impl.thrift.ClientService; -import org.apache.accumulo.core.client.impl.thrift.ClientService.Client; import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; import 
org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; -import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.trace.Tracer; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.fate.Repo; import org.apache.accumulo.master.Master; import org.apache.accumulo.server.ServerConstants; -import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; -import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.tablets.UniqueNameAllocator; import org.apache.accumulo.server.util.MetadataTableUtil; -import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.MapFile; -import org.apache.hadoop.io.Text; -import org.apache.htrace.wrappers.TraceExecutorService; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -302,329 +265,3 @@ public class BulkImport extends MasterRepo { Utils.getReadLock(tableId, tid).unlock(); } } - -class CleanUpBulkImport extends MasterRepo { - - private static 
final long serialVersionUID = 1L; - - private static final Logger log = LoggerFactory.getLogger(CleanUpBulkImport.class); - - private String tableId; - private String source; - private String bulk; - private String error; - - public CleanUpBulkImport(String tableId, String source, String bulk, String error) { - this.tableId = tableId; - this.source = source; - this.bulk = bulk; - this.error = error; - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - log.debug("removing the bulk processing flag file in " + bulk); - Path bulkDir = new Path(bulk); - MetadataTableUtil.removeBulkLoadInProgressFlag(master, "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName()); - MetadataTableUtil.addDeleteEntry(master, tableId, bulkDir.toString()); - log.debug("removing the metadata table markers for loaded files"); - Connector conn = master.getConnector(); - MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid); - log.debug("releasing HDFS reservations for " + source + " and " + error); - Utils.unreserveHdfsDirectory(source, tid); - Utils.unreserveHdfsDirectory(error, tid); - Utils.getReadLock(tableId, tid).unlock(); - log.debug("completing bulk import transaction " + tid); - ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid); - return null; - } -} - -class CompleteBulkImport extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private String tableId; - private String source; - private String bulk; - private String error; - - public CompleteBulkImport(String tableId, String source, String bulk, String error) { - this.tableId = tableId; - this.source = source; - this.bulk = bulk; - this.error = error; - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid); - return new CopyFailed(tableId, source, bulk, error); - } -} - -class CopyFailed extends MasterRepo { - - private static final long serialVersionUID = 1L; - 
- private String tableId; - private String source; - private String bulk; - private String error; - - public CopyFailed(String tableId, String source, String bulk, String error) { - this.tableId = tableId; - this.source = source; - this.bulk = bulk; - this.error = error; - } - - @Override - public long isReady(long tid, Master master) throws Exception { - Set<TServerInstance> finished = new HashSet<TServerInstance>(); - Set<TServerInstance> running = master.onlineTabletServers(); - for (TServerInstance server : running) { - try { - TServerConnection client = master.getConnection(server); - if (client != null && !client.isActive(tid)) - finished.add(server); - } catch (TException ex) { - log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex); - } - } - if (finished.containsAll(running)) - return 0; - return 500; - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - // This needs to execute after the arbiter is stopped - - VolumeManager fs = master.getFileSystem(); - - if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT))) - return new CleanUpBulkImport(tableId, source, bulk, error); - - HashMap<FileRef,String> failures = new HashMap<FileRef,String>(); - HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>(); - - try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(error, BulkImport.FAILURES_TXT)), UTF_8))) { - String line = null; - while ((line = in.readLine()) != null) { - Path path = new Path(line); - if (!fs.exists(new Path(error, path.getName()))) - failures.put(new FileRef(line, path), line); - } - } - - /* - * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that - * have no loaded markers. 
- */ - - // determine which failed files were loaded - Connector conn = master.getConnector(); - Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)); - mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange()); - mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); - - for (Entry<Key,Value> entry : mscanner) { - if (Long.parseLong(entry.getValue().toString()) == tid) { - FileRef loadedFile = new FileRef(fs, entry.getKey()); - String absPath = failures.remove(loadedFile); - if (absPath != null) { - loadedFailures.put(loadedFile, absPath); - } - } - } - - // move failed files that were not loaded - for (String failure : failures.values()) { - Path orig = new Path(failure); - Path dest = new Path(error, orig.getName()); - fs.rename(orig, dest); - log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed"); - } - - if (loadedFailures.size() > 0) { - DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + master.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ, - master.getConfiguration()); - - HashSet<String> workIds = new HashSet<String>(); - - for (String failure : loadedFailures.values()) { - Path orig = new Path(failure); - Path dest = new Path(error, orig.getName()); - - if (fs.exists(dest)) - continue; - - bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8)); - workIds.add(orig.getName()); - log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed"); - } - - bifCopyQueue.waitUntilDone(workIds); - } - - fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT)); - return new CleanUpBulkImport(tableId, source, bulk, error); - } - -} - -class LoadFiles extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private static ExecutorService threadPool = null; - private static final Logger log = LoggerFactory.getLogger(BulkImport.class); - - private 
String tableId; - private String source; - private String bulk; - private String errorDir; - private boolean setTime; - - public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) { - this.tableId = tableId; - this.source = source; - this.bulk = bulk; - this.errorDir = errorDir; - this.setTime = setTime; - } - - @Override - public long isReady(long tid, Master master) throws Exception { - if (master.onlineTabletServers().size() == 0) - return 500; - return 0; - } - - private static synchronized ExecutorService getThreadPool(Master master) { - if (threadPool == null) { - int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE); - ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import"); - pool.allowCoreThreadTimeOut(true); - threadPool = new TraceExecutorService(pool); - } - return threadPool; - } - - @Override - public Repo<Master> call(final long tid, final Master master) throws Exception { - ExecutorService executor = getThreadPool(master); - final AccumuloConfiguration conf = master.getConfiguration(); - VolumeManager fs = master.getFileSystem(); - List<FileStatus> files = new ArrayList<FileStatus>(); - for (FileStatus entry : fs.listStatus(new Path(bulk))) { - files.add(entry); - } - log.debug("tid " + tid + " importing " + files.size() + " files"); - - Path writable = new Path(this.errorDir, ".iswritable"); - if (!fs.createNewFile(writable)) { - // Maybe this is a re-try... 
clear the flag and try again - fs.delete(writable); - if (!fs.createNewFile(writable)) - throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, - "Unable to write to " + this.errorDir); - } - fs.delete(writable); - - final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>()); - for (FileStatus f : files) - filesToLoad.add(f.getPath().toString()); - - final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES)); - for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) { - List<Future<List<String>>> results = new ArrayList<Future<List<String>>>(); - - if (master.onlineTabletServers().size() == 0) - log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")"); - - while (master.onlineTabletServers().size() == 0) { - UtilWaitThread.sleep(500); - } - - // Use the threadpool to assign files one-at-a-time to the server - final List<String> loaded = Collections.synchronizedList(new ArrayList<String>()); - for (final String file : filesToLoad) { - results.add(executor.submit(new Callable<List<String>>() { - @Override - public List<String> call() { - List<String> failures = new ArrayList<String>(); - ClientService.Client client = null; - String server = null; - try { - // get a connection to a random tablet server, do not prefer cached connections because - // this is running on the master and there are lots of connections to tablet servers - // serving the metadata tablets - long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT); - Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis); - client = pair.getSecond(); - server = pair.getFirst(); - List<String> attempt = Collections.singletonList(file); - log.debug("Asking " + pair.getFirst() + " to bulk import " + file); - List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), 
master.rpcCreds(), tid, tableId, attempt, errorDir, setTime); - if (fail.isEmpty()) { - loaded.add(file); - } else { - failures.addAll(fail); - } - } catch (Exception ex) { - log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex); - } finally { - ServerClient.close(client); - } - return failures; - } - })); - } - Set<String> failures = new HashSet<String>(); - for (Future<List<String>> f : results) - failures.addAll(f.get()); - filesToLoad.removeAll(loaded); - if (filesToLoad.size() > 0) { - log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed"); - UtilWaitThread.sleep(100); - } - } - - FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true); - BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8)); - try { - for (String f : filesToLoad) { - out.write(f); - out.write("\n"); - } - } finally { - out.close(); - } - - // return the next step, which will perform cleanup - return new CompleteBulkImport(tableId, source, bulk, errorDir); - } - - static String sampleList(Collection<?> potentiallyLongList, int max) { - StringBuffer result = new StringBuffer(); - result.append("["); - int i = 0; - for (Object obj : potentiallyLongList) { - result.append(obj); - if (i >= max) { - result.append("..."); - break; - } else { - result.append(", "); - } - i++; - } - if (i < max) - result.delete(result.length() - 2, result.length()); - result.append("]"); - return result.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java index 4f4b27e..e268f17 100644 --- 
a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java @@ -27,29 +27,6 @@ import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator; import org.apache.accumulo.master.Master; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; -class FinishCancelCompaction extends MasterRepo { - private static final long serialVersionUID = 1L; - private String tableId; - - public FinishCancelCompaction(String tableId) { - this.tableId = tableId; - } - - @Override - public Repo<Master> call(long tid, Master environment) throws Exception { - Utils.getReadLock(tableId, tid).unlock(); - return null; - } - - @Override - public void undo(long tid, Master environment) throws Exception { - - } -} - -/** - * - */ public class CancelCompactions extends MasterRepo { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChooseDir.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChooseDir.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChooseDir.java new file mode 100644 index 0000000..3e1aa33 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ChooseDir.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.ServerConstants; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Optional; + +class ChooseDir extends MasterRepo { + private static final long serialVersionUID = 1L; + + private TableInfo tableInfo; + + ChooseDir(TableInfo ti) { + this.tableInfo = ti; + } + + @Override + public long isReady(long tid, Master environment) throws Exception { + return 0; + } + + @Override + public Repo<Master> call(long tid, Master master) throws Exception { + // Constants.DEFAULT_TABLET_LOCATION has a leading slash prepended to it so we don't need to add one here + tableInfo.dir = master.getFileSystem().choose(Optional.of(tableInfo.tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + + tableInfo.tableId + Constants.DEFAULT_TABLET_LOCATION; + return new CreateDir(tableInfo); + } + + @Override + public void undo(long tid, Master master) throws Exception { + + } +} \ No newline at end of file