http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java index 05676e7..a1158f4 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java @@ -16,277 +16,12 @@ */ package org.apache.accumulo.master.tableOps; -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.Arrays; -import java.util.Map.Entry; - -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.iterators.user.GrepIterator; import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.fate.Repo; import org.apache.accumulo.master.Master; -import org.apache.accumulo.server.ServerConstants; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.master.state.MetaDataTableScanner; -import org.apache.accumulo.server.master.state.TabletLocationState; -import org.apache.accumulo.server.master.state.TabletState; -import org.apache.accumulo.server.problems.ProblemReports; -import org.apache.accumulo.server.security.AuditedSecurityOperation; import org.apache.accumulo.server.tables.TableManager; -import org.apache.accumulo.server.util.MetadataTableUtil; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class CleanUp extends MasterRepo { - - final private static Logger log = LoggerFactory.getLogger(CleanUp.class); - - private static final long serialVersionUID = 1L; - - private String tableId, namespaceId; - - private long creationTime; - - private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - /* - * handle the case where we start executing on a new machine where the current time is in the past relative to the previous machine - * - * if the new machine has time in the future, that will work ok w/ hasCycled - */ - if (System.currentTimeMillis() < creationTime) { - creationTime = System.currentTimeMillis(); - } - - } - - public CleanUp(String tableId, String namespaceId) { - this.tableId = tableId; - this.namespaceId = namespaceId; - creationTime = System.currentTimeMillis(); - } - - @Override - public long isReady(long tid, Master master) throws Exception { - if (!master.hasCycled(creationTime)) { - return 50; - } - - boolean done = true; - Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange(); - Scanner scanner = master.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY); - MetaDataTableScanner.configureScanner(scanner, master); - scanner.setRange(tableRange); - - for (Entry<Key,Value> entry : scanner) { - TabletLocationState locationState = MetaDataTableScanner.createTabletLocationState(entry.getKey(), entry.getValue()); - TabletState state = locationState.getState(master.onlineTabletServers()); - if (state.equals(TabletState.ASSIGNED) || state.equals(TabletState.HOSTED)) { - log.debug("Still waiting for table to be deleted: " + tableId + " locationState: " + locationState); - done = false; - break; - } - } - - if (!done) - return 50; - - return 0; - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - - master.clearMigrations(tableId); - - int refCount = 0; - - try { - // look for other tables that references this table's files - Connector conn = master.getConnector(); - BatchScanner bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 8); - try { - Range allTables = MetadataSchema.TabletsSection.getRange(); - Range tableRange = MetadataSchema.TabletsSection.getRange(tableId); - Range beforeTable = new Range(allTables.getStartKey(), true, tableRange.getStartKey(), false); - Range afterTable = new Range(tableRange.getEndKey(), false, allTables.getEndKey(), true); - bs.setRanges(Arrays.asList(beforeTable, afterTable)); - bs.fetchColumnFamily(DataFileColumnFamily.NAME); - IteratorSetting cfg = new IteratorSetting(40, "grep", GrepIterator.class); - GrepIterator.setTerm(cfg, "/" + tableId + "/"); - bs.addScanIterator(cfg); - - for (Entry<Key,Value> entry : bs) { - if (entry.getKey().getColumnQualifier().toString().contains("/" + tableId + "/")) { - refCount++; - } - } - } finally { - bs.close(); - } - - } catch (Exception e) { - refCount = -1; - log.error("Failed to scan " + MetadataTable.NAME + " looking for references to deleted table " + tableId, e); - } - - // remove metadata table entries - try { - // Intentionally do not pass master lock. If master loses lock, this operation may complete before master can kill itself. - // If the master lock passed to deleteTable, it is possible that the delete mutations will be dropped. If the delete operations - // are dropped and the operation completes, then the deletes will not be repeated. - MetadataTableUtil.deleteTable(tableId, refCount != 0, master, null); - } catch (Exception e) { - log.error("error deleting " + tableId + " from metadata table", e); - } - - // remove any problem reports the table may have - try { - ProblemReports.getInstance(master).deleteProblemReports(tableId); - } catch (Exception e) { - log.error("Failed to delete problem reports for table " + tableId, e); - } - - if (refCount == 0) { - final AccumuloConfiguration conf = master.getConfiguration(); - boolean archiveFiles = conf.getBoolean(Property.GC_FILE_ARCHIVE); - - // delete the map files - try { - VolumeManager fs = master.getFileSystem(); - for (String dir : ServerConstants.getTablesDirs()) { - if (archiveFiles) { - archiveFile(fs, dir, tableId); - } else { - fs.deleteRecursively(new Path(dir, tableId)); - } - } - } catch (IOException e) { - log.error("Unable to remove deleted table directory", e); - } catch (IllegalArgumentException exception) { - if (exception.getCause() instanceof UnknownHostException) { - /* Thrown if HDFS encounters a DNS problem in some edge cases */ - log.error("Unable to remove deleted table directory", exception); - } else { - throw exception; - } - } - } - - // remove table from zookeeper - try { - TableManager.getInstance().removeTable(tableId); - Tables.clearCache(master.getInstance()); - } catch (Exception e) { - log.error("Failed to find table id in zookeeper", e); - } - - // remove any permissions associated with this table - try { - AuditedSecurityOperation.getInstance(master).deleteTable(master.rpcCreds(), tableId, namespaceId); - } catch (ThriftSecurityException e) { - log.error("{}", e.getMessage(), e); - } - - Utils.unreserveTable(tableId, tid, true); - Utils.unreserveNamespace(namespaceId, tid, false); - - LoggerFactory.getLogger(CleanUp.class).debug("Deleted table " + tableId); - - return null; - } - - protected void archiveFile(VolumeManager fs, String dir, String tableId) throws IOException { - Path tableDirectory = new Path(dir, tableId); - Volume v = fs.getVolumeByPath(tableDirectory); - String basePath = v.getBasePath(); - - // Path component of URI - String tableDirPath = tableDirectory.toUri().getPath(); - - // Just the suffix of the path (after the Volume's base path) - String tableDirSuffix = tableDirPath.substring(basePath.length()); - - // Remove a leading path separator char because Path will treat the "child" as an absolute path with it - if (Path.SEPARATOR_CHAR == tableDirSuffix.charAt(0)) { - if (tableDirSuffix.length() > 1) { - tableDirSuffix = tableDirSuffix.substring(1); - } else { - tableDirSuffix = ""; - } - } - - // Get the file archive directory on this volume - final Path fileArchiveDir = new Path(basePath, ServerConstants.FILE_ARCHIVE_DIR); - - // Make sure it exists just to be safe - fs.mkdirs(fileArchiveDir); - - // The destination to archive this table to - final Path destTableDir = new Path(fileArchiveDir, tableDirSuffix); - - log.debug("Archiving " + tableDirectory + " to " + tableDirectory); - - if (fs.exists(destTableDir)) { - merge(fs, tableDirectory, destTableDir); - } else { - fs.rename(tableDirectory, destTableDir); - } - } - - protected void merge(VolumeManager fs, Path src, Path dest) throws IOException { - for (FileStatus child : fs.listStatus(src)) { - final String childName = child.getPath().getName(); - final Path childInSrc = new Path(src, childName), childInDest = new Path(dest, childName); - - if (child.isFile()) { - if (fs.exists(childInDest)) { - log.warn("File already exists in archive, ignoring. " + childInDest); - } else { - fs.rename(childInSrc, childInDest); - } - } else if (child.isDirectory()) { - if (fs.exists(childInDest)) { - // Recurse - merge(fs, childInSrc, childInDest); - } else { - fs.rename(childInSrc, childInDest); - } - } else { - // Symlinks shouldn't exist in table directories.. - log.warn("Ignoring archiving of non file/directory: " + child); - } - } - } - - @Override - public void undo(long tid, Master environment) throws Exception { - // nothing to do - } - -} public class DeleteTable extends MasterRepo {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportInfo.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportInfo.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportInfo.java new file mode 100644 index 0000000..d8f276a --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportInfo.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import java.io.Serializable; + +class ExportInfo implements Serializable { + + private static final long serialVersionUID = 1L; + + public String tableName; + public String tableID; + public String exportDir; + public String namespaceID; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java index e5b7e86..cd50a18 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ExportTable.java @@ -16,268 +16,11 @@ */ package org.apache.accumulo.master.tableOps; -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.io.BufferedOutputStream; -import java.io.BufferedWriter; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.zip.ZipEntry; -import java.util.zip.ZipOutputStream; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.impl.Tables; -import org.apache.accumulo.core.client.impl.thrift.TableOperation; -import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; -import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.DefaultConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; -import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.fate.Repo; import org.apache.accumulo.master.Master; -import org.apache.accumulo.server.AccumuloServerContext; -import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; -import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; - -class ExportInfo implements Serializable { - - private static final long serialVersionUID = 1L; - - public String tableName; - public String tableID; - public String exportDir; - public String namespaceID; -} - -class WriteExportFiles extends MasterRepo { - - private static final long serialVersionUID = 1L; - private final ExportInfo tableInfo; - - WriteExportFiles(ExportInfo tableInfo) { - this.tableInfo = tableInfo; - } - - private void checkOffline(Connector conn) throws Exception { - if (Tables.getTableState(conn.getInstance(), tableInfo.tableID) != TableState.OFFLINE) { - Tables.clearCache(conn.getInstance()); - if (Tables.getTableState(conn.getInstance(), tableInfo.tableID) != TableState.OFFLINE) { - throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER, - "Table is not offline"); - } - } - } - - @Override - public long isReady(long tid, Master master) throws Exception { - - long reserved = Utils.reserveNamespace(tableInfo.namespaceID, tid, false, true, TableOperation.EXPORT) - + Utils.reserveTable(tableInfo.tableID, tid, false, true, TableOperation.EXPORT); - if (reserved > 0) - return reserved; - - Connector conn = master.getConnector(); - - checkOffline(conn); - - Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - metaScanner.setRange(new KeyExtent(new Text(tableInfo.tableID), null, null).toMetadataRange()); - - // scan for locations - metaScanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); - metaScanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME); - - if (metaScanner.iterator().hasNext()) { - return 500; - } - - // use the same range to check for walogs that we used to check for hosted (or future hosted) tablets - // this is done as a separate scan after we check for locations, because walogs are okay only if there is no location - metaScanner.clearColumns(); - metaScanner.fetchColumnFamily(LogColumnFamily.NAME); - - if (metaScanner.iterator().hasNext()) { - throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER, - "Write ahead logs found for table"); - } - - return 0; - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - try { - exportTable(master.getFileSystem(), master, tableInfo.tableName, tableInfo.tableID, tableInfo.exportDir); - } catch (IOException ioe) { - throw new ThriftTableOperationException(tableInfo.tableID, tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER, - "Failed to create export files " + ioe.getMessage()); - } - Utils.unreserveNamespace(tableInfo.namespaceID, tid, false); - Utils.unreserveTable(tableInfo.tableID, tid, false); - Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid); - return null; - } - - @Override - public void undo(long tid, Master env) throws Exception { - Utils.unreserveNamespace(tableInfo.namespaceID, tid, false); - Utils.unreserveTable(tableInfo.tableID, tid, false); - } - - public static void exportTable(VolumeManager fs, AccumuloServerContext context, String tableName, String tableID, String exportDir) throws Exception { - - fs.mkdirs(new Path(exportDir)); - Path exportMetaFilePath = fs.getVolumeByPath(new Path(exportDir)).getFileSystem().makeQualified(new Path(exportDir, Constants.EXPORT_FILE)); - - FSDataOutputStream fileOut = fs.create(exportMetaFilePath, false); - ZipOutputStream zipOut = new ZipOutputStream(fileOut); - BufferedOutputStream bufOut = new BufferedOutputStream(zipOut); - DataOutputStream dataOut = new DataOutputStream(bufOut); - - try { - - zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_INFO_FILE)); - OutputStreamWriter osw = new OutputStreamWriter(dataOut, UTF_8); - osw.append(ExportTable.EXPORT_VERSION_PROP + ":" + ExportTable.VERSION + "\n"); - osw.append("srcInstanceName:" + context.getInstance().getInstanceName() + "\n"); - osw.append("srcInstanceID:" + context.getInstance().getInstanceID() + "\n"); - osw.append("srcZookeepers:" + context.getInstance().getZooKeepers() + "\n"); - osw.append("srcTableName:" + tableName + "\n"); - osw.append("srcTableID:" + tableID + "\n"); - osw.append(ExportTable.DATA_VERSION_PROP + ":" + ServerConstants.DATA_VERSION + "\n"); - osw.append("srcCodeVersion:" + Constants.VERSION + "\n"); - - osw.flush(); - dataOut.flush(); - - exportConfig(context, tableID, zipOut, dataOut); - dataOut.flush(); - - Map<String,String> uniqueFiles = exportMetadata(fs, context, tableID, zipOut, dataOut); - - dataOut.close(); - dataOut = null; - - createDistcpFile(fs, exportDir, exportMetaFilePath, uniqueFiles); - - } finally { - if (dataOut != null) - dataOut.close(); - } - } - - private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath, Map<String,String> uniqueFiles) throws IOException { - BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt"), false), UTF_8)); - - try { - for (String file : uniqueFiles.values()) { - distcpOut.append(file); - distcpOut.newLine(); - } - - distcpOut.append(exportMetaFilePath.toString()); - distcpOut.newLine(); - - distcpOut.close(); - distcpOut = null; - - } finally { - if (distcpOut != null) - distcpOut.close(); - } - } - - private static Map<String,String> exportMetadata(VolumeManager fs, AccumuloServerContext context, String tableID, ZipOutputStream zipOut, - DataOutputStream dataOut) throws IOException, TableNotFoundException, AccumuloException, AccumuloSecurityException { - zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_METADATA_FILE)); - - Map<String,String> uniqueFiles = new HashMap<String,String>(); - - Scanner metaScanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY); - metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME); - TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(metaScanner); - TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(metaScanner); - metaScanner.setRange(new KeyExtent(new Text(tableID), null, null).toMetadataRange()); - - for (Entry<Key,Value> entry : metaScanner) { - entry.getKey().write(dataOut); - entry.getValue().write(dataOut); - - if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) { - String path = fs.getFullPath(entry.getKey()).toString(); - String tokens[] = path.split("/"); - if (tokens.length < 1) { - throw new RuntimeException("Illegal path " + path); - } - - String filename = tokens[tokens.length - 1]; - - String existingPath = uniqueFiles.get(filename); - if (existingPath == null) { - uniqueFiles.put(filename, path); - } else if (!existingPath.equals(path)) { - // make sure file names are unique, should only apply for tables with file names generated by Accumulo 1.3 and earlier - throw new IOException("Cannot export table with nonunique file names " + filename + ". Major compact table."); - } - - } - } - return uniqueFiles; - } - - private static void exportConfig(AccumuloServerContext context, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws AccumuloException, - AccumuloSecurityException, TableNotFoundException, IOException { - Connector conn = context.getConnector(); - - DefaultConfiguration defaultConfig = AccumuloConfiguration.getDefaultConfiguration(); - Map<String,String> siteConfig = conn.instanceOperations().getSiteConfiguration(); - Map<String,String> systemConfig = conn.instanceOperations().getSystemConfiguration(); - - TableConfiguration tableConfig = context.getServerConfigurationFactory().getTableConfiguration(tableID); - - OutputStreamWriter osw = new OutputStreamWriter(dataOut, UTF_8); - - // only put props that are different than defaults and higher level configurations - zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_TABLE_CONFIG_FILE)); - for (Entry<String,String> prop : tableConfig) { - if (prop.getKey().startsWith(Property.TABLE_PREFIX.getKey())) { - Property key = Property.getPropertyByKey(prop.getKey()); - - if (key == null || !defaultConfig.get(key).equals(prop.getValue())) { - if (!prop.getValue().equals(siteConfig.get(prop.getKey())) && !prop.getValue().equals(systemConfig.get(prop.getKey()))) { - osw.append(prop.getKey() + "=" + prop.getValue() + "\n"); - } - } - } - } - - osw.flush(); - } -} public class ExportTable extends MasterRepo { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java new file mode 100644 index 0000000..a502a3d --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; + +class FinishCancelCompaction extends MasterRepo { + private static final long serialVersionUID = 1L; + private String tableId; + + public FinishCancelCompaction(String tableId) { + this.tableId = tableId; + } + + @Override + public Repo<Master> call(long tid, Master environment) throws Exception { + Utils.getReadLock(tableId, tid).unlock(); + return null; + } + + @Override + public void undo(long tid, Master environment) throws Exception { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCloneTable.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCloneTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCloneTable.java new file mode 100644 index 0000000..7c3701b --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCloneTable.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.tables.TableManager; +import org.slf4j.LoggerFactory; + +class FinishCloneTable extends MasterRepo { + + private static final long serialVersionUID = 1L; + private CloneInfo cloneInfo; + + public FinishCloneTable(CloneInfo cloneInfo) { + this.cloneInfo = cloneInfo; + } + + @Override + public long isReady(long tid, Master environment) throws Exception { + return 0; + } + + @Override + public Repo<Master> call(long tid, Master environment) throws Exception { + // directories are intentionally not created.... this is done because directories should be unique + // because they occupy a different namespace than normal tablet directories... also some clones + // may never create files.. therefore there is no need to consume namenode space w/ directories + // that are not used... tablet will create directories as needed + + TableManager.getInstance().transitionTableState(cloneInfo.tableId, TableState.ONLINE); + + Utils.unreserveNamespace(cloneInfo.srcNamespaceId, tid, false); + if (!cloneInfo.srcNamespaceId.equals(cloneInfo.namespaceId)) + Utils.unreserveNamespace(cloneInfo.namespaceId, tid, false); + Utils.unreserveTable(cloneInfo.srcTableId, tid, false); + Utils.unreserveTable(cloneInfo.tableId, tid, true); + + environment.getEventCoordinator().event("Cloned table %s from %s", cloneInfo.tableName, cloneInfo.srcTableId); + + LoggerFactory.getLogger(FinishCloneTable.class).debug("Cloned table " + cloneInfo.srcTableId + " " + cloneInfo.tableId + " " + cloneInfo.tableName); + + return null; + } + + @Override + public void undo(long tid, Master environment) throws Exception {} + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCreateNamespace.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCreateNamespace.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCreateNamespace.java new file mode 100644 index 0000000..93cc194 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCreateNamespace.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.slf4j.LoggerFactory; + +class FinishCreateNamespace extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private NamespaceInfo namespaceInfo; + + public FinishCreateNamespace(NamespaceInfo ti) { + this.namespaceInfo = ti; + } + + @Override + public long isReady(long tid, Master environment) throws Exception { + return 0; + } + + @Override + public Repo<Master> call(long id, Master env) throws Exception { + + Utils.unreserveNamespace(namespaceInfo.namespaceId, id, true); + + env.getEventCoordinator().event("Created namespace %s ", namespaceInfo.namespaceName); + + LoggerFactory.getLogger(FinishCreateNamespace.class).debug("Created table " + namespaceInfo.namespaceId + " " + namespaceInfo.namespaceName); + + return null; + } + + @Override + public String getReturn() { + return namespaceInfo.namespaceId; + } + + @Override + public void undo(long tid, Master env) throws Exception {} + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCreateTable.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCreateTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCreateTable.java new file mode 100644 index 0000000..2343efb --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCreateTable.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.tables.TableManager; +import org.slf4j.LoggerFactory; + +class FinishCreateTable extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private TableInfo tableInfo; + + public FinishCreateTable(TableInfo ti) { + this.tableInfo = ti; + } + + @Override + public long isReady(long tid, Master environment) throws Exception { + return 0; + } + + @Override + public Repo<Master> call(long tid, Master env) throws Exception { + TableManager.getInstance().transitionTableState(tableInfo.tableId, TableState.ONLINE); + + Utils.unreserveNamespace(tableInfo.namespaceId, tid, false); + Utils.unreserveTable(tableInfo.tableId, tid, true); + + env.getEventCoordinator().event("Created table %s ", tableInfo.tableName); + + LoggerFactory.getLogger(FinishCreateTable.class).debug("Created table " + tableInfo.tableId + " " + tableInfo.tableName); + + return null; + } + + @Override + public String getReturn() { + return tableInfo.tableId; + } + + @Override + public void undo(long tid, Master env) throws Exception {} + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishImportTable.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishImportTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishImportTable.java new file mode 100644 index 0000000..7dd76b1 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishImportTable.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.tables.TableManager; +import org.apache.hadoop.fs.Path; +import org.slf4j.LoggerFactory; + +class FinishImportTable extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private ImportedTableInfo tableInfo; + + public FinishImportTable(ImportedTableInfo ti) { + this.tableInfo = ti; + } + + @Override + public long isReady(long tid, Master environment) throws Exception { + return 0; + } + + @Override + public Repo<Master> call(long tid, Master env) throws Exception { + + env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir, "mappings.txt")); + + TableManager.getInstance().transitionTableState(tableInfo.tableId, TableState.ONLINE); + + Utils.unreserveNamespace(tableInfo.namespaceId, tid, false); + Utils.unreserveTable(tableInfo.tableId, tid, true); + + Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid); + + env.getEventCoordinator().event("Imported table %s ", tableInfo.tableName); + + LoggerFactory.getLogger(FinishImportTable.class).debug("Imported table " + tableInfo.tableId + " " + tableInfo.tableName); + + return null; + } + + @Override + public String getReturn() { + return tableInfo.tableId; + } + + @Override + public void undo(long tid, Master env) throws Exception {} + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportPopulateZookeeper.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportPopulateZookeeper.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportPopulateZookeeper.java new file mode 100644 index 0000000..f436fd3 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportPopulateZookeeper.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.Namespaces; +import org.apache.accumulo.core.client.impl.TableOperationsImpl; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.tables.TableManager; +import org.apache.accumulo.server.util.TablePropUtil; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +class ImportPopulateZookeeper extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private ImportedTableInfo tableInfo; + + ImportPopulateZookeeper(ImportedTableInfo ti) { + this.tableInfo = ti; + } + + @Override + public long isReady(long tid, Master environment) throws Exception { + return Utils.reserveTable(tableInfo.tableId, tid, true, false, TableOperation.IMPORT); + } + + private Map<String,String> getExportedProps(VolumeManager fs) throws Exception { + + Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE); + + try { + FileSystem ns = fs.getVolumeByPath(path).getFileSystem(); + return TableOperationsImpl.getExportedProps(ns, path); + } catch (IOException ioe) { + throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, + "Error reading table props from " + path + " " + ioe.getMessage()); + } + } + + @Override + public Repo<Master> call(long tid, Master env) throws Exception { + // reserve the table name in zookeeper or fail + + Utils.tableNameLock.lock(); + try { + // write tableName & tableId to zookeeper + Instance instance = env.getInstance(); + + Utils.checkTableDoesNotExist(instance, tableInfo.tableName, tableInfo.tableId, TableOperation.CREATE); + + String namespace = Tables.qualify(tableInfo.tableName).getFirst(); + String namespaceId = Namespaces.getNamespaceId(instance, namespace); + TableManager.getInstance().addTable(tableInfo.tableId, namespaceId, tableInfo.tableName, NodeExistsPolicy.OVERWRITE); + + Tables.clearCache(instance); + } finally { + Utils.tableNameLock.unlock(); + } + + for (Entry<String,String> entry : getExportedProps(env.getFileSystem()).entrySet()) + if (!TablePropUtil.setTableProperty(tableInfo.tableId, entry.getKey(), entry.getValue())) { + throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, + "Invalid table property " + entry.getKey()); + } + + return new CreateImportDir(tableInfo); + } + + @Override + public void undo(long tid, Master env) throws Exception { + Instance instance = env.getInstance(); + TableManager.getInstance().removeTable(tableInfo.tableId); + Utils.unreserveTable(tableInfo.tableId, tid, true); + Tables.clearCache(instance); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportSetupPermissions.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportSetupPermissions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportSetupPermissions.java new file mode 100644 index 0000000..00fade9 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportSetupPermissions.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SecurityOperation; +import org.slf4j.LoggerFactory; + +class ImportSetupPermissions extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private ImportedTableInfo tableInfo; + + public ImportSetupPermissions(ImportedTableInfo ti) { + this.tableInfo = ti; + } + + @Override + public long isReady(long tid, Master environment) throws Exception { + return 0; + } + + @Override + public Repo<Master> call(long tid, Master env) throws Exception { + // give all table permissions to the creator + SecurityOperation security = AuditedSecurityOperation.getInstance(env); + for (TablePermission permission : TablePermission.values()) { + try { + security.grantTablePermission(env.rpcCreds(), tableInfo.user, tableInfo.tableId, permission, tableInfo.namespaceId); + } catch (ThriftSecurityException e) { + LoggerFactory.getLogger(ImportSetupPermissions.class).error("{}", e.getMessage(), e); + throw e; + } + } + + // setup permissions in zookeeper before table info in zookeeper + // this way concurrent users will not get a spurious permission denied + // error + return new ImportPopulateZookeeper(tableInfo); + } + + @Override + public void undo(long tid, Master env) throws Exception { + AuditedSecurityOperation.getInstance(env).deleteTable(env.rpcCreds(), tableInfo.tableId, tableInfo.namespaceId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java index 31bc52c..a90474f 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportTable.java @@ -18,542 +18,21 @@ package org.apache.accumulo.master.tableOps; import static java.nio.charset.StandardCharsets.UTF_8; -import java.io.BufferedInputStream; import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.DataInputStream; import java.io.IOException; import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.Serializable; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.impl.Namespaces; -import org.apache.accumulo.core.client.impl.TableOperationsImpl; -import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; -import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.core.util.FastFormat; import org.apache.accumulo.fate.Repo; -import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.master.Master; import org.apache.accumulo.server.ServerConstants; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.security.AuditedSecurityOperation; -import org.apache.accumulo.server.security.SecurityOperation; -import org.apache.accumulo.server.tables.TableManager; -import org.apache.accumulo.server.tablets.UniqueNameAllocator; -import org.apache.accumulo.server.util.MetadataTableUtil; -import org.apache.accumulo.server.util.TablePropUtil; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Optional; - -/** - * - */ -class ImportedTableInfo implements Serializable { - - private static final long serialVersionUID = 1L; - - public String exportDir; - public String user; - public String tableName; - public String tableId; - public String importDir; - public String namespaceId; -} - -class FinishImportTable extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private ImportedTableInfo tableInfo; - - public FinishImportTable(ImportedTableInfo ti) { - this.tableInfo = ti; - } - - @Override - public long isReady(long tid, Master environment) throws Exception { - return 0; - } - - @Override - public Repo<Master> call(long tid, Master env) throws Exception { - - env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir, "mappings.txt")); - - TableManager.getInstance().transitionTableState(tableInfo.tableId, TableState.ONLINE); - - Utils.unreserveNamespace(tableInfo.namespaceId, tid, false); - Utils.unreserveTable(tableInfo.tableId, tid, true); - - Utils.unreserveHdfsDirectory(new Path(tableInfo.exportDir).toString(), tid); - - env.getEventCoordinator().event("Imported table %s ", tableInfo.tableName); - - LoggerFactory.getLogger(FinishImportTable.class).debug("Imported table " + tableInfo.tableId + " " + tableInfo.tableName); - - return null; - } - - @Override - public String getReturn() { - return tableInfo.tableId; - } - - @Override - public void undo(long tid, Master env) throws Exception {} - -} - -class MoveExportedFiles extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private ImportedTableInfo tableInfo; - - MoveExportedFiles(ImportedTableInfo ti) { - this.tableInfo = ti; - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - try { - VolumeManager fs = master.getFileSystem(); - - Map<String,String> fileNameMappings = PopulateMetadataTable.readMappingFile(fs, tableInfo); - - for (String oldFileName : fileNameMappings.keySet()) { - if (!fs.exists(new Path(tableInfo.exportDir, oldFileName))) { - throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, - "File referenced by exported table does not exists " + oldFileName); - } - } - - FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir)); - - for (FileStatus fileStatus : files) { - String newName = fileNameMappings.get(fileStatus.getPath().getName()); - - if (newName != null) - fs.rename(fileStatus.getPath(), new Path(tableInfo.importDir, newName)); - } - - return new FinishImportTable(tableInfo); - } catch (IOException ioe) { - log.warn("{}", ioe.getMessage(), ioe); - throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, - "Error renaming files " + ioe.getMessage()); - } - } -} - -class PopulateMetadataTable extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private ImportedTableInfo tableInfo; - - PopulateMetadataTable(ImportedTableInfo ti) { - this.tableInfo = ti; - } - - static Map<String,String> readMappingFile(VolumeManager fs, ImportedTableInfo tableInfo) throws Exception { - BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(tableInfo.importDir, "mappings.txt")), UTF_8)); - - try { - Map<String,String> map = new HashMap<String,String>(); - - String line = null; - while ((line = in.readLine()) != null) { - String sa[] = line.split(":", 2); - map.put(sa[0], sa[1]); - } - - return map; - } finally { - in.close(); - } - - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - - Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE); - - BatchWriter mbw = null; - ZipInputStream zis = null; - - try { - VolumeManager fs = master.getFileSystem(); - - mbw = master.getConnector().createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); - - zis = new ZipInputStream(fs.open(path)); - - Map<String,String> fileNameMappings = readMappingFile(fs, tableInfo); - - log.info("importDir is " + tableInfo.importDir); - - // This is a directory already prefixed with proper volume information e.g. hdfs://localhost:8020/path/to/accumulo/tables/... - final String bulkDir = tableInfo.importDir; - - final String[] tableDirs = ServerConstants.getTablesDirs(); - - ZipEntry zipEntry; - while ((zipEntry = zis.getNextEntry()) != null) { - if (zipEntry.getName().equals(Constants.EXPORT_METADATA_FILE)) { - DataInputStream in = new DataInputStream(new BufferedInputStream(zis)); - - Key key = new Key(); - Value val = new Value(); - - Mutation m = null; - Text currentRow = null; - int dirCount = 0; - - while (true) { - key.readFields(in); - val.readFields(in); - - Text endRow = new KeyExtent(key.getRow(), (Text) null).getEndRow(); - Text metadataRow = new KeyExtent(new Text(tableInfo.tableId), endRow, null).getMetadataEntry(); - - Text cq; - - if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { - String oldName = new Path(key.getColumnQualifier().toString()).getName(); - String newName = fileNameMappings.get(oldName); - - if (newName == null) { - throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, - "File " + oldName + " does not exist in import dir"); - } - - cq = new Text(bulkDir + "/" + newName); - } else { - cq = key.getColumnQualifier(); - } - - if (m == null) { - // Make a unique directory inside the table's dir. Cannot import multiple tables into one table, so don't need to use unique allocator - String tabletDir = new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES), UTF_8); - - // Build up a full hdfs://localhost:8020/accumulo/tables/$id/c-XXXXXXX - String absolutePath = getClonedTabletDir(master, tableDirs, tabletDir); - - m = new Mutation(metadataRow); - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(absolutePath.getBytes(UTF_8))); - currentRow = metadataRow; - } - - if (!currentRow.equals(metadataRow)) { - mbw.addMutation(m); - - // Make a unique directory inside the table's dir. Cannot import multiple tables into one table, so don't need to use unique allocator - String tabletDir = new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES), UTF_8); - - // Build up a full hdfs://localhost:8020/accumulo/tables/$id/c-XXXXXXX - String absolutePath = getClonedTabletDir(master, tableDirs, tabletDir); - - m = new Mutation(metadataRow); - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(absolutePath.getBytes(UTF_8))); - } - - m.put(key.getColumnFamily(), cq, val); - - if (endRow == null && TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { - mbw.addMutation(m); - break; // its the last column in the last row - } - } - - break; - } - } - - return new MoveExportedFiles(tableInfo); - } catch (IOException ioe) { - log.warn("{}", ioe.getMessage(), ioe); - throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, - "Error reading " + path + " " + ioe.getMessage()); - } finally { - if (zis != null) { - try { - zis.close(); - } catch (IOException ioe) { - log.warn("Failed to close zip file ", ioe); - } - } - - if (mbw != null) { - mbw.close(); - } - } - } - - /** - * Given options for tables (across multiple volumes), construct an absolute path using the unique name within the chosen volume - * - * @return An absolute, unique path for the imported table - */ - protected String getClonedTabletDir(Master master, String[] tableDirs, String tabletDir) { - // We can try to spread out the tablet dirs across all volumes - String tableDir = master.getFileSystem().choose(Optional.of(tableInfo.tableId), tableDirs); - - // Build up a full hdfs://localhost:8020/accumulo/tables/$id/c-XXXXXXX - return tableDir + "/" + tableInfo.tableId + "/" + tabletDir; - } - - @Override - public void undo(long tid, Master environment) throws Exception { - MetadataTableUtil.deleteTable(tableInfo.tableId, false, environment, environment.getMasterLock()); - } -} - -class MapImportFileNames extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private ImportedTableInfo tableInfo; - - MapImportFileNames(ImportedTableInfo ti) { - this.tableInfo = ti; - } - - @Override - public Repo<Master> call(long tid, Master environment) throws Exception { - - Path path = new Path(tableInfo.importDir, "mappings.txt"); - - BufferedWriter mappingsWriter = null; - - try { - VolumeManager fs = environment.getFileSystem(); - - fs.mkdirs(new Path(tableInfo.importDir)); - - FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir)); - - UniqueNameAllocator namer = UniqueNameAllocator.getInstance(); - - mappingsWriter = new BufferedWriter(new OutputStreamWriter(fs.create(path), UTF_8)); - - for (FileStatus fileStatus : files) { - String fileName = fileStatus.getPath().getName(); - log.info("filename " + fileStatus.getPath().toString()); - String sa[] = fileName.split("\\."); - String extension = ""; - if (sa.length > 1) { - extension = sa[sa.length - 1]; - - if (!FileOperations.getValidExtensions().contains(extension)) { - continue; - } - } else { - // assume it is a map file - extension = Constants.MAPFILE_EXTENSION; - } - - String newName = "I" + namer.getNextName() + "." + extension; - - mappingsWriter.append(fileName); - mappingsWriter.append(':'); - mappingsWriter.append(newName); - mappingsWriter.newLine(); - } - - mappingsWriter.close(); - mappingsWriter = null; - - return new PopulateMetadataTable(tableInfo); - } catch (IOException ioe) { - log.warn("{}", ioe.getMessage(), ioe); - throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, - "Error writing mapping file " + path + " " + ioe.getMessage()); - } finally { - if (mappingsWriter != null) - try { - mappingsWriter.close(); - } catch (IOException ioe) { - log.warn("Failed to close " + path, ioe); - } - } - } - - @Override - public void undo(long tid, Master env) throws Exception { - env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir)); - } -} - -class CreateImportDir extends MasterRepo { - private static final Logger log = LoggerFactory.getLogger(CreateImportDir.class); - private static final long serialVersionUID = 1L; - - private ImportedTableInfo tableInfo; - - CreateImportDir(ImportedTableInfo ti) { - this.tableInfo = ti; - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - - UniqueNameAllocator namer = UniqueNameAllocator.getInstance(); - - Path exportDir = new Path(tableInfo.exportDir); - String[] tableDirs = ServerConstants.getTablesDirs(); - - log.info("Looking for matching filesystem for " + exportDir + " from options " + Arrays.toString(tableDirs)); - Path base = master.getFileSystem().matchingFileSystem(exportDir, tableDirs); - log.info("Chose base table directory of " + base); - Path directory = new Path(base, tableInfo.tableId); - - Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName()); - - tableInfo.importDir = newBulkDir.toString(); - - log.info("Using import dir: " + tableInfo.importDir); - - return new MapImportFileNames(tableInfo); - } -} - -class ImportPopulateZookeeper extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private ImportedTableInfo tableInfo; - - ImportPopulateZookeeper(ImportedTableInfo ti) { - this.tableInfo = ti; - } - - @Override - public long isReady(long tid, Master environment) throws Exception { - return Utils.reserveTable(tableInfo.tableId, tid, true, false, TableOperation.IMPORT); - } - - private Map<String,String> getExportedProps(VolumeManager fs) throws Exception { - - Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE); - - try { - FileSystem ns = fs.getVolumeByPath(path).getFileSystem(); - return TableOperationsImpl.getExportedProps(ns, path); - } catch (IOException ioe) { - throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, - "Error reading table props from " + path + " " + ioe.getMessage()); - } - } - - @Override - public Repo<Master> call(long tid, Master env) throws Exception { - // reserve the table name in zookeeper or fail - - Utils.tableNameLock.lock(); - try { - // write tableName & tableId to zookeeper - Instance instance = env.getInstance(); - - Utils.checkTableDoesNotExist(instance, tableInfo.tableName, tableInfo.tableId, TableOperation.CREATE); - - String namespace = Tables.qualify(tableInfo.tableName).getFirst(); - String namespaceId = Namespaces.getNamespaceId(instance, namespace); - TableManager.getInstance().addTable(tableInfo.tableId, namespaceId, tableInfo.tableName, NodeExistsPolicy.OVERWRITE); - - Tables.clearCache(instance); - } finally { - Utils.tableNameLock.unlock(); - } - - for (Entry<String,String> entry : getExportedProps(env.getFileSystem()).entrySet()) - if (!TablePropUtil.setTableProperty(tableInfo.tableId, entry.getKey(), entry.getValue())) { - throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, - "Invalid table property " + entry.getKey()); - } - - return new CreateImportDir(tableInfo); - } - - @Override - public void undo(long tid, Master env) throws Exception { - Instance instance = env.getInstance(); - TableManager.getInstance().removeTable(tableInfo.tableId); - Utils.unreserveTable(tableInfo.tableId, tid, true); - Tables.clearCache(instance); - } -} - -class ImportSetupPermissions extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private ImportedTableInfo tableInfo; - - public ImportSetupPermissions(ImportedTableInfo ti) { - this.tableInfo = ti; - } - - @Override - public long isReady(long tid, Master environment) throws Exception { - return 0; - } - - @Override - public Repo<Master> call(long tid, Master env) throws Exception { - // give all table permissions to the creator - SecurityOperation security = AuditedSecurityOperation.getInstance(env); - for (TablePermission permission : TablePermission.values()) { - try { - security.grantTablePermission(env.rpcCreds(), tableInfo.user, tableInfo.tableId, permission, tableInfo.namespaceId); - } catch (ThriftSecurityException e) { - LoggerFactory.getLogger(ImportSetupPermissions.class).error("{}", e.getMessage(), e); - throw e; - } - } - - // setup permissions in zookeeper before table info in zookeeper - // this way concurrent users will not get a spurious permission denied - // error - return new ImportPopulateZookeeper(tableInfo); - } - - @Override - public void undo(long tid, Master env) throws Exception { - AuditedSecurityOperation.getInstance(env).deleteTable(env.rpcCreds(), tableInfo.tableId, tableInfo.namespaceId); - } -} public class ImportTable extends MasterRepo { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportedTableInfo.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportedTableInfo.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportedTableInfo.java new file mode 100644 index 0000000..34bb6c8 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/ImportedTableInfo.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import java.io.Serializable; + +class ImportedTableInfo implements Serializable { + + private static final long serialVersionUID = 1L; + + public String exportDir; + public String user; + public String tableName; + public String tableId; + public String importDir; + public String namespaceId; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java new file mode 100644 index 0000000..c478a5d --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedWriter; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.accumulo.core.client.impl.ServerClient; +import org.apache.accumulo.core.client.impl.thrift.ClientService; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.client.impl.thrift.ClientService.Client; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.SimpleThreadPool; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.htrace.wrappers.TraceExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class LoadFiles extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private static ExecutorService threadPool = null; + private static final Logger log = LoggerFactory.getLogger(BulkImport.class); + + private String tableId; + private String source; + private String bulk; + private String errorDir; + private boolean setTime; + + public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) { + this.tableId = tableId; + this.source = source; + this.bulk = bulk; + this.errorDir = errorDir; + this.setTime = setTime; + } + + @Override + public long isReady(long tid, Master master) throws Exception { + if (master.onlineTabletServers().size() == 0) + return 500; + return 0; + } + + private static synchronized ExecutorService getThreadPool(Master master) { + if (threadPool == null) { + int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE); + ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import"); + pool.allowCoreThreadTimeOut(true); + threadPool = new TraceExecutorService(pool); + } + return threadPool; + } + + @Override + public Repo<Master> call(final long tid, final Master master) throws Exception { + ExecutorService executor = getThreadPool(master); + final AccumuloConfiguration conf = master.getConfiguration(); + VolumeManager fs = master.getFileSystem(); + List<FileStatus> files = new ArrayList<FileStatus>(); + for (FileStatus entry : fs.listStatus(new Path(bulk))) { + files.add(entry); + } + log.debug("tid " + tid + " importing " + files.size() + " files"); + + Path writable = new Path(this.errorDir, ".iswritable"); + if (!fs.createNewFile(writable)) { + // Maybe this is a re-try... clear the flag and try again + fs.delete(writable); + if (!fs.createNewFile(writable)) + throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, + "Unable to write to " + this.errorDir); + } + fs.delete(writable); + + final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>()); + for (FileStatus f : files) + filesToLoad.add(f.getPath().toString()); + + final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES)); + for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) { + List<Future<List<String>>> results = new ArrayList<Future<List<String>>>(); + + if (master.onlineTabletServers().size() == 0) + log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")"); + + while (master.onlineTabletServers().size() == 0) { + UtilWaitThread.sleep(500); + } + + // Use the threadpool to assign files one-at-a-time to the server + final List<String> loaded = Collections.synchronizedList(new ArrayList<String>()); + for (final String file : filesToLoad) { + results.add(executor.submit(new Callable<List<String>>() { + @Override + public List<String> call() { + List<String> failures = new ArrayList<String>(); + ClientService.Client client = null; + String server = null; + try { + // get a connection to a random tablet server, do not prefer cached connections because + // this is running on the master and there are lots of connections to tablet servers + // serving the metadata tablets + long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT); + Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis); + client = pair.getSecond(); + server = pair.getFirst(); + List<String> attempt = Collections.singletonList(file); + log.debug("Asking " + pair.getFirst() + " to bulk import " + file); + List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), master.rpcCreds(), tid, tableId, attempt, errorDir, setTime); + if (fail.isEmpty()) { + loaded.add(file); + } else { + failures.addAll(fail); + } + } catch (Exception ex) { + log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex); + } finally { + ServerClient.close(client); + } + return failures; + } + })); + } + Set<String> failures = new HashSet<String>(); + for (Future<List<String>> f : results) + failures.addAll(f.get()); + filesToLoad.removeAll(loaded); + if (filesToLoad.size() > 0) { + log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed"); + UtilWaitThread.sleep(100); + } + } + + FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true); + BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8)); + try { + for (String f : filesToLoad) { + out.write(f); + out.write("\n"); + } + } finally { + out.close(); + } + + // return the next step, which will perform cleanup + return new CompleteBulkImport(tableId, source, bulk, errorDir); + } + + static String sampleList(Collection<?> potentiallyLongList, int max) { + StringBuffer result = new StringBuffer(); + result.append("["); + int i = 0; + for (Object obj : potentiallyLongList) { + result.append(obj); + if (i >= max) { + result.append("..."); + break; + } else { + result.append(", "); + } + i++; + } + if (i < max) + result.delete(result.length() - 2, result.length()); + result.append("]"); + return result.toString(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/MapImportFileNames.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/MapImportFileNames.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/MapImportFileNames.java new file mode 100644 index 0000000..0ee91dd --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/MapImportFileNames.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.tablets.UniqueNameAllocator; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +class MapImportFileNames extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private ImportedTableInfo tableInfo; + + MapImportFileNames(ImportedTableInfo ti) { + this.tableInfo = ti; + } + + @Override + public Repo<Master> call(long tid, Master environment) throws Exception { + + Path path = new Path(tableInfo.importDir, "mappings.txt"); + + BufferedWriter mappingsWriter = null; + + try { + VolumeManager fs = environment.getFileSystem(); + + fs.mkdirs(new Path(tableInfo.importDir)); + + FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir)); + + UniqueNameAllocator namer = UniqueNameAllocator.getInstance(); + + mappingsWriter = new BufferedWriter(new OutputStreamWriter(fs.create(path), UTF_8)); + + for (FileStatus fileStatus : files) { + String fileName = fileStatus.getPath().getName(); + log.info("filename " + fileStatus.getPath().toString()); + String sa[] = fileName.split("\\."); + String extension = ""; + if (sa.length > 1) { + extension = sa[sa.length - 1]; + + if (!FileOperations.getValidExtensions().contains(extension)) { + continue; + } + } else { + // assume it is a map file + extension = Constants.MAPFILE_EXTENSION; + } + + String newName = "I" + namer.getNextName() + "." + extension; + + mappingsWriter.append(fileName); + mappingsWriter.append(':'); + mappingsWriter.append(newName); + mappingsWriter.newLine(); + } + + mappingsWriter.close(); + mappingsWriter = null; + + return new PopulateMetadataTable(tableInfo); + } catch (IOException ioe) { + log.warn("{}", ioe.getMessage(), ioe); + throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, + "Error writing mapping file " + path + " " + ioe.getMessage()); + } finally { + if (mappingsWriter != null) + try { + mappingsWriter.close(); + } catch (IOException ioe) { + log.warn("Failed to close " + path, ioe); + } + } + } + + @Override + public void undo(long tid, Master env) throws Exception { + env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/MoveExportedFiles.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/MoveExportedFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/MoveExportedFiles.java new file mode 100644 index 0000000..19395df --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/MoveExportedFiles.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import java.io.IOException; +import java.util.Map; + +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +class MoveExportedFiles extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private ImportedTableInfo tableInfo; + + MoveExportedFiles(ImportedTableInfo ti) { + this.tableInfo = ti; + } + + @Override + public Repo<Master> call(long tid, Master master) throws Exception { + try { + VolumeManager fs = master.getFileSystem(); + + Map<String,String> fileNameMappings = PopulateMetadataTable.readMappingFile(fs, tableInfo); + + for (String oldFileName : fileNameMappings.keySet()) { + if (!fs.exists(new Path(tableInfo.exportDir, oldFileName))) { + throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, + "File referenced by exported table does not exists " + oldFileName); + } + } + + FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir)); + + for (FileStatus fileStatus : files) { + String newName = fileNameMappings.get(fileStatus.getPath().getName()); + + if (newName != null) + fs.rename(fileStatus.getPath(), new Path(tableInfo.importDir, newName)); + } + + return new FinishImportTable(tableInfo); + } catch (IOException ioe) { + log.warn("{}", ioe.getMessage(), ioe); + throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER, + "Error renaming files " + ioe.getMessage()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e2e6780/server/master/src/main/java/org/apache/accumulo/master/tableOps/NamespaceCleanUp.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/NamespaceCleanUp.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/NamespaceCleanUp.java new file mode 100644 index 0000000..2444374 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/NamespaceCleanUp.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.tables.TableManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class NamespaceCleanUp extends MasterRepo { + + private static final Logger log = LoggerFactory.getLogger(NamespaceCleanUp.class); + + private static final long serialVersionUID = 1L; + + private String namespaceId; + + public NamespaceCleanUp(String namespaceId) { + this.namespaceId = namespaceId; + } + + @Override + public long isReady(long tid, Master master) throws Exception { + return 0; + } + + @Override + public Repo<Master> call(long id, Master master) throws Exception { + + // remove from zookeeper + try { + TableManager.getInstance().removeNamespace(namespaceId); + } catch (Exception e) { + log.error("Failed to find namespace in zookeeper", e); + } + Tables.clearCache(master.getInstance()); + + // remove any permissions associated with this namespace + try { + AuditedSecurityOperation.getInstance(master).deleteNamespace(master.rpcCreds(), namespaceId); + } catch (ThriftSecurityException e) { + log.error("{}", e.getMessage(), e); + } + + Utils.unreserveNamespace(namespaceId, id, true); + + log.debug("Deleted namespace " + namespaceId); + + return null; + } + + @Override + public void undo(long tid, Master environment) throws Exception { + // nothing to do + } + +}