ACCUMULO-2041 more review updates
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/459d3048 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/459d3048 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/459d3048 Branch: refs/heads/ACCUMULO-378 Commit: 459d3048eb39650ebff2c93734d2886a9d4869c7 Parents: 8049859 Author: Eric C. Newton <eric.new...@gmail.com> Authored: Mon Apr 21 16:28:01 2014 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Tue Jun 3 10:49:43 2014 -0400 ---------------------------------------------------------------------- .../accumulo/tserver/CountingIterator.java | 78 ---------- .../accumulo/tserver/tablet/Compactor.java | 11 +- .../tserver/tablet/CountingIterator.java | 78 ++++++++++ .../accumulo/tserver/tablet/MinorCompactor.java | 9 +- .../apache/accumulo/tserver/tablet/Tablet.java | 15 +- .../tserver/tablet/TabletCommitter.java | 3 + .../accumulo/tserver/CountingIteratorTest.java | 1 + .../apache/accumulo/tserver/RootFilesTest.java | 150 ------------------- .../accumulo/tserver/tablet/RootFilesTest.java | 150 +++++++++++++++++++ 9 files changed, 247 insertions(+), 248 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/459d3048/server/tserver/src/main/java/org/apache/accumulo/tserver/CountingIterator.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/CountingIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/CountingIterator.java deleted file mode 100644 index e4ba076..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/CountingIterator.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.WrappingIterator; - -public class CountingIterator extends WrappingIterator { - - private long count; - private final ArrayList<CountingIterator> deepCopies; - private final AtomicLong entriesRead; - - @Override - public CountingIterator deepCopy(IteratorEnvironment env) { - return new CountingIterator(this, env); - } - - private CountingIterator(CountingIterator other, IteratorEnvironment env) { - setSource(other.getSource().deepCopy(env)); - count = 0; - this.deepCopies = other.deepCopies; - this.entriesRead = other.entriesRead; - deepCopies.add(this); - } - - public CountingIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong entriesRead) { - deepCopies = new ArrayList<CountingIterator>(); - this.setSource(source); - count = 0; - this.entriesRead = entriesRead; - } - - @Override - public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) { - throw new UnsupportedOperationException(); - } - - @Override - public void next() throws IOException { - super.next(); - count++; - if (count % 1024 == 0) { - entriesRead.addAndGet(1024); - } - } - - public long getCount() { - long sum = 0; - for (CountingIterator dc : deepCopies) { - sum += dc.count; - } - - return count + sum; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/459d3048/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java index 9a93be3..2eee5ea 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java @@ -58,7 +58,6 @@ import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; import org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; -import org.apache.accumulo.tserver.CountingIterator; import org.apache.accumulo.tserver.InMemoryMap; import org.apache.accumulo.tserver.MinorCompactionReason; import org.apache.accumulo.tserver.TabletIteratorEnvironment; @@ -134,15 +133,15 @@ public class Compactor implements Callable<CompactionStats> { return compactions; } - public Compactor(VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes, - AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, int reason) { - this.extent = extent; - this.fs = fs; + public Compactor(Tablet tablet, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes, + CompactionEnv env, List<IteratorSetting> iterators, int reason, AccumuloConfiguration tableConfiguation) { + this.extent = tablet.getExtent(); + this.fs = tablet.getTabletServer().getFileSystem(); + this.acuTableConf = tableConfiguation; this.filesToCompact = files; this.imm = imm; this.outputFile = outputFile; this.propogateDeletes = propogateDeletes; - this.acuTableConf = acuTableConf; this.env = env; this.iterators = iterators; this.reason = reason; http://git-wip-us.apache.org/repos/asf/accumulo/blob/459d3048/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CountingIterator.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CountingIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CountingIterator.java new file mode 100644 index 0000000..44b8460 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CountingIterator.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; + +public class CountingIterator extends WrappingIterator { + + private long count; + private final ArrayList<CountingIterator> deepCopies; + private final AtomicLong entriesRead; + + @Override + public CountingIterator deepCopy(IteratorEnvironment env) { + return new CountingIterator(this, env); + } + + private CountingIterator(CountingIterator other, IteratorEnvironment env) { + setSource(other.getSource().deepCopy(env)); + count = 0; + this.deepCopies = other.deepCopies; + this.entriesRead = other.entriesRead; + deepCopies.add(this); + } + + public CountingIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong entriesRead) { + deepCopies = new ArrayList<CountingIterator>(); + this.setSource(source); + count = 0; + this.entriesRead = entriesRead; + } + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) { + throw new UnsupportedOperationException(); + } + + @Override + public void next() throws IOException { + super.next(); + count++; + if (count % 1024 == 0) { + entriesRead.addAndGet(1024); + } + } + + public long getCount() { + long sum = 0; + for (CountingIterator dc : deepCopies) { + sum += dc.count; + } + + return count + sum; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/459d3048/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java index 6636159..115aed7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java @@ -23,7 +23,6 @@ import java.util.Random; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.impl.Tables; -import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.metadata.schema.DataFileValue; @@ -31,7 +30,6 @@ import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.FileRef; -import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.problems.ProblemReport; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; @@ -53,9 +51,8 @@ public class MinorCompactor extends Compactor { return Collections.singletonMap(mergeFile, dfv); } - public MinorCompactor(VolumeManager fs, InMemoryMap imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile, TableConfiguration acuTableConf, - KeyExtent extent, MinorCompactionReason mincReason) { - super(fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent, new CompactionEnv() { + public MinorCompactor(Tablet tablet, InMemoryMap imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile, MinorCompactionReason mincReason, TableConfiguration tableConfig) { + super(tablet, toFileMap(mergeFile, dfv), imm, outputFile, true, new CompactionEnv() { @Override public boolean isCompactionEnabled() { @@ -66,7 +63,7 @@ public class MinorCompactor extends Compactor { public IteratorScope getIteratorScope() { return IteratorScope.minc; } - }, Collections.<IteratorSetting>emptyList(), mincReason.ordinal()); + }, Collections.<IteratorSetting>emptyList(), mincReason.ordinal(), tableConfig); } private boolean isTableDeleting() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/459d3048/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index bf9a905..dc2fc4d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -856,7 +856,6 @@ public class Tablet implements TabletCommitter { ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated); return new Scanner(this, range, opts); } - DataFileValue minorCompact(VolumeManager fs, InMemoryMap memTable, FileRef tmpDatafile, FileRef newDatafile, FileRef mergeFile, boolean hasQueueTime, long queued, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) { boolean failed = false; @@ -875,7 +874,7 @@ public class Tablet implements TabletCommitter { if (mergeFile != null) dfv = getDatafileManager().getDatafileSizes().get(mergeFile); - MinorCompactor compactor = new MinorCompactor(fs, memTable, mergeFile, dfv, tmpDatafile, tableConfiguration, extent, mincReason); + MinorCompactor compactor = new MinorCompactor(this, memTable, mergeFile, dfv, tmpDatafile, mincReason, tableConfiguration); stats = compactor.call(); } finally { span.stop(); @@ -888,13 +887,13 @@ public class Tablet implements TabletCommitter { span.stop(); } return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()); - } catch (Exception E) { + } catch (Exception e) { failed = true; - throw new RuntimeException(E); - } catch (Error E) { + throw new RuntimeException(e); + } catch (Error e) { // Weird errors like "OutOfMemoryError" when trying to create the thread for the compaction failed = true; - throw new RuntimeException(E); + throw new RuntimeException(e); } finally { try { getTabletMemory().finalizeMinC(); @@ -1850,8 +1849,8 @@ public class Tablet implements TabletCommitter { // always propagate deletes, unless last batch boolean lastBatch = filesToCompact.isEmpty(); - Compactor compactor = new Compactor(getTabletServer().getFileSystem(), copy, null, compactTmpName, lastBatch ? propogateDeletes : true, tableConf, extent, cenv, - compactionIterators, reason.ordinal()); + Compactor compactor = new Compactor(this, copy, null, compactTmpName, lastBatch ? propogateDeletes : true, cenv, + compactionIterators, reason.ordinal(), tableConf); CompactionStats mcs = compactor.call(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/459d3048/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java index bd87a5b..a5d197c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java @@ -25,6 +25,9 @@ import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.tserver.InMemoryMap; import org.apache.accumulo.tserver.log.DfsLogger; +/* + * A partial interface of Tablet to allow for testing of CommitSession without needing a real Tablet. + */ public interface TabletCommitter { void abortCommit(CommitSession commitSession, List<Mutation> value); http://git-wip-us.apache.org/repos/asf/accumulo/blob/459d3048/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java index 302b025..154b121 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java @@ -26,6 +26,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.SortedMapIterator; +import org.apache.accumulo.tserver.tablet.CountingIterator; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/accumulo/blob/459d3048/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java deleted file mode 100644 index 7cfe65c..0000000 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.ConfigurationCopy; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.server.fs.FileRef; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.fs.VolumeManagerImpl; -import org.apache.accumulo.tserver.tablet.RootFiles; -import org.apache.hadoop.fs.Path; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -/** - * - */ -public class RootFilesTest { - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target")); - - private class TestWrapper { - File rootTabletDir; - Set<FileRef> oldDatafiles; - String compactName; - FileRef tmpDatafile; - FileRef newDatafile; - VolumeManager vm; - AccumuloConfiguration conf; - - TestWrapper(VolumeManager vm, AccumuloConfiguration conf, String compactName, String... inputFiles) throws IOException { - this.vm = vm; - this.conf = conf; - - rootTabletDir = new File(tempFolder.newFolder(), "accumulo/tables/+r/root_tablet"); - rootTabletDir.mkdirs(); - oldDatafiles = new HashSet<FileRef>(); - for (String filename : inputFiles) { - File file = new File(rootTabletDir, filename); - file.createNewFile(); - oldDatafiles.add(new FileRef(file.toURI().toString())); - } - - this.compactName = compactName; - - File tmpFile = new File(rootTabletDir, compactName + "_tmp"); - tmpFile.createNewFile(); - tmpDatafile = new FileRef(tmpFile.toURI().toString()); - - newDatafile = new FileRef(new File(rootTabletDir, compactName).toURI().toString()); - } - - void prepareReplacement() throws IOException { - RootFiles.prepareReplacement(vm, new Path(rootTabletDir.toURI()), oldDatafiles, compactName); - } - - void renameReplacement() throws IOException { - RootFiles.renameReplacement(vm, tmpDatafile, newDatafile); - } - - public void finishReplacement() throws IOException { - RootFiles.finishReplacement(conf, vm, new Path(rootTabletDir.toURI()), oldDatafiles, compactName); - } - - public Collection<String> cleanupReplacement(String... expectedFiles) throws IOException { - Collection<String> ret = RootFiles.cleanupReplacement(vm, vm.listStatus(new Path(rootTabletDir.toURI())), true); - - HashSet<String> expected = new HashSet<String>(); - for (String efile : expectedFiles) - expected.add(new File(rootTabletDir, efile).toURI().toString()); - - Assert.assertEquals(expected, new HashSet<String>(ret)); - - return ret; - } - - public void assertFiles(String... files) { - HashSet<String> actual = new HashSet<String>(); - for (File file : rootTabletDir.listFiles()) { - actual.add(file.getName()); - } - - HashSet<String> expected = new HashSet<String>(); - expected.addAll(Arrays.asList(files)); - - Assert.assertEquals(expected, actual); - } - } - - @SuppressWarnings("deprecation") - @Test - public void testFileReplacement() throws IOException { - - ConfigurationCopy conf = new ConfigurationCopy(); - conf.set(Property.INSTANCE_DFS_URI, "file:///"); - conf.set(Property.INSTANCE_DFS_DIR, "/"); - - VolumeManager vm = VolumeManagerImpl.get(conf); - - TestWrapper wrapper = new TestWrapper(vm, conf, "A00004.rf", "A00002.rf", "F00003.rf"); - wrapper.prepareReplacement(); - wrapper.renameReplacement(); - wrapper.finishReplacement(); - wrapper.assertFiles("A00004.rf"); - - wrapper = new TestWrapper(vm, conf, "A00004.rf", "A00002.rf", "F00003.rf"); - wrapper.prepareReplacement(); - wrapper.cleanupReplacement("A00002.rf", "F00003.rf"); - wrapper.assertFiles("A00002.rf", "F00003.rf"); - - wrapper = new TestWrapper(vm, conf, "A00004.rf", "A00002.rf", "F00003.rf"); - wrapper.prepareReplacement(); - wrapper.renameReplacement(); - wrapper.cleanupReplacement("A00004.rf"); - wrapper.assertFiles("A00004.rf"); - - wrapper = new TestWrapper(vm, conf, "A00004.rf", "A00002.rf", "F00003.rf"); - wrapper.prepareReplacement(); - wrapper.renameReplacement(); - wrapper.finishReplacement(); - wrapper.cleanupReplacement("A00004.rf"); - wrapper.assertFiles("A00004.rf"); - - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/459d3048/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/RootFilesTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/RootFilesTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/RootFilesTest.java new file mode 100644 index 0000000..9c75a66 --- /dev/null +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/RootFilesTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.tserver.tablet.RootFiles; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * + */ +public class RootFilesTest { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target")); + + private class TestWrapper { + File rootTabletDir; + Set<FileRef> oldDatafiles; + String compactName; + FileRef tmpDatafile; + FileRef newDatafile; + VolumeManager vm; + AccumuloConfiguration conf; + + TestWrapper(VolumeManager vm, AccumuloConfiguration conf, String compactName, String... inputFiles) throws IOException { + this.vm = vm; + this.conf = conf; + + rootTabletDir = new File(tempFolder.newFolder(), "accumulo/tables/+r/root_tablet"); + rootTabletDir.mkdirs(); + oldDatafiles = new HashSet<FileRef>(); + for (String filename : inputFiles) { + File file = new File(rootTabletDir, filename); + file.createNewFile(); + oldDatafiles.add(new FileRef(file.toURI().toString())); + } + + this.compactName = compactName; + + File tmpFile = new File(rootTabletDir, compactName + "_tmp"); + tmpFile.createNewFile(); + tmpDatafile = new FileRef(tmpFile.toURI().toString()); + + newDatafile = new FileRef(new File(rootTabletDir, compactName).toURI().toString()); + } + + void prepareReplacement() throws IOException { + RootFiles.prepareReplacement(vm, new Path(rootTabletDir.toURI()), oldDatafiles, compactName); + } + + void renameReplacement() throws IOException { + RootFiles.renameReplacement(vm, tmpDatafile, newDatafile); + } + + public void finishReplacement() throws IOException { + RootFiles.finishReplacement(conf, vm, new Path(rootTabletDir.toURI()), oldDatafiles, compactName); + } + + public Collection<String> cleanupReplacement(String... expectedFiles) throws IOException { + Collection<String> ret = RootFiles.cleanupReplacement(vm, vm.listStatus(new Path(rootTabletDir.toURI())), true); + + HashSet<String> expected = new HashSet<String>(); + for (String efile : expectedFiles) + expected.add(new File(rootTabletDir, efile).toURI().toString()); + + Assert.assertEquals(expected, new HashSet<String>(ret)); + + return ret; + } + + public void assertFiles(String... files) { + HashSet<String> actual = new HashSet<String>(); + for (File file : rootTabletDir.listFiles()) { + actual.add(file.getName()); + } + + HashSet<String> expected = new HashSet<String>(); + expected.addAll(Arrays.asList(files)); + + Assert.assertEquals(expected, actual); + } + } + + @SuppressWarnings("deprecation") + @Test + public void testFileReplacement() throws IOException { + + ConfigurationCopy conf = new ConfigurationCopy(); + conf.set(Property.INSTANCE_DFS_URI, "file:///"); + conf.set(Property.INSTANCE_DFS_DIR, "/"); + + VolumeManager vm = VolumeManagerImpl.get(conf); + + TestWrapper wrapper = new TestWrapper(vm, conf, "A00004.rf", "A00002.rf", "F00003.rf"); + wrapper.prepareReplacement(); + wrapper.renameReplacement(); + wrapper.finishReplacement(); + wrapper.assertFiles("A00004.rf"); + + wrapper = new TestWrapper(vm, conf, "A00004.rf", "A00002.rf", "F00003.rf"); + wrapper.prepareReplacement(); + wrapper.cleanupReplacement("A00002.rf", "F00003.rf"); + wrapper.assertFiles("A00002.rf", "F00003.rf"); + + wrapper = new TestWrapper(vm, conf, "A00004.rf", "A00002.rf", "F00003.rf"); + wrapper.prepareReplacement(); + wrapper.renameReplacement(); + wrapper.cleanupReplacement("A00004.rf"); + wrapper.assertFiles("A00004.rf"); + + wrapper = new TestWrapper(vm, conf, "A00004.rf", "A00002.rf", "F00003.rf"); + wrapper.prepareReplacement(); + wrapper.renameReplacement(); + wrapper.finishReplacement(); + wrapper.cleanupReplacement("A00004.rf"); + wrapper.assertFiles("A00004.rf"); + + } +}