http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUp.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUp.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUp.java new file mode 100644 index 0000000..f696198 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUp.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.master.tableOps; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.iterators.user.GrepIterator; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.volume.Volume; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.master.state.MetaDataTableScanner; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.TabletState; +import org.apache.accumulo.server.problems.ProblemReports; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.tables.TableManager; +import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory;
+
+/**
+ * FATE step that removes what is left of a deleted table.
+ *
+ * <p>
+ * {@link #isReady(long, Master)} holds the step back until the master has cycled since this object was created and no tablet of the table is still assigned
+ * or hosted. {@link #call(long, Master)} then removes the table's metadata entries, problem reports, files (deleted or archived), ZooKeeper entry and
+ * permissions, and finally releases the table and namespace reservations. Every removal is best effort: a failure is logged and the remaining removals
+ * still run.
+ */
+class CleanUp extends MasterRepo {
+
+  private static final Logger log = LoggerFactory.getLogger(CleanUp.class);
+
+  private static final long serialVersionUID = 1L;
+
+  private String tableId, namespaceId;
+
+  // wall-clock time at which this step was created (or deserialized on a machine whose clock is behind); compared against master.hasCycled
+  private long creationTime;
+
+  /**
+   * Restores the serialized fields and clamps {@link #creationTime} so that it is never later than the local clock.
+   */
+  private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+
+    /*
+     * handle the case where we start executing on a new machine where the current time is in the past relative to the previous machine
+     *
+     * if the new machine has time in the future, that will work ok w/ hasCycled
+     */
+    if (System.currentTimeMillis() < creationTime) {
+      creationTime = System.currentTimeMillis();
+    }
+
+  }
+
+  /**
+   * @param tableId
+   *          id of the table being deleted
+   * @param namespaceId
+   *          id of the namespace the table belongs to
+   */
+  public CleanUp(String tableId, String namespaceId) {
+    this.tableId = tableId;
+    this.namespaceId = namespaceId;
+    creationTime = System.currentTimeMillis();
+  }
+
+  /**
+   * Checks whether the table's tablets have all been unloaded.
+   *
+   * @return 50 while the master has not cycled since {@link #creationTime} or while any tablet of the table is ASSIGNED or HOSTED, otherwise 0
+   *         (NOTE(review): presumably a retry delay in milliseconds - confirm against the Repo contract)
+   */
+  @Override
+  public long isReady(long tid, Master master) throws Exception {
+    if (!master.hasCycled(creationTime)) {
+      return 50;
+    }
+
+    boolean done = true;
+    Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
+    Scanner scanner = master.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    MetaDataTableScanner.configureScanner(scanner, master);
+    scanner.setRange(tableRange);
+
+    for (Entry<Key,Value> entry : scanner) {
+      TabletLocationState locationState = MetaDataTableScanner.createTabletLocationState(entry.getKey(), entry.getValue());
+      TabletState state = locationState.getState(master.onlineTabletServers());
+      if (state.equals(TabletState.ASSIGNED) || state.equals(TabletState.HOSTED)) {
+        log.debug("Still waiting for table to be deleted: " + tableId + " locationState: " + locationState);
+        done = false;
+        break;
+      }
+    }
+
+    if (!done)
+      return 50;
+
+    return 0;
+  }
+
+  /**
+   * Removes every trace of the table. The table's files are only deleted or archived when the metadata scan found no other table referencing them.
+   *
+   * @return always {@code null}; no further step is returned
+   */
+  @Override
+  public Repo<Master> call(long tid, Master master) throws Exception {
+
+    master.clearMigrations(tableId);
+
+    // 0 = no references found, > 0 = number of references found, -1 = the scan failed (treated like "referenced" below)
+    int refCount = 0;
+
+    try {
+      // look for other tables that reference this table's files
+      Connector conn = master.getConnector();
+      BatchScanner bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 8);
+      try {
+        // scan every tablet row except those belonging to the table being deleted
+        Range allTables = MetadataSchema.TabletsSection.getRange();
+        Range tableRange = MetadataSchema.TabletsSection.getRange(tableId);
+        Range beforeTable = new Range(allTables.getStartKey(), true, tableRange.getStartKey(), false);
+        Range afterTable = new Range(tableRange.getEndKey(), false, allTables.getEndKey(), true);
+        bs.setRanges(Arrays.asList(beforeTable, afterTable));
+        bs.fetchColumnFamily(DataFileColumnFamily.NAME);
+        IteratorSetting cfg = new IteratorSetting(40, "grep", GrepIterator.class);
+        GrepIterator.setTerm(cfg, "/" + tableId + "/");
+        bs.addScanIterator(cfg);
+
+        for (Entry<Key,Value> entry : bs) {
+          // the grep iterator matches the term anywhere in the entry, so re-check the column qualifier itself
+          if (entry.getKey().getColumnQualifier().toString().contains("/" + tableId + "/")) {
+            refCount++;
+          }
+        }
+      } finally {
+        bs.close();
+      }
+
+    } catch (Exception e) {
+      refCount = -1;
+      log.error("Failed to scan " + MetadataTable.NAME + " looking for references to deleted table " + tableId, e);
+    }
+
+    // remove metadata table entries
+    try {
+      // Intentionally do not pass master lock. If master loses lock, this operation may complete before master can kill itself.
+      // If the master lock is passed to deleteTable, it is possible that the delete mutations will be dropped. If the delete operations
+      // are dropped and the operation completes, then the deletes will not be repeated.
+      MetadataTableUtil.deleteTable(tableId, refCount != 0, master, null);
+    } catch (Exception e) {
+      log.error("error deleting " + tableId + " from metadata table", e);
+    }
+
+    // remove any problem reports the table may have
+    try {
+      ProblemReports.getInstance(master).deleteProblemReports(tableId);
+    } catch (Exception e) {
+      log.error("Failed to delete problem reports for table " + tableId, e);
+    }
+
+    if (refCount == 0) {
+      final AccumuloConfiguration conf = master.getConfiguration();
+      boolean archiveFiles = conf.getBoolean(Property.GC_FILE_ARCHIVE);
+
+      // delete (or archive) the table's directory in every tables dir
+      try {
+        VolumeManager fs = master.getFileSystem();
+        for (String dir : ServerConstants.getTablesDirs()) {
+          if (archiveFiles) {
+            archiveFile(fs, dir, tableId);
+          } else {
+            fs.deleteRecursively(new Path(dir, tableId));
+          }
+        }
+      } catch (IOException e) {
+        log.error("Unable to remove deleted table directory", e);
+      } catch (IllegalArgumentException exception) {
+        if (exception.getCause() instanceof UnknownHostException) {
+          /* Thrown if HDFS encounters a DNS problem in some edge cases */
+          log.error("Unable to remove deleted table directory", exception);
+        } else {
+          throw exception;
+        }
+      }
+    }
+
+    // remove table from zookeeper
+    try {
+      TableManager.getInstance().removeTable(tableId);
+      Tables.clearCache(master.getInstance());
+    } catch (Exception e) {
+      log.error("Failed to find table id in zookeeper", e);
+    }
+
+    // remove any permissions associated with this table
+    try {
+      AuditedSecurityOperation.getInstance(master).deleteTable(master.rpcCreds(), tableId, namespaceId);
+    } catch (ThriftSecurityException e) {
+      log.error("{}", e.getMessage(), e);
+    }
+
+    Utils.unreserveTable(tableId, tid, true);
+    Utils.unreserveNamespace(namespaceId, tid, false);
+
+    log.debug("Deleted table " + tableId);
+
+    return null;
+  }
+
+  /**
+   * Moves the directory {@code dir/tableId} into the file archive directory of the volume it lives on, keeping its path relative to the volume's base path.
+   * If the destination already exists the two directory trees are merged.
+   *
+   * @param fs
+   *          volume manager used for all file system operations
+   * @param dir
+   *          a tables directory
+   * @param tableId
+   *          id of the table whose directory is archived
+   */
+  protected void archiveFile(VolumeManager fs, String dir, String tableId) throws IOException {
+    Path tableDirectory = new Path(dir, tableId);
+    Volume v = fs.getVolumeByPath(tableDirectory);
+    String basePath = v.getBasePath();
+
+    // Path component of URI
+    String tableDirPath = tableDirectory.toUri().getPath();
+
+    // Just the suffix of the path (after the Volume's base path)
+    String tableDirSuffix = tableDirPath.substring(basePath.length());
+
+    // Remove a leading path separator char because Path will treat the "child" as an absolute path with it
+    if (Path.SEPARATOR_CHAR == tableDirSuffix.charAt(0)) {
+      if (tableDirSuffix.length() > 1) {
+        tableDirSuffix = tableDirSuffix.substring(1);
+      } else {
+        tableDirSuffix = "";
+      }
+    }
+
+    // Get the file archive directory on this volume
+    final Path fileArchiveDir = new Path(basePath, ServerConstants.FILE_ARCHIVE_DIR);
+
+    // Make sure it exists just to be safe
+    fs.mkdirs(fileArchiveDir);
+
+    // The destination to archive this table to
+    final Path destTableDir = new Path(fileArchiveDir, tableDirSuffix);
+
+    log.debug("Archiving " + tableDirectory + " to " + destTableDir);
+
+    if (fs.exists(destTableDir)) {
+      merge(fs, tableDirectory, destTableDir);
+    } else {
+      fs.rename(tableDirectory, destTableDir);
+    }
+  }
+
+  /**
+   * Recursively moves the children of {@code src} into {@code dest}. A file that already exists in {@code dest} is left in {@code src} and a warning is
+   * logged; a directory that already exists in {@code dest} is merged child by child.
+   */
+  protected void merge(VolumeManager fs, Path src, Path dest) throws IOException {
+    for (FileStatus child : fs.listStatus(src)) {
+      final String childName = child.getPath().getName();
+      final Path childInSrc = new Path(src, childName), childInDest = new Path(dest, childName);
+
+      if (child.isFile()) {
+        if (fs.exists(childInDest)) {
+          log.warn("File already exists in archive, ignoring. " + childInDest);
+        } else {
+          fs.rename(childInSrc, childInDest);
+        }
+      } else if (child.isDirectory()) {
+        if (fs.exists(childInDest)) {
+          // Recurse
+          merge(fs, childInSrc, childInDest);
+        } else {
+          fs.rename(childInSrc, childInDest);
+        }
+      } else {
+        // Symlinks shouldn't exist in table directories..
+        log.warn("Ignoring archiving of non file/directory: " + child);
+      }
+    }
+  }
+
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    // nothing to do
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUpBulkImport.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUpBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUpBulkImport.java new file mode 100644 index 0000000..85f9a8c --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CleanUpBulkImport.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Closing step of a bulk import: clears the in-progress flag and the per-file load markers, adds a delete entry for the bulk directory, and releases the
+ * reservations and the table read lock held by the transaction.
+ */
+class CleanUpBulkImport extends MasterRepo {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger log = LoggerFactory.getLogger(CleanUpBulkImport.class);
+
+  private String tableId;
+  private String source;
+  private String bulk;
+  private String error;
+
+  /**
+   * @param tableId
+   *          id of the table that was imported into
+   * @param source
+   *          source directory whose HDFS reservation is released
+   * @param bulk
+   *          bulk processing directory
+   * @param error
+   *          error directory whose HDFS reservation is released
+   */
+  public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.error = error;
+  }
+
+  /**
+   * Runs the clean up in a fixed order; any exception from a step propagates to the caller.
+   *
+   * @return always {@code null}; no further step is returned
+   */
+  @Override
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    // 1. flag file and delete entry for the bulk directory
+    log.debug("removing the bulk processing flag file in " + bulk);
+    final Path bulkPath = new Path(bulk);
+    // the flag is identified by the last two path components: /<parent dir name>/<bulk dir name>
+    final String flagPath = "/" + bulkPath.getParent().getName() + "/" + bulkPath.getName();
+    MetadataTableUtil.removeBulkLoadInProgressFlag(master, flagPath);
+    MetadataTableUtil.addDeleteEntry(master, tableId, bulkPath.toString());
+
+    // 2. markers written to the metadata table for the loaded files
+    log.debug("removing the metadata table markers for loaded files");
+    final Connector connector = master.getConnector();
+    MetadataTableUtil.removeBulkLoadEntries(connector, tableId, tid);
+
+    // 3. reservations and the read lock taken for this transaction
+    log.debug("releasing HDFS reservations for " + source + " and " + error);
+    Utils.unreserveHdfsDirectory(source, tid);
+    Utils.unreserveHdfsDirectory(error, tid);
+    Utils.getReadLock(tableId, tid).unlock();
+
+    // 4. arbitrator state of the transaction
+    log.debug("completing bulk import transaction " + tid);
+    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
+
+    return null;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneInfo.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneInfo.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneInfo.java new file mode 100644 index 0000000..335d65d --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneInfo.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.accumulo.master.tableOps;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Serializable parameter holder shared by the steps of a table clone operation (ClonePermissions, CloneZookeeper, CloneMetadata).
+ */
+class CloneInfo implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // id of the table being cloned
+  String srcTableId;
+  // name of the new table; CloneZookeeper derives the target namespace from it
+  String tableName;
+  // id of the new table
+  String tableId;
+  // id of the target namespace; (re)assigned by the CloneZookeeper constructor
+  String namespaceId;
+  // id of the source table's namespace; compared with namespaceId to decide whether the target namespace needs its own reservation
+  String srcNamespaceId;
+  // handed to TableManager.cloneTable together with propertiesToExclude
+  Map<String,String> propertiesToSet;
+  Set<String> propertiesToExclude;
+
+  // user that ClonePermissions grants every table permission on the new table
+  public String user;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneMetadata.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneMetadata.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneMetadata.java
new file mode 100644
index 0000000..045f6b1
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneMetadata.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Clone step that copies the source table's metadata entries to the new table id and hands over to FinishCloneTable.
+ */
+class CloneMetadata extends MasterRepo {
+
+  private static final long serialVersionUID = 1L;
+  private CloneInfo cloneInfo;
+
+  public CloneMetadata(CloneInfo cloneInfo) {
+    this.cloneInfo = cloneInfo;
+  }
+
+  /**
+   * @return always 0
+   */
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    return 0;
+  }
+
+  /**
+   * Wipes any metadata already written for the new table id, then clones the source table's metadata.
+   *
+   * @return the next step, a FinishCloneTable for the same clone
+   */
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    final String message = String.format("Cloning %s with tableId %s from srcTableId %s", cloneInfo.tableName, cloneInfo.tableId, cloneInfo.srcTableId);
+    LoggerFactory.getLogger(CloneMetadata.class).info(message);
+
+    // a previous run of this step may have died half way; start from a clean slate
+    removeCloneMetadata(environment);
+    MetadataTableUtil.cloneTable(environment, cloneInfo.srcTableId, cloneInfo.tableId, environment.getFileSystem());
+
+    return new FinishCloneTable(cloneInfo);
+  }
+
+  /**
+   * Rolls the step back by removing the metadata entries of the new table id.
+   */
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    removeCloneMetadata(environment);
+  }
+
+  // deletes the new table's metadata entries, passing the master lock along
+  private void removeCloneMetadata(Master environment) throws Exception {
+    MetadataTableUtil.deleteTable(cloneInfo.tableId, false, environment, environment.getMasterLock());
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/ClonePermissions.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ClonePermissions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ClonePermissions.java
new file mode 100644
index 0000000..3572c31
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ClonePermissions.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to
the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.core.client.NamespaceNotFoundException;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Clone step that grants the requesting user every table permission on the new table and then hands over to CloneZookeeper.
+ */
+class ClonePermissions extends MasterRepo {
+
+  private static final long serialVersionUID = 1L;
+
+  private CloneInfo cloneInfo;
+
+  public ClonePermissions(CloneInfo cloneInfo) {
+    this.cloneInfo = cloneInfo;
+  }
+
+  /**
+   * @return always 0
+   */
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    return 0;
+  }
+
+  /**
+   * Grants all values of TablePermission on the new table to {@code cloneInfo.user}.
+   *
+   * @return the next step, a CloneZookeeper for the same clone
+   * @throws ThriftSecurityException
+   *           if a grant is rejected; the error is logged and rethrown
+   * @throws ThriftTableOperationException
+   *           with type NAMESPACE_NOTFOUND if the namespace of the target table name cannot be resolved
+   */
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    // give all table permissions to the creator
+    for (TablePermission permission : TablePermission.values()) {
+      try {
+        AuditedSecurityOperation.getInstance(environment).grantTablePermission(environment.rpcCreds(), cloneInfo.user, cloneInfo.tableId, permission,
+            cloneInfo.namespaceId);
+      } catch (ThriftSecurityException e) {
+        // log under this class, not under FinishCloneTable
+        LoggerFactory.getLogger(ClonePermissions.class).error("{}", e.getMessage(), e);
+        throw e;
+      }
+    }
+
+    // setup permissions in zookeeper before table info in zookeeper
+    // this way concurrent users will not get a spurious permission denied
+    // error
+    try {
+      return new CloneZookeeper(cloneInfo);
+    } catch (NamespaceNotFoundException e) {
+      throw new ThriftTableOperationException(null, cloneInfo.tableName, TableOperation.CLONE, TableOperationExceptionType.NAMESPACE_NOTFOUND,
+          "Namespace for target table not found");
+    }
+  }
+
+  /**
+   * Rolls the step back by removing the permissions recorded for the new table.
+   */
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    AuditedSecurityOperation.getInstance(environment).deleteTable(environment.rpcCreds(), cloneInfo.tableId, cloneInfo.namespaceId);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java
index 192d182..eb2370e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneTable.java
@@ -16,209 +16,14 @@ */ package org.apache.accumulo.master.tableOps; -import java.io.Serializable; import java.util.Map; import java.util.Set; -import org.apache.accumulo.core.client.NamespaceNotFoundException; -import org.apache.accumulo.core.client.impl.Namespaces; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; -import
org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; -import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.fate.Repo; -import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.master.Master; import org.apache.accumulo.server.client.HdfsZooInstance; -import org.apache.accumulo.server.security.AuditedSecurityOperation; -import org.apache.accumulo.server.tables.TableManager; -import org.apache.accumulo.server.util.MetadataTableUtil; -import org.slf4j.LoggerFactory; - -class CloneInfo implements Serializable { - - private static final long serialVersionUID = 1L; - - String srcTableId; - String tableName; - String tableId; - String namespaceId; - String srcNamespaceId; - Map<String,String> propertiesToSet; - Set<String> propertiesToExclude; - - public String user; -} - -class FinishCloneTable extends MasterRepo { - - private static final long serialVersionUID = 1L; - private CloneInfo cloneInfo; - - public FinishCloneTable(CloneInfo cloneInfo) { - this.cloneInfo = cloneInfo; - } - - @Override - public long isReady(long tid, Master environment) throws Exception { - return 0; - } - - @Override - public Repo<Master> call(long tid, Master environment) throws Exception { - // directories are intentionally not created.... this is done because directories should be unique - // because they occupy a different namespace than normal tablet directories... also some clones - // may never create files.. therefore there is no need to consume namenode space w/ directories - // that are not used... 
tablet will create directories as needed - - TableManager.getInstance().transitionTableState(cloneInfo.tableId, TableState.ONLINE); - - Utils.unreserveNamespace(cloneInfo.srcNamespaceId, tid, false); - if (!cloneInfo.srcNamespaceId.equals(cloneInfo.namespaceId)) - Utils.unreserveNamespace(cloneInfo.namespaceId, tid, false); - Utils.unreserveTable(cloneInfo.srcTableId, tid, false); - Utils.unreserveTable(cloneInfo.tableId, tid, true); - - environment.getEventCoordinator().event("Cloned table %s from %s", cloneInfo.tableName, cloneInfo.srcTableId); - - LoggerFactory.getLogger(FinishCloneTable.class).debug("Cloned table " + cloneInfo.srcTableId + " " + cloneInfo.tableId + " " + cloneInfo.tableName); - - return null; - } - - @Override - public void undo(long tid, Master environment) throws Exception {} - -} - -class CloneMetadata extends MasterRepo { - - private static final long serialVersionUID = 1L; - private CloneInfo cloneInfo; - - public CloneMetadata(CloneInfo cloneInfo) { - this.cloneInfo = cloneInfo; - } - - @Override - public long isReady(long tid, Master environment) throws Exception { - return 0; - } - - @Override - public Repo<Master> call(long tid, Master environment) throws Exception { - LoggerFactory.getLogger(CloneMetadata.class).info( - String.format("Cloning %s with tableId %s from srcTableId %s", cloneInfo.tableName, cloneInfo.tableId, cloneInfo.srcTableId)); - // need to clear out any metadata entries for tableId just in case this - // died before and is executing again - MetadataTableUtil.deleteTable(cloneInfo.tableId, false, environment, environment.getMasterLock()); - MetadataTableUtil.cloneTable(environment, cloneInfo.srcTableId, cloneInfo.tableId, environment.getFileSystem()); - return new FinishCloneTable(cloneInfo); - } - - @Override - public void undo(long tid, Master environment) throws Exception { - MetadataTableUtil.deleteTable(cloneInfo.tableId, false, environment, environment.getMasterLock()); - } - -} - -class CloneZookeeper extends 
MasterRepo { - - private static final long serialVersionUID = 1L; - - private CloneInfo cloneInfo; - - public CloneZookeeper(CloneInfo cloneInfo) throws NamespaceNotFoundException { - this.cloneInfo = cloneInfo; - this.cloneInfo.namespaceId = Namespaces.getNamespaceId(HdfsZooInstance.getInstance(), Tables.qualify(this.cloneInfo.tableName).getFirst()); - } - - @Override - public long isReady(long tid, Master environment) throws Exception { - long val = 0; - if (!cloneInfo.srcNamespaceId.equals(cloneInfo.namespaceId)) - val += Utils.reserveNamespace(cloneInfo.namespaceId, tid, false, true, TableOperation.CLONE); - val += Utils.reserveTable(cloneInfo.tableId, tid, true, false, TableOperation.CLONE); - return val; - } - - @Override - public Repo<Master> call(long tid, Master environment) throws Exception { - Utils.tableNameLock.lock(); - try { - // write tableName & tableId to zookeeper - - Utils.checkTableDoesNotExist(environment.getInstance(), cloneInfo.tableName, cloneInfo.tableId, TableOperation.CLONE); - - TableManager.getInstance().cloneTable(cloneInfo.srcTableId, cloneInfo.tableId, cloneInfo.tableName, cloneInfo.namespaceId, cloneInfo.propertiesToSet, - cloneInfo.propertiesToExclude, NodeExistsPolicy.OVERWRITE); - Tables.clearCache(environment.getInstance()); - - return new CloneMetadata(cloneInfo); - } finally { - Utils.tableNameLock.unlock(); - } - } - - @Override - public void undo(long tid, Master environment) throws Exception { - TableManager.getInstance().removeTable(cloneInfo.tableId); - if (!cloneInfo.srcNamespaceId.equals(cloneInfo.namespaceId)) - Utils.unreserveNamespace(cloneInfo.namespaceId, tid, false); - Utils.unreserveTable(cloneInfo.tableId, tid, true); - Tables.clearCache(environment.getInstance()); - } - -} - -class ClonePermissions extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private CloneInfo cloneInfo; - - public ClonePermissions(CloneInfo cloneInfo) { - this.cloneInfo = cloneInfo; - } - - @Override - public 
long isReady(long tid, Master environment) throws Exception { - return 0; - } - - @Override - public Repo<Master> call(long tid, Master environment) throws Exception { - // give all table permissions to the creator - for (TablePermission permission : TablePermission.values()) { - try { - AuditedSecurityOperation.getInstance(environment).grantTablePermission(environment.rpcCreds(), cloneInfo.user, cloneInfo.tableId, permission, - cloneInfo.namespaceId); - } catch (ThriftSecurityException e) { - LoggerFactory.getLogger(FinishCloneTable.class).error("{}", e.getMessage(), e); - throw e; - } - } - - // setup permissions in zookeeper before table info in zookeeper - // this way concurrent users will not get a spurious pemission denied - // error - try { - return new CloneZookeeper(cloneInfo); - } catch (NamespaceNotFoundException e) { - throw new ThriftTableOperationException(null, cloneInfo.tableName, TableOperation.CLONE, TableOperationExceptionType.NAMESPACE_NOTFOUND, - "Namespace for target table not found"); - } - } - - @Override - public void undo(long tid, Master environment) throws Exception { - AuditedSecurityOperation.getInstance(environment).deleteTable(environment.rpcCreds(), cloneInfo.tableId, cloneInfo.namespaceId); - } -} public class CloneTable extends MasterRepo { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneZookeeper.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneZookeeper.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneZookeeper.java new file mode 100644 index 0000000..072f5de --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CloneZookeeper.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import org.apache.accumulo.core.client.NamespaceNotFoundException;
+import org.apache.accumulo.core.client.impl.Namespaces;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.tables.TableManager;
+
+/**
+ * Clone step that reserves the new table (and its namespace when that differs from the source's) and writes the clone's table entry to ZooKeeper.
+ */
+class CloneZookeeper extends MasterRepo {
+
+  private static final long serialVersionUID = 1L;
+
+  private CloneInfo cloneInfo;
+
+  /**
+   * Stores {@code cloneInfo} and overwrites its {@code namespaceId} with the id looked up for the namespace part of {@code cloneInfo.tableName}.
+   *
+   * @throws NamespaceNotFoundException
+   *           propagated from the namespace id lookup
+   */
+  public CloneZookeeper(CloneInfo cloneInfo) throws NamespaceNotFoundException {
+    this.cloneInfo = cloneInfo;
+    final String namespaceName = Tables.qualify(this.cloneInfo.tableName).getFirst();
+    this.cloneInfo.namespaceId = Namespaces.getNamespaceId(HdfsZooInstance.getInstance(), namespaceName);
+  }
+
+  /**
+   * Reserves the target namespace (only when it is not the source namespace) and then the new table.
+   *
+   * @return the sum of the values returned by the reservation calls
+   */
+  @Override
+  public long isReady(long tid, Master environment) throws Exception {
+    final long namespaceWait = targetNamespaceDiffers() ? Utils.reserveNamespace(cloneInfo.namespaceId, tid, false, true, TableOperation.CLONE) : 0;
+    final long tableWait = Utils.reserveTable(cloneInfo.tableId, tid, true, false, TableOperation.CLONE);
+    return namespaceWait + tableWait;
+  }
+
+  /**
+   * While holding {@code Utils.tableNameLock}: verifies the new table name is free, clones the table entry in ZooKeeper and clears the table cache.
+   *
+   * @return the next step, a CloneMetadata for the same clone
+   */
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+    Utils.tableNameLock.lock();
+    try {
+      // write tableName & tableId to zookeeper
+      Utils.checkTableDoesNotExist(environment.getInstance(), cloneInfo.tableName, cloneInfo.tableId, TableOperation.CLONE);
+
+      TableManager.getInstance().cloneTable(cloneInfo.srcTableId, cloneInfo.tableId, cloneInfo.tableName, cloneInfo.namespaceId, cloneInfo.propertiesToSet,
+          cloneInfo.propertiesToExclude, NodeExistsPolicy.OVERWRITE);
+      Tables.clearCache(environment.getInstance());
+
+      return new CloneMetadata(cloneInfo);
+    } finally {
+      Utils.tableNameLock.unlock();
+    }
+  }
+
+  /**
+   * Rolls the step back: removes the new table's entry, releases the reservations taken by {@link #isReady(long, Master)} and clears the table cache.
+   */
+  @Override
+  public void undo(long tid, Master environment) throws Exception {
+    TableManager.getInstance().removeTable(cloneInfo.tableId);
+    if (targetNamespaceDiffers()) {
+      Utils.unreserveNamespace(cloneInfo.namespaceId, tid, false);
+    }
+    Utils.unreserveTable(cloneInfo.tableId, tid, true);
+    Tables.clearCache(environment.getInstance());
+  }
+
+  // true when the clone lands in a namespace other than the source table's
+  private boolean targetNamespaceDiffers() {
+    return !cloneInfo.srcNamespaceId.equals(cloneInfo.namespaceId);
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
index 133663d..befaea3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
@@ -18,188 +18,29 @@ package org.apache.accumulo.master.tableOps; import static java.nio.charset.StandardCharsets.UTF_8; -import java.util.Collections; -import java.util.Iterator; import java.util.List; -import java.util.Map.Entry; import org.apache.accumulo.core.Constants; -import
org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.RowIterator; -import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionStrategyConfig; import org.apache.accumulo.core.client.impl.CompactionStrategyConfigUtil; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.MapCounter; import org.apache.accumulo.fate.Repo; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator; import org.apache.accumulo.master.Master; -import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; -import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.master.tableOps.UserCompactionConfig; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableUtils; -import org.apache.thrift.TException; import 
org.apache.zookeeper.KeeperException.NoNodeException; -import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -class CompactionDriver extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private long compactId; - private final String tableId; - private byte[] startRow; - private byte[] endRow; - - public CompactionDriver(long compactId, String tableId, byte[] startRow, byte[] endRow) { - - this.compactId = compactId; - this.tableId = tableId; - this.startRow = startRow; - this.endRow = endRow; - } - - @Override - public long isReady(long tid, Master master) throws Exception { - - String zCancelID = Constants.ZROOT + "/" + master.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_CANCEL_ID; - - IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - - if (Long.parseLong(new String(zoo.getData(zCancelID, null))) >= compactId) { - // compaction was canceled - throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Compaction canceled"); - } - - MapCounter<TServerInstance> serversToFlush = new MapCounter<TServerInstance>(); - Connector conn = master.getConnector(); - - Scanner scanner; - - if (tableId.equals(MetadataTable.ID)) { - scanner = new IsolatedScanner(conn.createScanner(RootTable.NAME, Authorizations.EMPTY)); - scanner.setRange(MetadataSchema.TabletsSection.getRange()); - } else { - scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)); - Range range = new KeyExtent(new Text(tableId), null, startRow == null ? 
null : new Text(startRow)).toMetadataRange(); - scanner.setRange(range); - } - - TabletsSection.ServerColumnFamily.COMPACT_COLUMN.fetch(scanner); - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); - scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); - - long t1 = System.currentTimeMillis(); - RowIterator ri = new RowIterator(scanner); - - int tabletsToWaitFor = 0; - int tabletCount = 0; - - while (ri.hasNext()) { - Iterator<Entry<Key,Value>> row = ri.next(); - long tabletCompactID = -1; - - TServerInstance server = null; - - Entry<Key,Value> entry = null; - while (row.hasNext()) { - entry = row.next(); - Key key = entry.getKey(); - - if (TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) - tabletCompactID = Long.parseLong(entry.getValue().toString()); - - if (TabletsSection.CurrentLocationColumnFamily.NAME.equals(key.getColumnFamily())) - server = new TServerInstance(entry.getValue(), key.getColumnQualifier()); - } - - if (tabletCompactID < compactId) { - tabletsToWaitFor++; - if (server != null) - serversToFlush.increment(server, 1); - } - - tabletCount++; - - Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text) null).getEndRow(); - if (tabletEndRow == null || (endRow != null && tabletEndRow.compareTo(new Text(endRow)) >= 0)) - break; - } - - long scanTime = System.currentTimeMillis() - t1; - - Instance instance = master.getInstance(); - Tables.clearCache(instance); - if (tabletCount == 0 && !Tables.exists(instance, tableId)) - throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null); - - if (serversToFlush.size() == 0 && Tables.getTableState(instance, tableId) == TableState.OFFLINE) - throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, null); - - if (tabletsToWaitFor == 0) - return 0; - - for (TServerInstance tsi : 
serversToFlush.keySet()) { - try { - final TServerConnection server = master.getConnection(tsi); - if (server != null) - server.compact(master.getMasterLock(), tableId, startRow, endRow); - } catch (TException ex) { - LoggerFactory.getLogger(CompactionDriver.class).error(ex.toString()); - } - } - - long sleepTime = 500; - - if (serversToFlush.size() > 0) - sleepTime = Collections.max(serversToFlush.values()) * sleepTime; // make wait time depend on the server with the most to - // compact - - sleepTime = Math.max(2 * scanTime, sleepTime); - - sleepTime = Math.min(sleepTime, 30000); - - return sleepTime; - } - - @Override - public Repo<Master> call(long tid, Master environment) throws Exception { - String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId); - CompactRange.removeIterators(environment, tid, tableId); - Utils.getReadLock(tableId, tid).unlock(); - Utils.getReadLock(namespaceId, tid).unlock(); - return null; - } - - @Override - public void undo(long tid, Master environment) throws Exception { - - } - -} - public class CompactRange extends MasterRepo { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java new file mode 100644 index 0000000..e3d0820 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.MapCounter; +import org.apache.accumulo.fate.Repo; +import 
org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; +import org.slf4j.LoggerFactory; + +class CompactionDriver extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private long compactId; + private final String tableId; + private byte[] startRow; + private byte[] endRow; + + public CompactionDriver(long compactId, String tableId, byte[] startRow, byte[] endRow) { + + this.compactId = compactId; + this.tableId = tableId; + this.startRow = startRow; + this.endRow = endRow; + } + + @Override + public long isReady(long tid, Master master) throws Exception { + + String zCancelID = Constants.ZROOT + "/" + master.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_CANCEL_ID; + + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + + if (Long.parseLong(new String(zoo.getData(zCancelID, null))) >= compactId) { + // compaction was canceled + throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Compaction canceled"); + } + + MapCounter<TServerInstance> serversToFlush = new MapCounter<TServerInstance>(); + Connector conn = master.getConnector(); + + Scanner scanner; + + if (tableId.equals(MetadataTable.ID)) { + scanner = new IsolatedScanner(conn.createScanner(RootTable.NAME, Authorizations.EMPTY)); + scanner.setRange(MetadataSchema.TabletsSection.getRange()); + } else { + scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)); + Range range = new KeyExtent(new Text(tableId), null, startRow == null ? 
null : new Text(startRow)).toMetadataRange(); + scanner.setRange(range); + } + + TabletsSection.ServerColumnFamily.COMPACT_COLUMN.fetch(scanner); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); + + long t1 = System.currentTimeMillis(); + RowIterator ri = new RowIterator(scanner); + + int tabletsToWaitFor = 0; + int tabletCount = 0; + + while (ri.hasNext()) { + Iterator<Entry<Key,Value>> row = ri.next(); + long tabletCompactID = -1; + + TServerInstance server = null; + + Entry<Key,Value> entry = null; + while (row.hasNext()) { + entry = row.next(); + Key key = entry.getKey(); + + if (TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) + tabletCompactID = Long.parseLong(entry.getValue().toString()); + + if (TabletsSection.CurrentLocationColumnFamily.NAME.equals(key.getColumnFamily())) + server = new TServerInstance(entry.getValue(), key.getColumnQualifier()); + } + + if (tabletCompactID < compactId) { + tabletsToWaitFor++; + if (server != null) + serversToFlush.increment(server, 1); + } + + tabletCount++; + + Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text) null).getEndRow(); + if (tabletEndRow == null || (endRow != null && tabletEndRow.compareTo(new Text(endRow)) >= 0)) + break; + } + + long scanTime = System.currentTimeMillis() - t1; + + Instance instance = master.getInstance(); + Tables.clearCache(instance); + if (tabletCount == 0 && !Tables.exists(instance, tableId)) + throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null); + + if (serversToFlush.size() == 0 && Tables.getTableState(instance, tableId) == TableState.OFFLINE) + throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, null); + + if (tabletsToWaitFor == 0) + return 0; + + for (TServerInstance tsi : 
serversToFlush.keySet()) { + try { + final TServerConnection server = master.getConnection(tsi); + if (server != null) + server.compact(master.getMasterLock(), tableId, startRow, endRow); + } catch (TException ex) { + LoggerFactory.getLogger(CompactionDriver.class).error(ex.toString()); + } + } + + long sleepTime = 500; + + if (serversToFlush.size() > 0) + sleepTime = Collections.max(serversToFlush.values()) * sleepTime; // make wait time depend on the server with the most to + // compact + + sleepTime = Math.max(2 * scanTime, sleepTime); + + sleepTime = Math.min(sleepTime, 30000); + + return sleepTime; + } + + @Override + public Repo<Master> call(long tid, Master environment) throws Exception { + String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId); + CompactRange.removeIterators(environment, tid, tableId); + Utils.getReadLock(tableId, tid).unlock(); + Utils.getReadLock(namespaceId, tid).unlock(); + return null; + } + + @Override + public void undo(long tid, Master environment) throws Exception { + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompleteBulkImport.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompleteBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompleteBulkImport.java new file mode 100644 index 0000000..8905c80 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompleteBulkImport.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; + +class CompleteBulkImport extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private String tableId; + private String source; + private String bulk; + private String error; + + public CompleteBulkImport(String tableId, String source, String bulk, String error) { + this.tableId = tableId; + this.source = source; + this.bulk = bulk; + this.error = error; + } + + @Override + public Repo<Master> call(long tid, Master master) throws Exception { + ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid); + return new CopyFailed(tableId, source, bulk, error); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java new file mode 100644 index 0000000..e0cc8ec --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java 
@@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.Map.Entry; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; +import org.apache.accumulo.server.master.state.TServerInstance; +import 
org.apache.accumulo.server.zookeeper.DistributedWorkQueue; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; + +class CopyFailed extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private String tableId; + private String source; + private String bulk; + private String error; + + public CopyFailed(String tableId, String source, String bulk, String error) { + this.tableId = tableId; + this.source = source; + this.bulk = bulk; + this.error = error; + } + + @Override + public long isReady(long tid, Master master) throws Exception { + Set<TServerInstance> finished = new HashSet<TServerInstance>(); + Set<TServerInstance> running = master.onlineTabletServers(); + for (TServerInstance server : running) { + try { + TServerConnection client = master.getConnection(server); + if (client != null && !client.isActive(tid)) + finished.add(server); + } catch (TException ex) { + log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex); + } + } + if (finished.containsAll(running)) + return 0; + return 500; + } + + @Override + public Repo<Master> call(long tid, Master master) throws Exception { + // This needs to execute after the arbiter is stopped + + VolumeManager fs = master.getFileSystem(); + + if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT))) + return new CleanUpBulkImport(tableId, source, bulk, error); + + HashMap<FileRef,String> failures = new HashMap<FileRef,String>(); + HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>(); + + try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(error, BulkImport.FAILURES_TXT)), UTF_8))) { + String line = null; + while ((line = in.readLine()) != null) { + Path path = new Path(line); + if (!fs.exists(new Path(error, path.getName()))) + failures.put(new FileRef(line, path), line); + } + } + + /* + * I thought I could move files that have no file references in the 
table. However its possible a clone references a file. Therefore only move files that + * have no loaded markers. + */ + + // determine which failed files were loaded + Connector conn = master.getConnector(); + Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)); + mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange()); + mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); + + for (Entry<Key,Value> entry : mscanner) { + if (Long.parseLong(entry.getValue().toString()) == tid) { + FileRef loadedFile = new FileRef(fs, entry.getKey()); + String absPath = failures.remove(loadedFile); + if (absPath != null) { + loadedFailures.put(loadedFile, absPath); + } + } + } + + // move failed files that were not loaded + for (String failure : failures.values()) { + Path orig = new Path(failure); + Path dest = new Path(error, orig.getName()); + fs.rename(orig, dest); + log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed"); + } + + if (loadedFailures.size() > 0) { + DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + master.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ, + master.getConfiguration()); + + HashSet<String> workIds = new HashSet<String>(); + + for (String failure : loadedFailures.values()) { + Path orig = new Path(failure); + Path dest = new Path(error, orig.getName()); + + if (fs.exists(dest)) + continue; + + bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8)); + workIds.add(orig.getName()); + log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed"); + } + + bifCopyQueue.waitUntilDone(workIds); + } + + fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT)); + return new CleanUpBulkImport(tableId, source, bulk, error); + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateDir.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateDir.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateDir.java new file mode 100644 index 0000000..6221624 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateDir.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.hadoop.fs.Path; + +class CreateDir extends MasterRepo { + private static final long serialVersionUID = 1L; + + private TableInfo tableInfo; + + CreateDir(TableInfo ti) { + this.tableInfo = ti; + } + + @Override + public long isReady(long tid, Master environment) throws Exception { + return 0; + } + + @Override + public Repo<Master> call(long tid, Master master) throws Exception { + VolumeManager fs = master.getFileSystem(); + fs.mkdirs(new Path(tableInfo.dir)); + return new PopulateMetadata(tableInfo); + } + + @Override + public void undo(long tid, Master master) throws Exception { + VolumeManager fs = master.getFileSystem(); + fs.deleteRecursively(new Path(tableInfo.dir)); + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateImportDir.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateImportDir.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateImportDir.java new file mode 100644 index 0000000..4f0e7f8 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateImportDir.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import java.util.Arrays; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.tablets.UniqueNameAllocator; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CreateImportDir extends MasterRepo { + private static final Logger log = LoggerFactory.getLogger(CreateImportDir.class); + private static final long serialVersionUID = 1L; + + private ImportedTableInfo tableInfo; + + CreateImportDir(ImportedTableInfo ti) { + this.tableInfo = ti; + } + + @Override + public Repo<Master> call(long tid, Master master) throws Exception { + + UniqueNameAllocator namer = UniqueNameAllocator.getInstance(); + + Path exportDir = new Path(tableInfo.exportDir); + String[] tableDirs = ServerConstants.getTablesDirs(); + + log.info("Looking for matching filesystem for " + exportDir + " from options " + Arrays.toString(tableDirs)); + Path base = master.getFileSystem().matchingFileSystem(exportDir, tableDirs); + log.info("Chose base table directory of " + base); + Path directory = new Path(base, tableInfo.tableId); + + Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName()); + + tableInfo.importDir = newBulkDir.toString(); + + log.info("Using import dir: " + tableInfo.importDir); + + return new MapImportFileNames(tableInfo); + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateNamespace.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateNamespace.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateNamespace.java index 9264031..b01fbcc 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateNamespace.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateNamespace.java @@ -16,147 +16,10 @@ */ package org.apache.accumulo.master.tableOps; -import java.io.Serializable; import java.util.Map; -import java.util.Map.Entry; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.impl.Tables; -import org.apache.accumulo.core.client.impl.thrift.TableOperation; -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.security.NamespacePermission; import org.apache.accumulo.fate.Repo; -import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.master.Master; -import org.apache.accumulo.server.security.AuditedSecurityOperation; -import org.apache.accumulo.server.security.SecurityOperation; -import org.apache.accumulo.server.tables.TableManager; -import org.apache.accumulo.server.util.NamespacePropUtil; -import org.slf4j.LoggerFactory; - -class NamespaceInfo implements Serializable { - - private static final long serialVersionUID = 1L; - - String namespaceName; - String namespaceId; - String user; - - public Map<String,String> props; -} - -class FinishCreateNamespace extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private NamespaceInfo namespaceInfo; - - public FinishCreateNamespace(NamespaceInfo ti) { - this.namespaceInfo = ti; - } - - @Override - public long isReady(long tid, Master 
environment) throws Exception { - return 0; - } - - @Override - public Repo<Master> call(long id, Master env) throws Exception { - - Utils.unreserveNamespace(namespaceInfo.namespaceId, id, true); - - env.getEventCoordinator().event("Created namespace %s ", namespaceInfo.namespaceName); - - LoggerFactory.getLogger(FinishCreateNamespace.class).debug("Created table " + namespaceInfo.namespaceId + " " + namespaceInfo.namespaceName); - - return null; - } - - @Override - public String getReturn() { - return namespaceInfo.namespaceId; - } - - @Override - public void undo(long tid, Master env) throws Exception {} - -} - -class PopulateZookeeperWithNamespace extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private NamespaceInfo namespaceInfo; - - PopulateZookeeperWithNamespace(NamespaceInfo ti) { - this.namespaceInfo = ti; - } - - @Override - public long isReady(long id, Master environment) throws Exception { - return Utils.reserveNamespace(namespaceInfo.namespaceId, id, true, false, TableOperation.CREATE); - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - - Utils.tableNameLock.lock(); - try { - Instance instance = master.getInstance(); - - Utils.checkNamespaceDoesNotExist(instance, namespaceInfo.namespaceName, namespaceInfo.namespaceId, TableOperation.CREATE); - - TableManager.prepareNewNamespaceState(instance.getInstanceID(), namespaceInfo.namespaceId, namespaceInfo.namespaceName, NodeExistsPolicy.OVERWRITE); - - for (Entry<String,String> entry : namespaceInfo.props.entrySet()) - NamespacePropUtil.setNamespaceProperty(namespaceInfo.namespaceId, entry.getKey(), entry.getValue()); - - Tables.clearCache(instance); - - return new FinishCreateNamespace(namespaceInfo); - } finally { - Utils.tableNameLock.unlock(); - } - } - - @Override - public void undo(long tid, Master master) throws Exception { - TableManager.getInstance().removeNamespace(namespaceInfo.namespaceId); - 
Tables.clearCache(master.getInstance()); - Utils.unreserveNamespace(namespaceInfo.namespaceId, tid, true); - } - -} - -class SetupNamespacePermissions extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private NamespaceInfo namespaceInfo; - - public SetupNamespacePermissions(NamespaceInfo ti) { - this.namespaceInfo = ti; - } - - @Override - public Repo<Master> call(long tid, Master env) throws Exception { - // give all namespace permissions to the creator - SecurityOperation security = AuditedSecurityOperation.getInstance(env); - for (NamespacePermission permission : NamespacePermission.values()) { - try { - security.grantNamespacePermission(env.rpcCreds(), namespaceInfo.user, namespaceInfo.namespaceId, permission); - } catch (ThriftSecurityException e) { - LoggerFactory.getLogger(FinishCreateNamespace.class).error("{}", e.getMessage(), e); - throw e; - } - } - - // setup permissions in zookeeper before table info in zookeeper - // this way concurrent users will not get a spurious permission denied - // error - return new PopulateZookeeperWithNamespace(namespaceInfo); - } -} public class CreateNamespace extends MasterRepo { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java index 9436704..ea2e395 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CreateTable.java @@ -16,264 +16,13 @@ */ package org.apache.accumulo.master.tableOps; -import java.io.Serializable; import java.util.Map; -import java.util.Map.Entry; -import 
org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.admin.TimeType; -import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.fate.Repo; -import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.master.Master; -import org.apache.accumulo.server.ServerConstants; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.security.AuditedSecurityOperation; -import org.apache.accumulo.server.security.SecurityOperation; -import org.apache.accumulo.server.tables.TableManager; import org.apache.accumulo.server.tablets.TabletTime; -import org.apache.accumulo.server.util.MetadataTableUtil; -import org.apache.accumulo.server.util.TablePropUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Optional; - -class TableInfo implements Serializable { - - private static final long serialVersionUID = 1L; - - String tableName; - String tableId; - String namespaceId; - char timeType; - String user; - - public Map<String,String> props; - - public String dir = null; -} - -class FinishCreateTable extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private TableInfo tableInfo; - - public FinishCreateTable(TableInfo ti) { - this.tableInfo = ti; - } - - @Override - public long isReady(long tid, Master environment) throws Exception { - return 0; - } - - @Override - public Repo<Master> call(long tid, Master env) throws Exception { - TableManager.getInstance().transitionTableState(tableInfo.tableId, 
TableState.ONLINE); - - Utils.unreserveNamespace(tableInfo.namespaceId, tid, false); - Utils.unreserveTable(tableInfo.tableId, tid, true); - - env.getEventCoordinator().event("Created table %s ", tableInfo.tableName); - - LoggerFactory.getLogger(FinishCreateTable.class).debug("Created table " + tableInfo.tableId + " " + tableInfo.tableName); - - return null; - } - - @Override - public String getReturn() { - return tableInfo.tableId; - } - - @Override - public void undo(long tid, Master env) throws Exception {} - -} - -class PopulateMetadata extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private TableInfo tableInfo; - - PopulateMetadata(TableInfo ti) { - this.tableInfo = ti; - } - - @Override - public long isReady(long tid, Master environment) throws Exception { - return 0; - } - - @Override - public Repo<Master> call(long tid, Master environment) throws Exception { - KeyExtent extent = new KeyExtent(new Text(tableInfo.tableId), null, null); - MetadataTableUtil.addTablet(extent, tableInfo.dir, environment, tableInfo.timeType, environment.getMasterLock()); - - return new FinishCreateTable(tableInfo); - - } - - @Override - public void undo(long tid, Master environment) throws Exception { - MetadataTableUtil.deleteTable(tableInfo.tableId, false, environment, environment.getMasterLock()); - } - -} - -class CreateDir extends MasterRepo { - private static final long serialVersionUID = 1L; - - private TableInfo tableInfo; - - CreateDir(TableInfo ti) { - this.tableInfo = ti; - } - - @Override - public long isReady(long tid, Master environment) throws Exception { - return 0; - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - VolumeManager fs = master.getFileSystem(); - fs.mkdirs(new Path(tableInfo.dir)); - return new PopulateMetadata(tableInfo); - } - - @Override - public void undo(long tid, Master master) throws Exception { - VolumeManager fs = master.getFileSystem(); - fs.deleteRecursively(new 
Path(tableInfo.dir)); - - } -} - -class ChooseDir extends MasterRepo { - private static final long serialVersionUID = 1L; - - private TableInfo tableInfo; - - ChooseDir(TableInfo ti) { - this.tableInfo = ti; - } - - @Override - public long isReady(long tid, Master environment) throws Exception { - return 0; - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - // Constants.DEFAULT_TABLET_LOCATION has a leading slash prepended to it so we don't need to add one here - tableInfo.dir = master.getFileSystem().choose(Optional.of(tableInfo.tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR - + tableInfo.tableId + Constants.DEFAULT_TABLET_LOCATION; - return new CreateDir(tableInfo); - } - - @Override - public void undo(long tid, Master master) throws Exception { - - } -} - -class PopulateZookeeper extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private TableInfo tableInfo; - - PopulateZookeeper(TableInfo ti) { - this.tableInfo = ti; - } - - @Override - public long isReady(long tid, Master environment) throws Exception { - return Utils.reserveTable(tableInfo.tableId, tid, true, false, TableOperation.CREATE); - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - // reserve the table name in zookeeper or fail - - Utils.tableNameLock.lock(); - try { - // write tableName & tableId to zookeeper - Instance instance = master.getInstance(); - - Utils.checkTableDoesNotExist(instance, tableInfo.tableName, tableInfo.tableId, TableOperation.CREATE); - - TableManager.getInstance().addTable(tableInfo.tableId, tableInfo.namespaceId, tableInfo.tableName, NodeExistsPolicy.OVERWRITE); - - for (Entry<String,String> entry : tableInfo.props.entrySet()) - TablePropUtil.setTableProperty(tableInfo.tableId, entry.getKey(), entry.getValue()); - - Tables.clearCache(instance); - return new ChooseDir(tableInfo); - } finally { - Utils.tableNameLock.unlock(); - } - - 
} - - @Override - public void undo(long tid, Master master) throws Exception { - Instance instance = master.getInstance(); - TableManager.getInstance().removeTable(tableInfo.tableId); - Utils.unreserveTable(tableInfo.tableId, tid, true); - Tables.clearCache(instance); - } - -} - -class SetupPermissions extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private TableInfo tableInfo; - - public SetupPermissions(TableInfo ti) { - this.tableInfo = ti; - } - - @Override - public Repo<Master> call(long tid, Master env) throws Exception { - // give all table permissions to the creator - SecurityOperation security = AuditedSecurityOperation.getInstance(env); - if (!tableInfo.user.equals(env.getCredentials().getPrincipal())) { - for (TablePermission permission : TablePermission.values()) { - try { - security.grantTablePermission(env.rpcCreds(), tableInfo.user, tableInfo.tableId, permission, tableInfo.namespaceId); - } catch (ThriftSecurityException e) { - LoggerFactory.getLogger(FinishCreateTable.class).error("{}", e.getMessage(), e); - throw e; - } - } - } - - // setup permissions in zookeeper before table info in zookeeper - // this way concurrent users will not get a spurious permission denied - // error - return new PopulateZookeeper(tableInfo); - } - - @Override - public void undo(long tid, Master env) throws Exception { - AuditedSecurityOperation.getInstance(env).deleteTable(env.rpcCreds(), tableInfo.tableId, tableInfo.namespaceId); - } - -} public class CreateTable extends MasterRepo { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteNamespace.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteNamespace.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteNamespace.java index 
3aa3719..f84671e 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteNamespace.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteNamespace.java @@ -16,64 +16,9 @@ */ package org.apache.accumulo.master.tableOps; -import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.fate.Repo; import org.apache.accumulo.master.Master; -import org.apache.accumulo.server.security.AuditedSecurityOperation; -import org.apache.accumulo.server.tables.TableManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class NamespaceCleanUp extends MasterRepo { - - final private static Logger log = LoggerFactory.getLogger(CleanUp.class); - - private static final long serialVersionUID = 1L; - - private String namespaceId; - - public NamespaceCleanUp(String namespaceId) { - this.namespaceId = namespaceId; - } - - @Override - public long isReady(long tid, Master master) throws Exception { - return 0; - } - - @Override - public Repo<Master> call(long id, Master master) throws Exception { - - // remove from zookeeper - try { - TableManager.getInstance().removeNamespace(namespaceId); - } catch (Exception e) { - log.error("Failed to find namespace in zookeeper", e); - } - Tables.clearCache(master.getInstance()); - - // remove any permissions associated with this namespace - try { - AuditedSecurityOperation.getInstance(master).deleteNamespace(master.rpcCreds(), namespaceId); - } catch (ThriftSecurityException e) { - log.error("{}", e.getMessage(), e); - } - - Utils.unreserveNamespace(namespaceId, id, true); - - LoggerFactory.getLogger(CleanUp.class).debug("Deleted namespace " + namespaceId); - - return null; - } - - @Override - public void undo(long tid, Master environment) throws Exception { - // nothing to do - } - -} public class DeleteNamespace extends 
MasterRepo {