Merge branch '1.6' into 1.7

Conflicts:
    core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0c3dc17e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0c3dc17e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0c3dc17e

Branch: refs/heads/master
Commit: 0c3dc17e905d9d470b33594a93a0e98d74ade989
Parents: d7e7c64 1ab3827
Author: Josh Elser <els...@apache.org>
Authored: Sun Oct 18 17:46:19 2015 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Sun Oct 18 17:56:28 2015 -0400

----------------------------------------------------------------------
 .../accumulo/core/client/ClientConfiguration.java  |  2 +-
 .../accumulo/master/tableOps/BulkImport.java       |  1 -
 .../apache/accumulo/master/tableOps/LoadFiles.java |  2 +-
 .../org/apache/accumulo/test/ShellServerIT.java    | 17 +++++++++--------
 4 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0c3dc17e/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
index dc15d98,8593d9e..10dbd5e
--- a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java
@@@ -218,8 -146,8 +218,8 @@@ public class ClientConfiguration extend
      List<Configuration> configs = new LinkedList<Configuration>();
      for (String path : paths) {
        File conf = new File(path);
-       if (conf.canRead()) {
+       if (conf.isFile() && conf.canRead()) {
 -        configs.add(new PropertiesConfiguration(conf));
 +        configs.add(new ClientConfiguration(conf));
        }
      }
      // We couldn't find the client configuration anywhere
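Note: the isFile() guard matters because File.canRead() also returns true for a
readable directory, so a directory sitting at one of the search paths would
previously have been handed to the configuration loader. A minimal standalone
illustration of the merged check (class name and paths are hypothetical):

    import java.io.File;

    public class ReadableFileCheck {
      public static void main(String[] args) {
        // A readable directory passes canRead() alone...
        File dir = new File(System.getProperty("java.io.tmpdir"));
        System.out.println(dir.canRead()); // typically true
        System.out.println(dir.isFile());  // false: it is a directory, not a file

        // ...so the merged guard only accepts plain, readable files:
        File conf = new File(dir, "client.conf"); // hypothetical config path
        if (conf.isFile() && conf.canRead()) {
          System.out.println("would load " + conf);
        } else {
          System.out.println("skipped: not a plain readable file");
        }
      }
    }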
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0c3dc17e/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 7001fdd,37edbc9..ad20473
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@@ -266,4 -286,335 +266,3 @@@ public class BulkImport extends MasterR
      ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
    }
  }
-
-class CleanUpBulkImport extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String error;
-
-  public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.error = error;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    log.debug("removing the bulk processing flag file in " + bulk);
-    Path bulkDir = new Path(bulk);
-    MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
-    MetadataTableUtil.addDeleteEntry(tableId, bulkDir.toString());
-    log.debug("removing the metadata table markers for loaded files");
-    Connector conn = master.getConnector();
-    MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
-    log.debug("releasing HDFS reservations for " + source + " and " + error);
-    Utils.unreserveHdfsDirectory(source, tid);
-    Utils.unreserveHdfsDirectory(error, tid);
-    Utils.getReadLock(tableId, tid).unlock();
-    log.debug("completing bulk import transaction " + tid);
-    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
-    return null;
-  }
-}
-
-class CompleteBulkImport extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String error;
-
-  public CompleteBulkImport(String tableId, String source, String bulk, String error) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.error = error;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
-    return new CopyFailed(tableId, source, bulk, error);
-  }
-}
-
-class CopyFailed extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String error;
-
-  public CopyFailed(String tableId, String source, String bulk, String error) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.error = error;
-  }
-
-  @Override
-  public long isReady(long tid, Master master) throws Exception {
-    Set<TServerInstance> finished = new HashSet<TServerInstance>();
-    Set<TServerInstance> running = master.onlineTabletServers();
-    for (TServerInstance server : running) {
-      try {
-        TServerConnection client = master.getConnection(server);
-        if (client != null && !client.isActive(tid))
-          finished.add(server);
-      } catch (TException ex) {
-        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
-      }
-    }
-    if (finished.containsAll(running))
-      return 0;
-    return 500;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    // This needs to execute after the arbiter is stopped
-
-    VolumeManager fs = master.getFileSystem();
-
-    if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
-      return new CleanUpBulkImport(tableId, source, bulk, error);
-
-    HashMap<FileRef,String> failures = new HashMap<FileRef,String>();
-    HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>();
-
-    FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
-    BufferedReader in = new BufferedReader(new InputStreamReader(failFile, UTF_8));
-    try {
-      String line = null;
-      while ((line = in.readLine()) != null) {
-        Path path = new Path(line);
-        if (!fs.exists(new Path(error, path.getName())))
-          failures.put(new FileRef(line, path), line);
-      }
-    } finally {
-      failFile.close();
-    }
-
-    /*
-     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
-     * have no loaded markers.
-     */
-
-    // determine which failed files were loaded
-    Connector conn = master.getConnector();
-    Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
-    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
-    mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-
-    for (Entry<Key,Value> entry : mscanner) {
-      if (Long.parseLong(entry.getValue().toString()) == tid) {
-        FileRef loadedFile = new FileRef(fs, entry.getKey());
-        String absPath = failures.remove(loadedFile);
-        if (absPath != null) {
-          loadedFailures.put(loadedFile, absPath);
-        }
-      }
-    }
-
-    // move failed files that were not loaded
-    for (String failure : failures.values()) {
-      Path orig = new Path(failure);
-      Path dest = new Path(error, orig.getName());
-      fs.rename(orig, dest);
-      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
-    }
-
-    if (loadedFailures.size() > 0) {
-      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
-          + Constants.ZBULK_FAILED_COPYQ);
-
-      HashSet<String> workIds = new HashSet<String>();
-
-      for (String failure : loadedFailures.values()) {
-        Path orig = new Path(failure);
-        Path dest = new Path(error, orig.getName());
-
-        if (fs.exists(dest))
-          continue;
-
-        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8));
-        workIds.add(orig.getName());
-        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
-      }
-
-      bifCopyQueue.waitUntilDone(workIds);
-    }
-
-    fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
-    return new CleanUpBulkImport(tableId, source, bulk, error);
-  }
-
-}
-
-class LoadFiles extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private static ExecutorService threadPool = null;
-  private static final Logger log = Logger.getLogger(BulkImport.class);
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String errorDir;
-  private boolean setTime;
-
-  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.errorDir = errorDir;
-    this.setTime = setTime;
-  }
-
-  @Override
-  public long isReady(long tid, Master master) throws Exception {
-    if (master.onlineTabletServers().size() == 0)
-      return 500;
-    return 0;
-  }
-
-  private static synchronized ExecutorService getThreadPool(Master master) {
-    if (threadPool == null) {
-      int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
-      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
-      pool.allowCoreThreadTimeOut(true);
-      threadPool = new TraceExecutorService(pool);
-    }
-    return threadPool;
-  }
-
-  @Override
-  public Repo<Master> call(final long tid, final Master master) throws Exception {
-    ExecutorService executor = getThreadPool(master);
-    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
-    VolumeManager fs = master.getFileSystem();
-    List<FileStatus> files = new ArrayList<FileStatus>();
-    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
-      files.add(entry);
-    }
-    log.debug("tid " + tid + " importing " + files.size() + " files");
-
-    Path writable = new Path(this.errorDir, ".iswritable");
-    if (!fs.createNewFile(writable)) {
-      // Maybe this is a re-try... clear the flag and try again
-      fs.delete(writable);
-      if (!fs.createNewFile(writable))
-        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
-            "Unable to write to " + this.errorDir);
-    }
-    fs.delete(writable);
-
-    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
-    for (FileStatus f : files)
-      filesToLoad.add(f.getPath().toString());
-
-    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
-    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
-      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
-
-      if (master.onlineTabletServers().size() == 0)
-        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
-
-      while (master.onlineTabletServers().size() == 0) {
-        UtilWaitThread.sleep(500);
-      }
-
-      // Use the threadpool to assign files one-at-a-time to the server
-      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
-      final TServerInstance servers[] = master.onlineTabletServers().toArray(new TServerInstance[0]);
-      final Random random = new Random();
-      for (final String file : filesToLoad) {
-        results.add(executor.submit(new Callable<List<String>>() {
-          @Override
-          public List<String> call() {
-            List<String> failures = new ArrayList<String>();
-            Client client = null;
-            String server = null;
-            try {
-              // get a connection to a random tablet server, do not prefer cached connections because
-              // this is running on the master and there are lots of connections to tablet servers
-              // serving the metadata tablets
-              long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
-              server = servers[random.nextInt(servers.length)].getLocation().toString();
-              client = ThriftUtil.getTServerClient(server, master.getConfiguration().getConfiguration(), timeInMillis);
-              List<String> attempt = Collections.singletonList(file);
-              log.debug("Asking " + server + " to bulk import " + file);
-              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
-                  errorDir, setTime);
-              if (fail.isEmpty()) {
-                loaded.add(file);
-              } else {
-                failures.addAll(fail);
-              }
-            } catch (Exception ex) {
-              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
-            } finally {
-              ThriftUtil.returnClient(client);
-            }
-            return failures;
-          }
-        }));
-      }
-      Set<String> failures = new HashSet<String>();
-      for (Future<List<String>> f : results)
-        failures.addAll(f.get());
-      filesToLoad.removeAll(loaded);
-      if (filesToLoad.size() > 0) {
-        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
-        UtilWaitThread.sleep(100);
-      }
-    }
-
-    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
-    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
-    try {
-      for (String f : filesToLoad) {
-        out.write(f);
-        out.write("\n");
-      }
-    } finally {
-      out.close();
-    }
-
-    // return the next step, which will perform cleanup
-    return new CompleteBulkImport(tableId, source, bulk, errorDir);
-  }
-
-  static String sampleList(Collection<?> potentiallyLongList, int max) {
-    StringBuffer result = new StringBuffer();
-    result.append("[");
-    int i = 0;
-    for (Object obj : potentiallyLongList) {
-      result.append(obj);
-      if (i >= max) {
-        result.append("...");
-        break;
-      } else {
-        result.append(", ");
-      }
-      i++;
-    }
-    if (i < max)
-      result.delete(result.length() - 2, result.length());
-    result.append("]");
-    return result.toString();
-  }
--
-}
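Note: the classes removed above already live in their own files on master (the
new LoadFiles.java follows). Each one is a step in a FATE transaction: isReady()
returns 0 when the step may run (otherwise a retry delay in milliseconds), and
call() does the work and returns the next step, e.g. CompleteBulkImport ->
CopyFailed -> CleanUpBulkImport -> null. A simplified, self-contained sketch of
that driver loop, modeled on (but not identical to) org.apache.accumulo.fate.Repo:

    // Illustrative step interface; the real one is org.apache.accumulo.fate.Repo.
    interface Step {
      long isReady(long tid) throws Exception; // 0 = runnable now, else retry delay in ms
      Step call(long tid) throws Exception;    // do the work, return the next step (null = done)
    }

    public class StepDriver {
      public static void run(Step step, long tid) throws Exception {
        while (step != null) {
          long delay = step.isReady(tid);
          if (delay > 0) {
            Thread.sleep(delay); // e.g. CopyFailed waits until no tserver is active on tid
            continue;
          }
          step = step.call(tid); // e.g. CompleteBulkImport returns new CopyFailed(...)
        }
      }
    }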
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0c3dc17e/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
index af5262a,0000000..d67ae0b
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
@@@ -1,213 -1,0 +1,213 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.tableOps;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.htrace.wrappers.TraceExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.net.HostAndPort;
+
+class LoadFiles extends MasterRepo {
+
+  private static final long serialVersionUID = 1L;
+
+  private static ExecutorService threadPool = null;
+  private static final Logger log = LoggerFactory.getLogger(LoadFiles.class);
+
+  private String tableId;
+  private String source;
+  private String bulk;
+  private String errorDir;
+  private boolean setTime;
+
+  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.errorDir = errorDir;
+    this.setTime = setTime;
+  }
+
+  @Override
+  public long isReady(long tid, Master master) throws Exception {
+    if (master.onlineTabletServers().size() == 0)
+      return 500;
+    return 0;
+  }
+
+  private static synchronized ExecutorService getThreadPool(Master master) {
+    if (threadPool == null) {
+      int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
+      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
+      pool.allowCoreThreadTimeOut(true);
+      threadPool = new TraceExecutorService(pool);
+    }
+    return threadPool;
+  }
+
+  @Override
+  public Repo<Master> call(final long tid, final Master master) throws Exception {
+    ExecutorService executor = getThreadPool(master);
+    final AccumuloConfiguration conf = master.getConfiguration();
+    VolumeManager fs = master.getFileSystem();
+    List<FileStatus> files = new ArrayList<FileStatus>();
+    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
+      files.add(entry);
+    }
+    log.debug("tid " + tid + " importing " + files.size() + " files");
+
+    Path writable = new Path(this.errorDir, ".iswritable");
+    if (!fs.createNewFile(writable)) {
+      // Maybe this is a re-try... clear the flag and try again
+      fs.delete(writable);
+      if (!fs.createNewFile(writable))
+        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
+            "Unable to write to " + this.errorDir);
+    }
+    fs.delete(writable);
+
+    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
+    for (FileStatus f : files)
+      filesToLoad.add(f.getPath().toString());
+
+    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
+    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
+      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
+
+      if (master.onlineTabletServers().size() == 0)
+        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
+
+      while (master.onlineTabletServers().size() == 0) {
+        UtilWaitThread.sleep(500);
+      }
+
+      // Use the threadpool to assign files one-at-a-time to the server
+      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
+      final Random random = new Random();
+      final TServerInstance[] servers = master.onlineTabletServers().toArray(new TServerInstance[0]);
+      for (final String file : filesToLoad) {
+        results.add(executor.submit(new Callable<List<String>>() {
+          @Override
+          public List<String> call() {
+            List<String> failures = new ArrayList<String>();
+            ClientService.Client client = null;
+            HostAndPort server = null;
+            try {
+              // get a connection to a random tablet server, do not prefer cached connections because
+              // this is running on the master and there are lots of connections to tablet servers
+              // serving the metadata tablets
+              long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
+              // Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
+              server = servers[random.nextInt(servers.length)].getLocation();
+              client = ThriftUtil.getTServerClient(server, master, timeInMillis);
+              List<String> attempt = Collections.singletonList(file);
+              log.debug("Asking " + server + " to bulk import " + file);
+              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), master.rpcCreds(), tid, tableId, attempt, errorDir, setTime);
+              if (fail.isEmpty()) {
+                loaded.add(file);
+              } else {
+                failures.addAll(fail);
+              }
+            } catch (Exception ex) {
+              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
+            } finally {
+              ThriftUtil.returnClient(client);
+            }
+            return failures;
+          }
+        }));
+      }
+      Set<String> failures = new HashSet<String>();
+      for (Future<List<String>> f : results)
+        failures.addAll(f.get());
+      filesToLoad.removeAll(loaded);
+      if (filesToLoad.size() > 0) {
+        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
+        UtilWaitThread.sleep(100);
+      }
+    }
+
+    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
+    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
+    try {
+      for (String f : filesToLoad) {
+        out.write(f);
+        out.write("\n");
+      }
+    } finally {
+      out.close();
+    }
+
+    // return the next step, which will perform cleanup
+    return new CompleteBulkImport(tableId, source, bulk, errorDir);
+  }
+
+  static String sampleList(Collection<?> potentiallyLongList, int max) {
+    StringBuffer result = new StringBuffer();
+    result.append("[");
+    int i = 0;
+    for (Object obj : potentiallyLongList) {
+      result.append(obj);
+      if (i >= max) {
+        result.append("...");
+        break;
+      } else {
+        result.append(", ");
+      }
+      i++;
+    }
+    if (i < max)
+      result.delete(result.length() - 2, result.length());
+    result.append("]");
+    return result.toString();
+  }
+
- }
++}
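Note: the core of LoadFiles.call() is a bounded retry loop: each remaining file
becomes its own task on the shared pool, each task asks one random live tablet
server to import just that file, and anything still unloaded after
MASTER_BULK_RETRIES attempts is written to failures.txt. A condensed sketch of
that structure (tryImport stands in for the Thrift bulkImportFiles RPC; all
names are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.function.Predicate;

    public class RetryLoopSketch {
      /** Returns the files that still failed after the given number of attempts. */
      static Set<String> loadAll(ExecutorService executor, Set<String> filesToLoad,
          int retries, Predicate<String> tryImport) throws Exception {
        for (int attempt = 0; attempt < retries && !filesToLoad.isEmpty(); attempt++) {
          Map<String,Future<Boolean>> results = new HashMap<>();
          for (String file : filesToLoad)
            results.put(file, executor.submit(() -> tryImport.test(file))); // one task per file
          for (Map.Entry<String,Future<Boolean>> e : results.entrySet())
            if (e.getValue().get()) // loaded: drop it from the retry set
              filesToLoad.remove(e.getKey());
        }
        return filesToLoad; // the real code writes these to failures.txt
      }
    }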
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0c3dc17e/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
index 8c15dc4,77496ce..a433763
--- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
@@@ -140,6 -128,7 +140,7 @@@ public class ShellServerIT extends Shar
    }
  
    public static class TestShell {
 -    private static final Logger shellLog = Logger.getLogger(TestShell.class);
++    private static final Logger shellLog = LoggerFactory.getLogger(TestShell.class);
      public TestOutputStream output;
      public StringInputStream input;
      public Shell shell;
  
@@@ -213,7 -191,7 +214,7 @@@
    }
  
    void assertGoodExit(String s, boolean stringPresent, ErrorMessageCallback callback) {
-     Shell.log.info(output.get());
 -    shellLog.debug("Shell Output: '" + output.get() + "'");
++    shellLog.debug("Shell Output: '{}'", output.get());
      if (0 != shell.getExitCode()) {
        String errorMsg = callback.getErrorMessage();
        assertEquals(errorMsg, 0, shell.getExitCode());
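Note: besides moving TestShell onto its own slf4j logger, the merged
assertGoodExit line switches to parameterized logging, which skips building the
message string when DEBUG is disabled. A minimal sketch of the difference
(logger setup is illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingStyles {
      private static final Logger log = LoggerFactory.getLogger(LoggingStyles.class);

      public static void main(String[] args) {
        String output = "shell output";
        // Concatenation builds the message even when DEBUG is off:
        log.debug("Shell Output: '" + output + "'");
        // The parameterized form defers formatting until the level check passes:
        log.debug("Shell Output: '{}'", output);
      }
    }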