Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
	core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e8f98e74 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e8f98e74 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e8f98e74 Branch: refs/heads/master Commit: e8f98e74a57735be4b6eeb488ac19aaca6bc6fb6 Parents: f5bcee7 ff8c238 Author: Josh Elser <els...@apache.org> Authored: Tue Apr 29 11:47:41 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Tue Apr 29 12:08:43 2014 -0400 ---------------------------------------------------------------------- .../client/mock/MockTableOperationsImpl.java | 4 +- .../client/mock/MockTableOperationsTest.java | 57 ++++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/e8f98e74/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperationsImpl.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperationsImpl.java index fea9568,0000000..8a8895f mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperationsImpl.java @@@ -1,447 -1,0 +1,447 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mock; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.NamespaceNotFoundException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.DiskUsage; +import org.apache.accumulo.core.client.admin.FindMax; +import org.apache.accumulo.core.client.impl.TableOperationsHelper; +import org.apache.accumulo.core.client.admin.TimeType; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import 
org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +class MockTableOperationsImpl extends TableOperationsHelper { + private static final byte[] ZERO = {0}; + private final MockAccumulo acu; + private final String username; + + MockTableOperationsImpl(MockAccumulo acu, String username) { + this.acu = acu; + this.username = username; + } + + @Override + public SortedSet<String> list() { + return new TreeSet<String>(acu.tables.keySet()); + } + + @Override + public boolean exists(String tableName) { + return acu.tables.containsKey(tableName); + } + + private boolean namespaceExists(String namespace) { + return acu.namespaces.containsKey(namespace); + } + + @Override + public void create(String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException { + create(tableName, true, TimeType.MILLIS); + } + + @Override + public void create(String tableName, boolean versioningIter) throws AccumuloException, AccumuloSecurityException, TableExistsException { + create(tableName, versioningIter, TimeType.MILLIS); + } + + @Override + public void create(String tableName, boolean versioningIter, TimeType timeType) throws AccumuloException, AccumuloSecurityException, TableExistsException { + String namespace = Tables.qualify(tableName).getFirst(); + if (!tableName.matches(Tables.VALID_NAME_REGEX)) { + throw new IllegalArgumentException(); + } + if (exists(tableName)) + throw new TableExistsException(tableName, tableName, ""); + + if (!namespaceExists(namespace)) { + throw new IllegalArgumentException("Namespace (" + namespace + ") does not exist, create it first"); + } + 
acu.createTable(username, tableName, versioningIter, timeType); + } + + @Override + public void addSplits(String tableName, SortedSet<Text> partitionKeys) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + acu.addSplits(tableName, partitionKeys); + } + + @Deprecated + @Override + public Collection<Text> getSplits(String tableName) throws TableNotFoundException { + return listSplits(tableName); + } + + @Deprecated + @Override + public Collection<Text> getSplits(String tableName, int maxSplits) throws TableNotFoundException { + return listSplits(tableName); + } + + @Override + public Collection<Text> listSplits(String tableName) throws TableNotFoundException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + return acu.getSplits(tableName); + } + + @Override + public Collection<Text> listSplits(String tableName, int maxSplits) throws TableNotFoundException { + return listSplits(tableName); + } + + @Override + public void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + acu.tables.remove(tableName); + } + + @Override + public void rename(String oldTableName, String newTableName) throws AccumuloSecurityException, TableNotFoundException, AccumuloException, + TableExistsException { + if (!exists(oldTableName)) + throw new TableNotFoundException(oldTableName, oldTableName, ""); + if (exists(newTableName)) + throw new TableExistsException(newTableName, newTableName, ""); + MockTable t = acu.tables.remove(oldTableName); + String namespace = Tables.qualify(newTableName).getFirst(); + MockNamespace n = acu.namespaces.get(namespace); + if (n == null) { + n = new MockNamespace(); + } + t.setNamespaceName(namespace); + t.setNamespace(n); + acu.namespaces.put(namespace, n); + 
acu.tables.put(newTableName, t); + } + + @Deprecated + @Override + public void flush(String tableName) throws AccumuloException, AccumuloSecurityException {} + + @Override + public void setProperty(String tableName, String property, String value) throws AccumuloException, AccumuloSecurityException { + acu.tables.get(tableName).settings.put(property, value); + } + + @Override + public void removeProperty(String tableName, String property) throws AccumuloException, AccumuloSecurityException { + acu.tables.get(tableName).settings.remove(property); + } + + @Override + public Iterable<Entry<String,String>> getProperties(String tableName) throws TableNotFoundException { + String namespace = Tables.qualify(tableName).getFirst(); + if (!exists(tableName)) { + if (!namespaceExists(namespace)) + throw new TableNotFoundException(tableName, new NamespaceNotFoundException(null, namespace, null)); + throw new TableNotFoundException(null, tableName, null); + } + + Set<Entry<String,String>> props = new HashSet<Entry<String,String>>(acu.namespaces.get(namespace).settings.entrySet()); + + Set<Entry<String,String>> tableProps = acu.tables.get(tableName).settings.entrySet(); + for (Entry<String,String> e : tableProps) { + if (props.contains(e)) { + props.remove(e); + } + props.add(e); + } + return props; + } + + @Override + public void setLocalityGroups(String tableName, Map<String,Set<Text>> groups) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + acu.tables.get(tableName).setLocalityGroups(groups); + } + + @Override + public Map<String,Set<Text>> getLocalityGroups(String tableName) throws AccumuloException, TableNotFoundException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + return acu.tables.get(tableName).getLocalityGroups(); + } + + @Override + public Set<Range> splitRangeByTablets(String tableName, Range range, int 
maxSplits) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + return Collections.singleton(range); + } + + @Override + public void importDirectory(String tableName, String dir, String failureDir, boolean setTime) throws IOException, AccumuloException, + AccumuloSecurityException, TableNotFoundException { + long time = System.currentTimeMillis(); + MockTable table = acu.tables.get(tableName); + if (table == null) { + throw new TableNotFoundException(null, tableName, "The table was not found"); + } + Path importPath = new Path(dir); + Path failurePath = new Path(failureDir); + + FileSystem fs = acu.getFileSystem(); + /* + * check preconditions + */ + // directories are directories + if (fs.isFile(importPath)) { + throw new IOException("Import path must be a directory."); + } + if (fs.isFile(failurePath)) { + throw new IOException("Failure path must be a directory."); + } + // failures are writable + Path createPath = failurePath.suffix("/.createFile"); + FSDataOutputStream createStream = null; + try { + createStream = fs.create(createPath); + } catch (IOException e) { + throw new IOException("Error path is not writable."); + } finally { + if (createStream != null) { + createStream.close(); + } + } + fs.delete(createPath, false); + // failures are empty + FileStatus[] failureChildStats = fs.listStatus(failurePath); + if (failureChildStats.length > 0) { + throw new IOException("Error path must be empty."); + } + /* + * Begin the import - iterate the files in the path + */ + for (FileStatus importStatus : fs.listStatus(importPath)) { + try { + FileSKVIterator importIterator = FileOperations.getInstance().openReader(importStatus.getPath().toString(), true, fs, fs.getConf(), + AccumuloConfiguration.getDefaultConfiguration()); + while (importIterator.hasTop()) { + Key key = importIterator.getTopKey(); + Value value = importIterator.getTopValue(); + if 
(setTime) { + key.setTimestamp(time); + } + Mutation mutation = new Mutation(key.getRow()); + if (!key.isDeleted()) { + mutation.put(key.getColumnFamily(), key.getColumnQualifier(), new ColumnVisibility(key.getColumnVisibilityData().toArray()), key.getTimestamp(), + value); + } else { + mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier(), new ColumnVisibility(key.getColumnVisibilityData().toArray()), + key.getTimestamp()); + } + table.addMutation(mutation); + importIterator.next(); + } + } catch (Exception e) { + FSDataOutputStream failureWriter = null; + DataInputStream failureReader = null; + try { + failureWriter = fs.create(failurePath.suffix("/" + importStatus.getPath().getName())); + failureReader = fs.open(importStatus.getPath()); + int read = 0; + byte[] buffer = new byte[1024]; + while (-1 != (read = failureReader.read(buffer))) { + failureWriter.write(buffer, 0, read); + } + } finally { + if (failureReader != null) + failureReader.close(); + if (failureWriter != null) + failureWriter.close(); + } + } + fs.delete(importStatus.getPath(), true); + } + } + + @Override + public void offline(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { + offline(tableName, false); + } + + @Override + public void offline(String tableName, boolean wait) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + } + + @Override + public void online(String tableName) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { + online(tableName, false); + } + + @Override + public void online(String tableName, boolean wait) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + } + + @Override + public void clearLocatorCache(String tableName) throws TableNotFoundException { + if 
(!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + } + + @Override + public Map<String,String> tableIdMap() { + Map<String,String> result = new HashMap<String,String>(); + for (String table : acu.tables.keySet()) { + if (RootTable.NAME.equals(table)) + result.put(table, RootTable.ID); + else if (MetadataTable.NAME.equals(table)) + result.put(table, MetadataTable.ID); + else + result.put(table, table); + } + return result; + } + + @Override + public List<DiskUsage> getDiskUsage(Set<String> tables) throws AccumuloException, AccumuloSecurityException { + + List<DiskUsage> diskUsages = new ArrayList<DiskUsage>(); + diskUsages.add(new DiskUsage(new TreeSet<String>(tables), 0l)); + + return diskUsages; + } + + @Override + public void merge(String tableName, Text start, Text end) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + acu.merge(tableName, start, end); + } + + @Override + public void deleteRows(String tableName, Text start, Text end) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + MockTable t = acu.tables.get(tableName); - Text startText = new Text(start); - Text endText = new Text(end); ++ Text startText = start != null ? new Text(start) : new Text(); ++ Text endText = end != null ? 
new Text(end) : new Text(t.table.lastKey().getRow().getBytes()); + startText.append(ZERO, 0, 1); + endText.append(ZERO, 0, 1); + Set<Key> keep = new TreeSet<Key>(t.table.subMap(new Key(startText), new Key(endText)).keySet()); + t.table.keySet().removeAll(keep); + } + + @Override + public void compact(String tableName, Text start, Text end, boolean flush, boolean wait) throws AccumuloSecurityException, TableNotFoundException, + AccumuloException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + } + + @Override + public void compact(String tableName, Text start, Text end, List<IteratorSetting> iterators, boolean flush, boolean wait) throws AccumuloSecurityException, + TableNotFoundException, AccumuloException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + } + + @Override + public void cancelCompaction(String tableName) throws AccumuloSecurityException, TableNotFoundException, AccumuloException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + } + + @Override + public void clone(String srcTableName, String newTableName, boolean flush, Map<String,String> propertiesToSet, Set<String> propertiesToExclude) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException { + throw new NotImplementedException(); + } + + @Override + public void flush(String tableName, Text start, Text end, boolean wait) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + if (!exists(tableName)) + throw new TableNotFoundException(tableName, tableName, ""); + } + + @Override + public Text getMaxRow(String tableName, Authorizations auths, Text startRow, boolean startInclusive, Text endRow, boolean endInclusive) + throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + MockTable table = acu.tables.get(tableName); + if (table == null) + throw new TableNotFoundException(tableName, 
tableName, "no such table"); + + return FindMax.findMax(new MockScanner(table, auths), startRow, startInclusive, endRow, endInclusive); + } + + @Override + public void importTable(String tableName, String exportDir) throws TableExistsException, AccumuloException, AccumuloSecurityException { + throw new NotImplementedException(); + } + + @Override + public void exportTable(String tableName, String exportDir) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + throw new NotImplementedException(); + } + + @Override + public boolean testClassLoad(String tableName, String className, String asTypeName) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { + + try { + AccumuloVFSClassLoader.loadClass(className, Class.forName(asTypeName)); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/e8f98e74/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java ---------------------------------------------------------------------- diff --cc core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java index 94dbed1,ea916e7..138ab93 --- a/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java @@@ -276,4 -276,61 +276,61 @@@ public class MockTableOperationsTest Assert.assertEquals(5, oneCnt); } + @Test + public void testDeleteRowsWithNullKeys() throws Exception { + Instance instance = new MockInstance("rows"); + Connector connector = instance.getConnector("user", new PasswordToken("foo")); + TableOperations to = connector.tableOperations(); + to.create("test2"); + BatchWriter bw = connector.createBatchWriter("test2", new BatchWriterConfig()); + for (int r = 0; r < 30; r++) { + Mutation m = new Mutation(Integer.toString(r)); + for (int c = 0; c < 5; c++) { + 
m.put(new Text("cf"), new Text(Integer.toString(c)), new Value(Integer.toString(c).getBytes())); + } + bw.addMutation(m); + } + bw.flush(); + + // test null end + // will remove rows 4 through 9 (6 * 5 = 30 entries) + to.deleteRows("test2", new Text("30"), null); - Scanner s = connector.createScanner("test2", Constants.NO_AUTHS); ++ Scanner s = connector.createScanner("test2", Authorizations.EMPTY); + int rowCnt = 0; + for (Entry<Key,Value> entry : s) { + String rowId = entry.getKey().getRow().toString(); + Assert.assertFalse(rowId.startsWith("30")); + rowCnt++; + } + s.close(); + Assert.assertEquals(120, rowCnt); + + // test null start + // will remove 0-1, 10-19, 2 + to.deleteRows("test2", null, new Text("2")); - s = connector.createScanner("test2", Constants.NO_AUTHS); ++ s = connector.createScanner("test2", Authorizations.EMPTY); + rowCnt = 0; + for (Entry<Key,Value> entry : s) { + char rowStart = entry.getKey().getRow().toString().charAt(0); + Assert.assertTrue(rowStart >= '2'); + rowCnt++; + } + s.close(); + Assert.assertEquals(55, rowCnt); + + // test null start and end + // deletes everything still left + to.deleteRows("test2", null, null); - s = connector.createScanner("test2", Constants.NO_AUTHS); ++ s = connector.createScanner("test2", Authorizations.EMPTY); + rowCnt = 0; + for (@SuppressWarnings("unused") + Entry<Key,Value> entry : s) { + rowCnt++; + } + s.close(); + to.delete("test2"); + Assert.assertEquals(0, rowCnt); + + } + }