Author: ecn
Date: Wed Oct 10 20:15:56 2012
New Revision: 1396772

URL: http://svn.apache.org/viewvc?rev=1396772&view=rev
Log:
ACCUMULO-752 patched in Ed Kohlwey's implementation of importDirectory for 
MockAccumulo

Modified:
    
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java
    
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
    
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
    
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
    
accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java

Modified: 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java?rev=1396772&r1=1396771&r2=1396772&view=diff
==============================================================================
--- 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java
 (original)
+++ 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java
 Wed Oct 10 20:15:56 2012
@@ -29,12 +29,18 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.SystemPermission;
 import org.apache.accumulo.core.security.TablePermission;
+import org.apache.hadoop.fs.FileSystem;
 
 @SuppressWarnings("deprecation")
 public class MockAccumulo {
   final Map<String,MockTable> tables = new HashMap<String,MockTable>();
   final Map<String,String> systemProperties = new HashMap<String,String>();
   Map<String,MockUser> users = new HashMap<String,MockUser>();
+  final FileSystem fs;
+  
+  MockAccumulo(FileSystem fs) {
+    this.fs = fs;
+  }
   
   {
     MockUser root = new MockUser("root", new byte[] {}, Constants.NO_AUTHS);
@@ -43,6 +49,10 @@ public class MockAccumulo {
     createTable("root", Constants.METADATA_TABLE_NAME, true, TimeType.LOGICAL);
   }
   
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+  
   void setProperty(String key, String value) {
     systemProperties.put(key, value);
   }

Modified: 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java?rev=1396772&r1=1396771&r2=1396772&view=diff
==============================================================================
--- 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
 (original)
+++ 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
 Wed Oct 10 20:15:56 2012
@@ -36,7 +36,7 @@ public class MockConnector extends Conne
   private final Instance instance;
   
   MockConnector(String username, Instance instance) {
-    this(username, new MockAccumulo(), instance);
+    this(username, new MockAccumulo(MockInstance.getDefaultFileSystem()), 
instance);
   }
   
   @SuppressWarnings("deprecation")

Modified: 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java?rev=1396772&r1=1396771&r2=1396772&view=diff
==============================================================================
--- 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
 (original)
+++ 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
 Wed Oct 10 20:15:56 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.core.client.mock;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,7 +31,9 @@ import org.apache.accumulo.core.conf.Acc
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.TextUtil;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 
 public class MockInstance implements Instance {
@@ -41,16 +44,28 @@ public class MockInstance implements Ins
   String instanceName;
   
   public MockInstance() {
-    acu = new MockAccumulo();
+    acu = new MockAccumulo(getDefaultFileSystem());
     instanceName = "mock-instance";
   }
   
+  static FileSystem getDefaultFileSystem() {
+    try {
+      return FileSystem.get(CachedConfiguration.getInstance());
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+  
   public MockInstance(String instanceName) {
+    this(instanceName, getDefaultFileSystem());
+  }
+  
+  public MockInstance(String instanceName, FileSystem fs) {
     synchronized (instances) {
       if (instances.containsKey(instanceName))
         acu = instances.get(instanceName);
       else
-        instances.put(instanceName, acu = new MockAccumulo());
+        instances.put(instanceName, acu = new MockAccumulo(fs));
     }
     this.instanceName = instanceName;
   }

Modified: 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java?rev=1396772&r1=1396771&r2=1396772&view=diff
==============================================================================
--- 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
 (original)
+++ 
accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
 Wed Oct 10 20:15:56 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.core.client.mock;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -36,11 +37,23 @@ import org.apache.accumulo.core.client.T
 import org.apache.accumulo.core.client.admin.FindMax;
 import org.apache.accumulo.core.client.admin.TableOperationsHelper;
 import org.apache.accumulo.core.client.admin.TimeType;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.iterators.conf.PerColumnIteratorConfig;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.util.BulkImportHelper.AssignmentStats;
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
 @SuppressWarnings("deprecation")
@@ -178,7 +191,94 @@ public class MockTableOperations extends
   @Override
   public void importDirectory(String tableName, String dir, String failureDir, 
boolean setTime) throws IOException, AccumuloException,
       AccumuloSecurityException, TableNotFoundException {
-    throw new NotImplementedException();
+    long time = System.currentTimeMillis();
+    MockTable table = acu.tables.get(tableName);
+    if (table == null) {
+      throw new TableNotFoundException(null, tableName,
+          "The table was not found");
+    }
+    Path importPath = new Path(dir);
+    Path failurePath = new Path(failureDir);
+
+    Configuration conf = new Configuration();
+    FileSystem importFs = importPath.getFileSystem(conf);
+    FileSystem failureFs = importPath.getFileSystem(conf);
+    /*
+     * check preconditions
+     */
+    // directories are directories
+    if (importFs.isFile(importPath)) {
+      throw new IOException("Import path must be a directory.");
+    }
+    if (failureFs.isFile(failurePath)) {
+      throw new IOException("Failure path must be a directory.");
+    }
+    // failures are writable
+    Path createPath = failurePath.suffix("/.createFile");
+    FSDataOutputStream createStream = null;
+    try {
+      createStream = failureFs.create(createPath);
+    } catch (IOException e) {
+      throw new IOException("Error path is not writable.");
+    } finally {
+      if (createStream != null) {
+        createStream.close();
+      }
+    }
+    failureFs.delete(createPath, false);
+    // failures are empty
+    FileStatus[] failureChildStats = failureFs.listStatus(failurePath);
+    if (failureChildStats.length > 0) {
+      throw new IOException("Error path must be empty.");
+    }
+    /*
+     * Begin the import - iterate the files in the path
+     */
+    for (FileStatus importStatus : importFs.listStatus(importPath)) {
+      try {
+        FileSKVIterator importIterator = FileOperations.getInstance()
+            .openReader(importStatus.getPath().toString(), true, importFs,
+                conf, AccumuloConfiguration.getDefaultConfiguration());
+        while (importIterator.hasTop()) {
+          Key key = importIterator.getTopKey();
+          Value value = importIterator.getTopValue();
+          if (setTime) {
+            key.setTimestamp(time);
+          }
+          Mutation mutation = new Mutation(key.getRow());
+          if (!key.isDeleted()) {
+            mutation.put(key.getColumnFamily(), key.getColumnQualifier(),
+                new ColumnVisibility(key.getColumnVisibilityData().toArray()),
+                key.getTimestamp(), value);
+          } else {
+            mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(),
+                new ColumnVisibility(key.getColumnVisibilityData().toArray()),
+                key.getTimestamp());
+          }
+          table.addMutation(mutation);
+          importIterator.next();
+        }
+      } catch (Exception e) {
+        FSDataOutputStream failureWriter = null;
+        DataInputStream failureReader = null;
+        try {
+          failureWriter = failureFs.create(failurePath.suffix("/"
+              + importStatus.getPath().getName()));
+          failureReader = importFs.open(importStatus.getPath());
+          int read = 0;
+          byte[] buffer = new byte[1024];
+          while (-1 != (read = failureReader.read(buffer))) {
+            failureWriter.write(buffer, 0, read);
+          }
+        } finally {
+          if (failureReader != null)
+            failureReader.close();
+          if (failureWriter != null)
+            failureWriter.close();
+        }
+      }
+      importFs.delete(importStatus.getPath(), true);
+    }
   }
   
   @Override

Modified: 
accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java?rev=1396772&r1=1396771&r2=1396772&view=diff
==============================================================================
--- 
accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
 (original)
+++ 
accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
 Wed Oct 10 20:15:56 2012
@@ -16,15 +16,36 @@
  */
 package org.apache.accumulo.core.client.mock;
 
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -85,4 +106,90 @@ public class MockTableOperationsTest {
       Assert.fail();
     } catch (TableExistsException e) {}
   }
+  
+  private static class ImportTestFilesAndData {
+    Path importPath;
+    Path failurePath;
+    List<Pair<Key, Value>> keyVals;
+  }
+
+  @Test
+  public void testImport() throws Throwable {
+    ImportTestFilesAndData dataAndFiles = prepareTestFiles();
+    Instance instance = new MockInstance("foo");
+    Connector connector = instance.getConnector(new AuthInfo("user", ByteBuffer
+        .wrap(new byte[0]), "foo"));
+    TableOperations tableOperations = connector.tableOperations();
+    tableOperations.create("a_table");
+    tableOperations.importDirectory("a_table",
+        dataAndFiles.importPath.toString(),
+        dataAndFiles.failurePath.toString(), false);
+    Scanner scanner = connector.createScanner("a_table", new Authorizations());
+    Iterator<Entry<Key, Value>> iterator = scanner.iterator();
+    for (int i = 0; i < 5; i++) {
+      Assert.assertTrue(iterator.hasNext());
+      Entry<Key, Value> kv = iterator.next();
+      Pair<Key, Value> expected = dataAndFiles.keyVals.get(i);
+      Assert.assertEquals(expected.getFirst(), kv.getKey());
+      Assert.assertEquals(expected.getSecond(), kv.getValue());
+    }
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  private ImportTestFilesAndData prepareTestFiles() throws Throwable {
+    Configuration defaultConf = new Configuration();
+    Path tempFile = new Path("target/accumulo-test/import/sample.map");
+    Path failures = new Path("target/accumulo-test/failures/");
+    FileSystem fs = FileSystem.get(new URI("file:///"), defaultConf);
+    fs.deleteOnExit(tempFile);
+    fs.deleteOnExit(failures);
+    fs.delete(failures, true);
+    fs.mkdirs(failures);
+    fs.mkdirs(tempFile.getParent());
+    FileSKVWriter writer = FileOperations.getInstance().openWriter(
+        tempFile.toString(), fs, defaultConf,
+        AccumuloConfiguration.getDefaultConfiguration());
+    List<Pair<Key, Value>> keyVals = new ArrayList<Pair<Key, Value>>();
+    for (int i = 0; i < 5; i++) {
+      keyVals.add(new Pair<Key, Value>(new Key("a" + i, "b" + i, "c" + i,
+          new ColumnVisibility(""), 1000l + i), new Value(Integer.toString(i)
+          .getBytes())));
+    }
+    for (Pair<Key, Value> keyVal : keyVals) {
+      writer.append(keyVal.getFirst(), keyVal.getSecond());
+    }
+    writer.close();
+    ImportTestFilesAndData files = new ImportTestFilesAndData();
+    files.failurePath = failures;
+    files.importPath = tempFile.getParent();
+    files.keyVals = keyVals;
+    return files;
+  }
+
+  @Test(expected = TableNotFoundException.class)
+  public void testFailsWithNoTable() throws Throwable {
+    Instance instance = new MockInstance("foo");
+    Connector connector = instance.getConnector(new AuthInfo("user", ByteBuffer
+        .wrap(new byte[0]), "foo"));
+    TableOperations tableOperations = connector.tableOperations();
+    ImportTestFilesAndData testFiles = prepareTestFiles();
+    tableOperations.importDirectory("doesnt_exist_table",
+        testFiles.importPath.toString(), testFiles.failurePath.toString(),
+        false);
+  }
+
+  @Test(expected = IOException.class)
+  public void testFailsWithNonEmptyFailureDirectory() throws Throwable {
+    Instance instance = new MockInstance("foo");
+    Connector connector = instance.getConnector(new AuthInfo("user", ByteBuffer
+        .wrap(new byte[0]), "foo"));
+    TableOperations tableOperations = connector.tableOperations();
+    ImportTestFilesAndData testFiles = prepareTestFiles();
+    FileSystem fs = testFiles.failurePath.getFileSystem(new Configuration());
+    fs.open(testFiles.failurePath.suffix("/something")).close();
+    tableOperations.importDirectory("doesnt_exist_table",
+        testFiles.importPath.toString(), testFiles.failurePath.toString(),
+        false);
+  }
+  
 }


Reply via email to