Repository: accumulo Updated Branches: refs/heads/master ac2ddc53b -> fae163686
ACCUMULO-2504 use the volume manager to get the source/dest filesystems Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bb8bdc06 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bb8bdc06 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bb8bdc06 Branch: refs/heads/master Commit: bb8bdc066699d62b90a5dcafa5b763ec2553384a Parents: 11d2dcf Author: Eric C. Newton <eric.new...@gmail.com> Authored: Fri Mar 21 10:23:40 2014 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Fri Mar 21 10:23:40 2014 -0400 ---------------------------------------------------------------------- .../tserver/BulkFailedCopyProcessor.java | 21 +++--- .../accumulo/test/BulkImportVolumeIT.java | 75 ++++++++++++++++++++ 2 files changed, 86 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/bb8bdc06/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java index 6b31af1..39bf81e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java @@ -20,8 +20,9 @@ import java.io.IOException; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.trace.TraceFileSystem; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor; import org.apache.hadoop.fs.FileSystem; @@ -51,18 +52,18 @@ public class BulkFailedCopyProcessor implements Processor { Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp"); try { - FileSystem fs = TraceFileSystem.wrap(VolumeConfiguration.getDefaultVolume(CachedConfiguration.getInstance(), - ServerConfiguration.getSiteConfiguration()).getFileSystem()); - - FileUtil.copy(fs, orig, fs, tmp, false, true, CachedConfiguration.getInstance()); - fs.rename(tmp, dest); + VolumeManager vm = VolumeManagerImpl.get(ServerConfiguration.getSiteConfiguration()); + FileSystem origFs = TraceFileSystem.wrap(vm.getVolumeByPath(orig).getFileSystem()); + FileSystem destFs = TraceFileSystem.wrap(vm.getVolumeByPath(dest).getFileSystem()); + + FileUtil.copy(origFs, orig, destFs, tmp, false, true, CachedConfiguration.getInstance()); + destFs.rename(tmp, dest); log.debug("copied " + orig + " to " + dest); } catch (IOException ex) { try { - FileSystem fs = TraceFileSystem.wrap(VolumeConfiguration.getDefaultVolume(CachedConfiguration.getInstance(), - ServerConfiguration.getSiteConfiguration()).getFileSystem()); - - fs.create(dest).close(); + VolumeManager vm = VolumeManagerImpl.get(ServerConfiguration.getSiteConfiguration()); + FileSystem destFs = TraceFileSystem.wrap(vm.getVolumeByPath(dest).getFileSystem()); + destFs.create(dest).close(); log.warn(" marked " + dest + " failed", ex); } catch (IOException e) { log.error("Unable to create failure flag file " + dest, e); http://git-wip-us.apache.org/repos/asf/accumulo/blob/bb8bdc06/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java b/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java new file mode 100644 index 0000000..45a6655 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.net.URI; + +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Test; + +// ACCUMULO-118/ACCUMULO-2504 +public class BulkImportVolumeIT extends ConfigurableMacIT { + + File volDirBase = null; + Path v1, v2; + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + File baseDir = cfg.getDir(); + volDirBase = new File(baseDir, "volumes"); + File v1f = new File(volDirBase, "v1"); + File v2f = new File(volDirBase, "v2"); + v1f.mkdir(); + v2f.mkdir(); + v1 = new Path("file://" + v1f.getAbsolutePath()); + v2 = new Path("file://" + v2f.getAbsolutePath()); + + // Run MAC on two locations in the local file system + cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString()); + + // use raw local file system so walogs sync and flush will work + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Test(timeout= 60 * 1000) + public void testBulkImportFailure() throws Exception { + String tableName = getTableNames(1)[0]; + TableOperations to = getConnector().tableOperations(); + to.create(tableName); + File bulk = new File(rootPath() + "/bulk"); + System.out.println("bulk: " + bulk); + assertTrue(bulk.mkdirs()); + File err = new File(rootPath() + "/err"); + assertTrue(err.mkdirs()); + File bogus = new File(bulk + "/bogus.rf"); + assertTrue(bogus.createNewFile()); + to.importDirectory(tableName, bulk.toString(), err.toString(), false); + assertEquals(1, err.list().length); + } + + +}