This is an automated email from the ASF dual-hosted git repository. edcoleman pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 0ad96b1dc088070486ccc92a170545a6c91fab7f Merge: 8b0262d5b3 ec8ae122ed Author: Ed Coleman <edcole...@apache.org> AuthorDate: Tue Mar 19 19:02:30 2024 +0000 Merge remote-tracking branch 'upstream/2.1' .../accumulo/core/clientImpl/ClientContext.java | 9 +- .../core/clientImpl/ConditionalWriterImpl.java | 2 +- .../core/clientImpl/InstanceOperationsImpl.java | 4 +- .../core/clientImpl/TableOperationsImpl.java | 3 +- .../core/clientImpl/TabletServerBatchReader.java | 5 +- .../core/clientImpl/TabletServerBatchWriter.java | 8 +- .../accumulo/core/clientImpl/bulk/BulkImport.java | 8 +- .../accumulo/core/file/BloomFilterLayer.java | 5 +- .../file/blockfile/cache/lru/LruBlockCache.java | 4 +- .../blockfile/cache/tinylfu/TinyLfuBlockCache.java | 2 +- .../util/compaction/ExternalCompactionUtil.java | 10 +- .../accumulo/core/util/threads/ThreadPools.java | 386 +++++++++++++-------- .../core/file/rfile/MultiThreadedRFileTest.java | 7 +- .../threads/ThreadPoolExecutorBuilderTest.java | 79 +++++ .../server/conf/ServerConfigurationFactory.java | 4 +- .../conf/store/impl/PropCacheCaffeineImpl.java | 7 +- .../server/conf/store/impl/PropStoreWatcher.java | 5 +- .../accumulo/server/fs/VolumeManagerImpl.java | 4 +- .../accumulo/server/problems/ProblemReports.java | 7 +- .../apache/accumulo/server/rpc/TServerUtils.java | 7 +- .../server/util/RemoveEntriesForMissingFiles.java | 4 +- .../server/util/VerifyTabletAssignments.java | 5 +- .../server/conf/store/impl/ReadyMonitorTest.java | 4 +- .../coordinator/CompactionCoordinator.java | 4 +- .../accumulo/coordinator/CompactionFinalizer.java | 11 +- .../main/java/org/apache/accumulo/gc/GCRun.java | 2 +- .../accumulo/manager/metrics/fate/FateMetrics.java | 4 +- .../accumulo/manager/recovery/RecoveryManager.java | 4 +- .../manager/upgrade/UpgradeCoordinator.java | 8 +- .../tserver/TabletServerResourceManager.java | 31 +- .../tserver/compactions/CompactionService.java | 6 +- .../compactions/InternalCompactionExecutor.java | 9 +- .../org/apache/accumulo/tserver/log/LogSorter.java | 5 +- .../accumulo/tserver/log/TabletServerLogger.java | 4 +- .../metrics/CompactionExecutorsMetrics.java | 2 +- .../accumulo/test/BalanceWithOfflineTableIT.java | 5 +- .../test/functional/BatchWriterFlushIT.java | 2 +- .../accumulo/test/functional/BulkFailureIT.java | 4 +- 38 files changed, 433 insertions(+), 247 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 12adfc5c98,147f19e5dd..334fb46a53 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@@ -243,46 -264,105 +261,82 @@@ public class ThreadPools * @return ExecutorService impl * @throws IllegalArgumentException if property is not handled */ - @SuppressWarnings("deprecation") public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf, final Property p, boolean emitThreadPoolMetrics) { - + ThreadPoolExecutorBuilder builder; switch (p) { - case GENERAL_SIMPLETIMER_THREADPOOL_SIZE: - return createScheduledExecutorService(conf.getCount(p), "SimpleTimer"); case GENERAL_THREADPOOL_SIZE: return createScheduledExecutorService(conf.getCount(p), "GeneralExecutor", emitThreadPoolMetrics); - case MANAGER_BULK_THREADPOOL_SIZE: - builder = getPoolBuilder("bulk import").numCoreThreads(conf.getCount(p)).withTimeOut( - conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT), MILLISECONDS); - if (emitThreadPoolMetrics) { - builder.enableThreadPoolMetrics(); - } - return builder.build(); - case MANAGER_RENAME_THREADS: - builder = getPoolBuilder("bulk move").numCoreThreads(conf.getCount(p)); - if (emitThreadPoolMetrics) { - builder.enableThreadPoolMetrics(); - } - return builder.build(); case MANAGER_FATE_THREADPOOL_SIZE: - return createFixedThreadPool(conf.getCount(p), "Repo Runner", emitThreadPoolMetrics); + builder = getPoolBuilder("Repo Runner").numCoreThreads(conf.getCount(p)); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case MANAGER_STATUS_THREAD_POOL_SIZE: + builder = getPoolBuilder("GatherTableInformation"); int threads = conf.getCount(p); if (threads == 0) { - return createThreadPool(0, Integer.MAX_VALUE, 60L, SECONDS, "GatherTableInformation", - new SynchronousQueue<>(), emitThreadPoolMetrics); + builder.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS) + .withQueue(new SynchronousQueue<>()); } else { - return createFixedThreadPool(threads, "GatherTableInformation", emitThreadPoolMetrics); + builder.numCoreThreads(threads); } + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_WORKQ_THREADS: - return createFixedThreadPool(conf.getCount(p), "distributed work queue", - emitThreadPoolMetrics); + builder = getPoolBuilder("distributed work queue").numCoreThreads(conf.getCount(p)); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_MINC_MAXCONCURRENT: - return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, "minor compactor", - emitThreadPoolMetrics); + builder = getPoolBuilder("minor compactor").numCoreThreads(conf.getCount(p)).withTimeOut(0L, + MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_MIGRATE_MAXCONCURRENT: - return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, "tablet migration", - emitThreadPoolMetrics); + builder = getPoolBuilder("tablet migration").numCoreThreads(conf.getCount(p)) + .withTimeOut(0L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_ASSIGNMENT_MAXCONCURRENT: - return createFixedThreadPool(conf.getCount(p), 0L, MILLISECONDS, "tablet assignment", - emitThreadPoolMetrics); + builder = getPoolBuilder("tablet assignment").numCoreThreads(conf.getCount(p)) + .withTimeOut(0L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_SUMMARY_RETRIEVAL_THREADS: - return createThreadPool(conf.getCount(p), conf.getCount(p), 60, SECONDS, - "summary file retriever", emitThreadPoolMetrics); + builder = getPoolBuilder("summary file retriever").numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_SUMMARY_REMOTE_THREADS: - return createThreadPool(conf.getCount(p), conf.getCount(p), 60, SECONDS, "summary remote", - emitThreadPoolMetrics); + builder = getPoolBuilder("summary remote").numCoreThreads(conf.getCount(p)).withTimeOut(60L, + MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case TSERV_SUMMARY_PARTITION_THREADS: - return createThreadPool(conf.getCount(p), conf.getCount(p), 60, SECONDS, - "summary partition", emitThreadPoolMetrics); + builder = getPoolBuilder("summary partition").numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case GC_DELETE_THREADS: - return createFixedThreadPool(conf.getCount(p), "deleting", emitThreadPoolMetrics); + return getPoolBuilder("deleting").numCoreThreads(conf.getCount(p)).build(); - case REPLICATION_WORKER_THREADS: - builder = getPoolBuilder("replication task").numCoreThreads(conf.getCount(p)); - if (emitThreadPoolMetrics) { - builder.enableThreadPoolMetrics(); - } - return builder.build(); - default: throw new IllegalArgumentException("Unhandled thread pool property: " + p); } @@@ -460,10 -550,25 +524,23 @@@ */ public ScheduledThreadPoolExecutor createGeneralScheduledExecutorService(AccumuloConfiguration conf) { - @SuppressWarnings("deprecation") - Property oldProp = Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE; - Property prop = conf.resolve(Property.GENERAL_THREADPOOL_SIZE, oldProp); - return (ScheduledThreadPoolExecutor) createExecutorService(conf, prop, true); + return (ScheduledThreadPoolExecutor) createExecutorService(conf, + Property.GENERAL_THREADPOOL_SIZE, true); } + /** + * Create a named ScheduledThreadPool. The pool will not be instrumented without additional + * metrics. This method should be preferred, especially for short-lived pools. + * + * @param numThreads number of threads + * @param name thread pool name + * @return ScheduledThreadPoolExecutor + */ + public ScheduledThreadPoolExecutor createScheduledExecutorService(int numThreads, + final String name) { + return createScheduledExecutorService(numThreads, name, false); + } + /** * Create a named ScheduledThreadPool * diff --cc core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java index 8c66bb6ead,ea0c4ceabe..2f1525dc11 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java @@@ -19,8 -19,7 +19,8 @@@ package org.apache.accumulo.core.file.rfile; import static java.util.concurrent.TimeUnit.MILLISECONDS; - import static java.util.concurrent.TimeUnit.SECONDS; + import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; diff --cc server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java index 032165a0ce,2c04fc9e49..2708d21648 --- a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java +++ b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java @@@ -18,8 -18,8 +18,10 @@@ */ package org.apache.accumulo.server.problems; + import static java.util.concurrent.TimeUnit.SECONDS; + +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; import java.util.EnumMap; import java.util.Iterator; diff --cc server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 685080c5b1,75690df777..7358ce4416 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@@ -144,8 -141,6 +144,8 @@@ public class CompactionCoordinator exte super("compaction-coordinator", opts, args); aconf = conf == null ? super.getConfiguration() : conf; schedExecutor = ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf); - summariesExecutor = ThreadPools.getServerThreadPools().createFixedThreadPool(10, - "Compaction Summary Gatherer", false); ++ summariesExecutor = ThreadPools.getServerThreadPools() ++ .getPoolBuilder("Compaction Summary Gatherer").numCoreThreads(10).build(); compactionFinalizer = createCompactionFinalizer(schedExecutor); tserverSet = createLiveTServerSet(); setupSecurity(); diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java index 1939d6e51a,cd1753b67f..4cb6f3b94c --- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java @@@ -283,9 -292,9 +283,9 @@@ public class GCRun implements GarbageCo minimizeDeletes(confirmedDeletes, processedDeletes, fs, log); ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() - .createExecutorService(config, Property.GC_DELETE_THREADS, false); + .createExecutorService(config, Property.GC_DELETE_THREADS); - final List<Pair<Path,Path>> replacements = context.getVolumeReplacements(); + final Map<Path,Path> replacements = context.getVolumeReplacements(); for (final GcCandidate delete : confirmedDeletes.values()) { diff --cc server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java index b33a3cfe7a,f405881cb6..cb590a3642 --- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java @@@ -70,11 -70,11 +70,11 @@@ public class RecoveryManager public RecoveryManager(Manager manager, long timeToCacheExistsInMillis) { this.manager = manager; existenceCache = - CacheBuilder.newBuilder().expireAfterWrite(timeToCacheExistsInMillis, TimeUnit.MILLISECONDS) + Caffeine.newBuilder().expireAfterWrite(timeToCacheExistsInMillis, TimeUnit.MILLISECONDS) .maximumWeight(10_000_000).weigher((path, exist) -> path.toString().length()).build(); - executor = ThreadPools.getServerThreadPools().createScheduledExecutorService(4, - "Walog sort starter", false); + executor = + ThreadPools.getServerThreadPools().createScheduledExecutorService(4, "Walog sort starter"); zooCache = new ZooCache(manager.getContext().getZooReader(), null); try { List<String> workIDs = diff --cc server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index 34b362fa2e,11cb713b58..84e69d561f --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@@ -18,8 -18,7 +18,9 @@@ */ package org.apache.accumulo.manager.upgrade; + import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.server.AccumuloDataVersion.REMOVE_DEPRECATIONS_FOR_VERSION_3; +import static org.apache.accumulo.server.AccumuloDataVersion.ROOT_TABLET_META_CHANGES; import java.io.IOException; import java.util.Collections; @@@ -198,16 -194,12 +198,16 @@@ public class UpgradeCoordinator "Not currently in a suitable state to do metadata upgrade %s", status); if (currentVersion < AccumuloDataVersion.get()) { - return ThreadPools.getServerThreadPools().createThreadPool(0, Integer.MAX_VALUE, 60L, - TimeUnit.SECONDS, "UpgradeMetadataThreads", new SynchronousQueue<>(), false) - .submit(() -> { + return ThreadPools.getServerThreadPools().getPoolBuilder("UpgradeMetadataThreads") + .numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS) + .withQueue(new SynchronousQueue<>()).build().submit(() -> { try { for (int v = currentVersion; v < AccumuloDataVersion.get(); v++) { - log.info("Upgrading Root from data version {}", v); + log.info("Upgrading Root - current version {} as step towards target version {}", v, + AccumuloDataVersion.get()); + var upgrader = upgraders.get(v); + Objects.requireNonNull(upgrader, + "upgrade root: failed to find root upgrader for version " + currentVersion); upgraders.get(v).upgradeRoot(context); } setStatus(UpgradeStatus.UPGRADED_ROOT, eventCoordinator); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index b0a225295c,3d80f9c1ed..de47b841fd --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@@ -18,9 -18,11 +18,11 @@@ */ package org.apache.accumulo.tserver; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.util.Objects.requireNonNull; + import static java.util.concurrent.TimeUnit.MILLISECONDS; + import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toUnmodifiableMap; -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import java.io.IOException; import java.util.ArrayList; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index 33ce1989e4,6fc396e4f4..2b5a6c6135 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@@ -291,9 -293,12 +291,10 @@@ public class LogSorter } public void startWatchingForRecoveryLogs() throws KeeperException, InterruptedException { - @SuppressWarnings("deprecation") - int threadPoolSize = this.conf.getCount(this.conf - .resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, Property.TSERV_RECOVERY_MAX_CONCURRENT)); + int threadPoolSize = this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT); - ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools() - .createFixedThreadPool(threadPoolSize, this.getClass().getName(), true); + ThreadPoolExecutor threadPool = + ThreadPools.getServerThreadPools().getPoolBuilder(this.getClass().getName()) + .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build(); new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, context).startProcessing(new LogProcessor(), threadPool); } diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index dc66adc26a,346b70166c..54dad2b243 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@@ -257,22 -262,60 +257,22 @@@ public class TabletServerLogger if (nextLogMaker != null) { return; } - nextLogMaker = - ThreadPools.getServerThreadPools().createFixedThreadPool(1, "WALog creator", true); + nextLogMaker = ThreadPools.getServerThreadPools().getPoolBuilder("WALog creator") + .numCoreThreads(1).enableThreadPoolMetrics().build(); - nextLogMaker.execute(new Runnable() { - @Override - public void run() { - final ServerResources conf = tserver.getServerConfig(); - final VolumeManager fs = conf.getVolumeManager(); - while (!nextLogMaker.isShutdown()) { - log.debug("Creating next WAL"); - DfsLogger alog = null; - - try { - alog = new DfsLogger(tserver.getContext(), conf, syncCounter, flushCounter); - alog.open(tserver.getClientAddressString()); - } catch (Exception t) { - log.error("Failed to open WAL", t); - // the log is not advertised in ZK yet, so we can just delete it if it exists - if (alog != null) { - try { - alog.close(); - } catch (Exception e) { - log.error("Failed to close WAL after it failed to open", e); - } - - try { - Path path = alog.getPath(); - if (fs.exists(path)) { - fs.delete(path); - } - } catch (Exception e) { - log.warn("Failed to delete a WAL that failed to open", e); - } - } - - try { - nextLog.offer(t, 12, TimeUnit.HOURS); - } catch (InterruptedException ex) { - // ignore - } - - continue; - } - - String fileName = alog.getFileName(); - log.debug("Created next WAL {}", fileName); - - try { - tserver.addNewLogMarker(alog); - } catch (Exception t) { - log.error("Failed to add new WAL marker for " + fileName, t); + nextLogMaker.execute(() -> { + final VolumeManager fs = tserver.getVolumeManager(); + while (!nextLogMaker.isShutdown()) { + log.debug("Creating next WAL"); + DfsLogger alog = null; + try { + alog = DfsLogger.createNew(tserver.getContext(), syncCounter, flushCounter, + tserver.getClientAddressString()); + } catch (Exception t) { + log.error("Failed to open WAL", t); + // the log is not advertised in ZK yet, so we can just delete it if it exists + if (alog != null) { try { - // Intentionally not deleting walog because it may have been advertised in ZK. See - // #949 alog.close(); } catch (Exception e) { log.error("Failed to close WAL after it failed to open", e); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java index b8cbbc19a1,e3c6bb5150..59c66e1d35 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java @@@ -72,8 -72,12 +72,8 @@@ public class CompactionExecutorsMetric } public CompactionExecutorsMetrics() { - startUpdateThread(); - } - - protected void startUpdateThread() { ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools() - .createScheduledExecutorService(1, "compactionExecutorsMetricsPoller", false); + .createScheduledExecutorService(1, "compactionExecutorsMetricsPoller"); Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow)); long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5); ThreadPools.watchNonCriticalScheduledTask(scheduler.scheduleAtFixedRate(this::update, diff --cc test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java index e721c34837,98f3e47f29..cb4b62d1d7 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java @@@ -107,100 -96,6 +107,100 @@@ public class BulkFailureIT extends Accu runTest(tables[1], 22222222L, BulkFailureIT::newLoad); } + private static Path createNewBulkDir(ServerContext context, VolumeManager fs, String sourceDir, + TableId tableId) throws IOException { + Path tableDir = fs.matchingFileSystem(new Path(sourceDir), context.getTablesDirs()); + if (tableDir == null) { + throw new IOException( + sourceDir + " is not in the same file system as any volume configured for Accumulo"); + } + + Path directory = new Path(tableDir, tableId.canonical()); + fs.mkdirs(directory); + + // only one should be able to create the lock file + // the purpose of the lock file is to avoid a race + // condition between the call to fs.exists() and + // fs.mkdirs()... if only hadoop had a mkdir() function + // that failed when the dir existed + + UniqueNameAllocator namer = context.getUniqueNameAllocator(); + + while (true) { + Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName()); + if (fs.exists(newBulkDir)) { // sanity check + throw new IOException("Dir exist when it should not " + newBulkDir); + } + if (fs.mkdirs(newBulkDir)) { + return newBulkDir; + } + + sleepUninterruptibly(3, TimeUnit.SECONDS); + } + } + + public static String prepareBulkImport(ServerContext manager, final VolumeManager fs, String dir, + TableId tableId, long tid) throws Exception { + final Path bulkDir = createNewBulkDir(manager, fs, dir, tableId); + + manager.getAmple().addBulkLoadInProgressFlag( + "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), tid); + + Path dirPath = new Path(dir); + FileStatus[] dataFiles = fs.listStatus(dirPath); + + final UniqueNameAllocator namer = manager.getUniqueNameAllocator(); + + AccumuloConfiguration serverConfig = manager.getConfiguration(); + int numThreads = serverConfig.getCount(Property.MANAGER_RENAME_THREADS); - ExecutorService workers = - ThreadPools.getServerThreadPools().createFixedThreadPool(numThreads, "bulk rename", false); ++ ExecutorService workers = ThreadPools.getServerThreadPools().getPoolBuilder("bulk rename") ++ .numCoreThreads(numThreads).build(); + List<Future<Exception>> results = new ArrayList<>(); + + for (FileStatus file : dataFiles) { + final FileStatus fileStatus = file; + results.add(workers.submit(() -> { + try { + String[] sa = fileStatus.getPath().getName().split("\\."); + String extension = ""; + if (sa.length > 1) { + extension = sa[sa.length - 1]; + + if (!FileOperations.getValidExtensions().contains(extension)) { + LOG.warn("{} does not have a valid extension, ignoring", fileStatus.getPath()); + return null; + } + } else { + LOG.warn("{} does not have any extension, ignoring", fileStatus.getPath()); + return null; + } + + String newName = "I" + namer.getNextName() + "." + extension; + Path newPath = new Path(bulkDir, newName); + try { + fs.rename(fileStatus.getPath(), newPath); + LOG.debug("Moved {} to {}", fileStatus.getPath(), newPath); + } catch (IOException E1) { + LOG.error("Could not move: {} {}", fileStatus.getPath(), E1.getMessage()); + } + + } catch (Exception ex) { + return ex; + } + return null; + })); + } + workers.shutdown(); + while (!workers.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {} + + for (Future<Exception> ex : results) { + if (ex.get() != null) { + throw ex.get(); + } + } + return bulkDir.toString(); + } + /** * This test verifies two things. First it ensures that after a bulk imported file is compacted * that import request are ignored. Second it ensures that after the bulk import transaction is