Author: ecn Date: Wed Oct 10 20:15:56 2012 New Revision: 1396772 URL: http://svn.apache.org/viewvc?rev=1396772&view=rev Log: ACCUMULO-752 patched in Ed Kohlwey's implementation of importDirectory for MockAccumulo
Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java?rev=1396772&r1=1396771&r2=1396772&view=diff ============================================================================== --- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java (original) +++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java Wed Oct 10 20:15:56 2012 @@ -29,12 +29,18 @@ import org.apache.accumulo.core.iterator import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.SystemPermission; import org.apache.accumulo.core.security.TablePermission; +import org.apache.hadoop.fs.FileSystem; @SuppressWarnings("deprecation") public class MockAccumulo { final Map<String,MockTable> tables = new HashMap<String,MockTable>(); final Map<String,String> systemProperties = new HashMap<String,String>(); Map<String,MockUser> users = new HashMap<String,MockUser>(); + final FileSystem fs; + + MockAccumulo(FileSystem fs) { + this.fs = fs; + } { MockUser root = new MockUser("root", new byte[] {}, Constants.NO_AUTHS); @@ -43,6 +49,10 @@ public class MockAccumulo { createTable("root", Constants.METADATA_TABLE_NAME, true, TimeType.LOGICAL); } + public FileSystem getFileSystem() { + return fs; + } + void setProperty(String key, String value) { systemProperties.put(key, value); } Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java?rev=1396772&r1=1396771&r2=1396772&view=diff ============================================================================== --- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java (original) +++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java Wed Oct 10 20:15:56 2012 @@ -36,7 +36,7 @@ public class MockConnector extends Conne private final Instance instance; MockConnector(String username, Instance instance) { - this(username, new MockAccumulo(), instance); + this(username, new MockAccumulo(MockInstance.getDefaultFileSystem()), instance); } @SuppressWarnings("deprecation") Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java?rev=1396772&r1=1396771&r2=1396772&view=diff ============================================================================== --- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java (original) +++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java Wed Oct 10 20:15:56 2012 @@ -16,6 +16,7 @@ */ package org.apache.accumulo.core.client.mock; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; @@ -30,7 +31,9 @@ import org.apache.accumulo.core.conf.Acc import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.thrift.AuthInfo; import org.apache.accumulo.core.util.ByteBufferUtil; +import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.TextUtil; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; public class MockInstance implements Instance { @@ -41,16 +44,28 @@ public class MockInstance implements Ins String instanceName; public MockInstance() { - acu = new MockAccumulo(); + acu = new MockAccumulo(getDefaultFileSystem()); instanceName = "mock-instance"; } + static FileSystem getDefaultFileSystem() { + try { + return FileSystem.get(CachedConfiguration.getInstance()); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + public MockInstance(String instanceName) { + this(instanceName, getDefaultFileSystem()); + } + + public MockInstance(String instanceName, FileSystem fs) { synchronized (instances) { if (instances.containsKey(instanceName)) acu = instances.get(instanceName); else - instances.put(instanceName, acu = new MockAccumulo()); + instances.put(instanceName, acu = new MockAccumulo(fs)); } this.instanceName = instanceName; } Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java?rev=1396772&r1=1396771&r2=1396772&view=diff ============================================================================== --- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java (original) +++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java Wed Oct 10 20:15:56 2012 @@ -16,6 +16,7 @@ */ package org.apache.accumulo.core.client.mock; +import java.io.DataInputStream; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -36,11 +37,23 @@ import org.apache.accumulo.core.client.T import org.apache.accumulo.core.client.admin.FindMax; import org.apache.accumulo.core.client.admin.TableOperationsHelper; import org.apache.accumulo.core.client.admin.TimeType; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.iterators.conf.PerColumnIteratorConfig; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.util.BulkImportHelper.AssignmentStats; import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @SuppressWarnings("deprecation") @@ -178,7 +191,94 @@ public class MockTableOperations extends @Override public void importDirectory(String tableName, String dir, String failureDir, boolean setTime) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { - throw new NotImplementedException(); + long time = System.currentTimeMillis(); + MockTable table = acu.tables.get(tableName); + if (table == null) { + throw new TableNotFoundException(null, tableName, + "The table was not found"); + } + Path importPath = new Path(dir); + Path failurePath = new Path(failureDir); + + Configuration conf = new Configuration(); + FileSystem importFs = importPath.getFileSystem(conf); + FileSystem failureFs = importPath.getFileSystem(conf); + /* + * check preconditions + */ + // directories are directories + if (importFs.isFile(importPath)) { + throw new IOException("Import path must be a directory."); + } + if (failureFs.isFile(failurePath)) { + throw new IOException("Failure path must be a directory."); + } + // failures are writable + Path createPath = failurePath.suffix("/.createFile"); + FSDataOutputStream createStream = null; + try { + createStream = failureFs.create(createPath); + } catch (IOException e) { + throw new IOException("Error path is not writable."); + } finally { + if (createStream != null) { + createStream.close(); + } + } + failureFs.delete(createPath, false); + // failures are empty + FileStatus[] failureChildStats = failureFs.listStatus(failurePath); + if (failureChildStats.length > 0) { + throw new IOException("Error path must be empty."); + } + /* + * Begin the import - iterate the files in the path + */ + for (FileStatus importStatus : importFs.listStatus(importPath)) { + try { + FileSKVIterator importIterator = FileOperations.getInstance() + .openReader(importStatus.getPath().toString(), true, importFs, + conf, AccumuloConfiguration.getDefaultConfiguration()); + while (importIterator.hasTop()) { + Key key = importIterator.getTopKey(); + Value value = importIterator.getTopValue(); + if (setTime) { + key.setTimestamp(time); + } + Mutation mutation = new Mutation(key.getRow()); + if (!key.isDeleted()) { + mutation.put(key.getColumnFamily(), key.getColumnQualifier(), + new ColumnVisibility(key.getColumnVisibilityData().toArray()), + key.getTimestamp(), value); + } else { + mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), + new ColumnVisibility(key.getColumnVisibilityData().toArray()), + key.getTimestamp()); + } + table.addMutation(mutation); + importIterator.next(); + } + } catch (Exception e) { + FSDataOutputStream failureWriter = null; + DataInputStream failureReader = null; + try { + failureWriter = failureFs.create(failurePath.suffix("/" + + importStatus.getPath().getName())); + failureReader = importFs.open(importStatus.getPath()); + int read = 0; + byte[] buffer = new byte[1024]; + while (-1 != (read = failureReader.read(buffer))) { + failureWriter.write(buffer, 0, read); + } + } finally { + if (failureReader != null) + failureReader.close(); + if (failureWriter != null) + failureWriter.close(); + } + } + importFs.delete(importStatus.getPath(), true); + } } @Override Modified: accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java?rev=1396772&r1=1396771&r2=1396772&view=diff ============================================================================== --- accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java (original) +++ accumulo/branches/1.4/src/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java Wed Oct 10 20:15:56 2012 @@ -16,15 +16,36 @@ */ package org.apache.accumulo.core.client.mock; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.security.thrift.AuthInfo; +import org.apache.accumulo.core.util.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.junit.Test; @@ -85,4 +106,90 @@ public class MockTableOperationsTest { Assert.fail(); } catch (TableExistsException e) {} } + + private static class ImportTestFilesAndData { + Path importPath; + Path failurePath; + List<Pair<Key, Value>> keyVals; + } + + @Test + public void testImport() throws Throwable { + ImportTestFilesAndData dataAndFiles = prepareTestFiles(); + Instance instance = new MockInstance("foo"); + Connector connector = instance.getConnector(new AuthInfo("user", ByteBuffer + .wrap(new byte[0]), "foo")); + TableOperations tableOperations = connector.tableOperations(); + tableOperations.create("a_table"); + tableOperations.importDirectory("a_table", + dataAndFiles.importPath.toString(), + dataAndFiles.failurePath.toString(), false); + Scanner scanner = connector.createScanner("a_table", new Authorizations()); + Iterator<Entry<Key, Value>> iterator = scanner.iterator(); + for (int i = 0; i < 5; i++) { + Assert.assertTrue(iterator.hasNext()); + Entry<Key, Value> kv = iterator.next(); + Pair<Key, Value> expected = dataAndFiles.keyVals.get(i); + Assert.assertEquals(expected.getFirst(), kv.getKey()); + Assert.assertEquals(expected.getSecond(), kv.getValue()); + } + Assert.assertFalse(iterator.hasNext()); + } + + private ImportTestFilesAndData prepareTestFiles() throws Throwable { + Configuration defaultConf = new Configuration(); + Path tempFile = new Path("target/accumulo-test/import/sample.map"); + Path failures = new Path("target/accumulo-test/failures/"); + FileSystem fs = FileSystem.get(new URI("file:///"), defaultConf); + fs.deleteOnExit(tempFile); + fs.deleteOnExit(failures); + fs.delete(failures, true); + fs.mkdirs(failures); + fs.mkdirs(tempFile.getParent()); + FileSKVWriter writer = FileOperations.getInstance().openWriter( + tempFile.toString(), fs, defaultConf, + AccumuloConfiguration.getDefaultConfiguration()); + List<Pair<Key, Value>> keyVals = new ArrayList<Pair<Key, Value>>(); + for (int i = 0; i < 5; i++) { + keyVals.add(new Pair<Key, Value>(new Key("a" + i, "b" + i, "c" + i, + new ColumnVisibility(""), 1000l + i), new Value(Integer.toString(i) + .getBytes()))); + } + for (Pair<Key, Value> keyVal : keyVals) { + writer.append(keyVal.getFirst(), keyVal.getSecond()); + } + writer.close(); + ImportTestFilesAndData files = new ImportTestFilesAndData(); + files.failurePath = failures; + files.importPath = tempFile.getParent(); + files.keyVals = keyVals; + return files; + } + + @Test(expected = TableNotFoundException.class) + public void testFailsWithNoTable() throws Throwable { + Instance instance = new MockInstance("foo"); + Connector connector = instance.getConnector(new AuthInfo("user", ByteBuffer + .wrap(new byte[0]), "foo")); + TableOperations tableOperations = connector.tableOperations(); + ImportTestFilesAndData testFiles = prepareTestFiles(); + tableOperations.importDirectory("doesnt_exist_table", + testFiles.importPath.toString(), testFiles.failurePath.toString(), + false); + } + + @Test(expected = IOException.class) + public void testFailsWithNonEmptyFailureDirectory() throws Throwable { + Instance instance = new MockInstance("foo"); + Connector connector = instance.getConnector(new AuthInfo("user", ByteBuffer + .wrap(new byte[0]), "foo")); + TableOperations tableOperations = connector.tableOperations(); + ImportTestFilesAndData testFiles = prepareTestFiles(); + FileSystem fs = testFiles.failurePath.getFileSystem(new Configuration()); + fs.open(testFiles.failurePath.suffix("/something")).close(); + tableOperations.importDirectory("doesnt_exist_table", + testFiles.importPath.toString(), testFiles.failurePath.toString(), + false); + } + }