Repository: accumulo
Updated Branches:
  refs/heads/1.6.0-SNAPSHOT 11d2dcf1a -> bb8bdc066


ACCUMULO-2504 use the volume manager to get the source/dest filesystems


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bb8bdc06
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bb8bdc06
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bb8bdc06

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: bb8bdc066699d62b90a5dcafa5b763ec2553384a
Parents: 11d2dcf
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Fri Mar 21 10:23:40 2014 -0400
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Fri Mar 21 10:23:40 2014 -0400

----------------------------------------------------------------------
 .../tserver/BulkFailedCopyProcessor.java        | 21 +++---
 .../accumulo/test/BulkImportVolumeIT.java       | 75 ++++++++++++++++++++
 2 files changed, 86 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bb8bdc06/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
index 6b31af1..39bf81e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
@@ -20,8 +20,9 @@ import java.io.IOException;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.volume.VolumeConfiguration;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,18 +52,18 @@ public class BulkFailedCopyProcessor implements Processor {
     Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp");
 
     try {
-      FileSystem fs = TraceFileSystem.wrap(VolumeConfiguration.getDefaultVolume(CachedConfiguration.getInstance(),
-          ServerConfiguration.getSiteConfiguration()).getFileSystem());
-
-      FileUtil.copy(fs, orig, fs, tmp, false, true, CachedConfiguration.getInstance());
-      fs.rename(tmp, dest);
+      VolumeManager vm = VolumeManagerImpl.get(ServerConfiguration.getSiteConfiguration());
+      FileSystem origFs = TraceFileSystem.wrap(vm.getVolumeByPath(orig).getFileSystem());
+      FileSystem destFs = TraceFileSystem.wrap(vm.getVolumeByPath(dest).getFileSystem());
+
+      FileUtil.copy(origFs, orig, destFs, tmp, false, true, CachedConfiguration.getInstance());
+      destFs.rename(tmp, dest);
       log.debug("copied " + orig + " to " + dest);
     } catch (IOException ex) {
       try {
-        FileSystem fs = TraceFileSystem.wrap(VolumeConfiguration.getDefaultVolume(CachedConfiguration.getInstance(),
-            ServerConfiguration.getSiteConfiguration()).getFileSystem());
-
-        fs.create(dest).close();
+        VolumeManager vm = VolumeManagerImpl.get(ServerConfiguration.getSiteConfiguration());
+        FileSystem destFs = TraceFileSystem.wrap(vm.getVolumeByPath(dest).getFileSystem());
+        destFs.create(dest).close();
         log.warn(" marked " + dest + " failed", ex);
       } catch (IOException e) {
         log.error("Unable to create failure flag file " + dest, e);

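[Editor's note, not part of the commit] The change above swaps the single default-volume FileSystem for per-path lookups through the VolumeManager, so the source and destination of a failed bulk-import copy may live on different volumes. A minimal, self-contained sketch of that pattern follows; the helper class and method name are illustrative only, and it assumes the 1.6 server-side API shown in the diff.

import java.io.IOException;

import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

// Illustrative helper, not part of the commit.
public class CrossVolumeCopySketch {

  /**
   * Copy orig to dest when the two paths may sit on different volumes.
   * Each FileSystem is resolved per path via the VolumeManager instead of
   * assuming everything lives on the default volume.
   */
  static void copyAcrossVolumes(Path orig, Path dest) throws IOException {
    VolumeManager vm = VolumeManagerImpl.get(ServerConfiguration.getSiteConfiguration());
    FileSystem origFs = vm.getVolumeByPath(orig).getFileSystem();
    FileSystem destFs = vm.getVolumeByPath(dest).getFileSystem();

    // FileUtil.copy takes separate source and destination FileSystems,
    // so a cross-volume copy works the same way as a same-volume one.
    FileUtil.copy(origFs, orig, destFs, dest, false /* deleteSource */, true /* overwrite */,
        CachedConfiguration.getInstance());
  }
}
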
http://git-wip-us.apache.org/repos/asf/accumulo/blob/bb8bdc06/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java b/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java
new file mode 100644
index 0000000..45a6655
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URI;
+
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+
+// ACCUMULO-118/ACCUMULO-2504
+public class BulkImportVolumeIT extends ConfigurableMacIT {
+
+  File volDirBase = null;
+  Path v1, v2;
+  
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    File baseDir = cfg.getDir();
+    volDirBase = new File(baseDir, "volumes");
+    File v1f = new File(volDirBase, "v1");
+    File v2f = new File(volDirBase, "v2");
+    v1f.mkdir();
+    v2f.mkdir();
+    v1 = new Path("file://" + v1f.getAbsolutePath());
+    v2 = new Path("file://" + v2f.getAbsolutePath());
+
+    // Run MAC on two locations in the local file system
+    cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString());
+
+    // use raw local file system so walogs sync and flush will work
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Test(timeout= 60 * 1000) 
+  public void testBulkImportFailure() throws Exception {
+    String tableName = getTableNames(1)[0];
+    TableOperations to = getConnector().tableOperations();
+    to.create(tableName);
+    File bulk = new File(rootPath() + "/bulk");
+    System.out.println("bulk: " + bulk);
+    assertTrue(bulk.mkdirs());
+    File err = new File(rootPath() + "/err");
+    assertTrue(err.mkdirs());
+    File bogus = new File(bulk + "/bogus.rf");
+    assertTrue(bogus.createNewFile());
+    to.importDirectory(tableName, bulk.toString(), err.toString(), false);
+    assertEquals(1, err.list().length);
+  }
+  
+
+}
