This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 12919d094b LogSorter was creating a ThreadPool and not using it (#4238) 12919d094b is described below commit 12919d094bc33e1714f8e0cdf4cc1498e5df5ba8 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Feb 9 08:16:12 2024 -0500 LogSorter was creating a ThreadPool and not using it (#4238) LogSorter was creating a ThreadPool based on the property TSERV_WAL_SORT_MAX_CONCURRENT, but then the reference to the thread pool was being overwritten with the thread pool that is created to copy failed bulk import files. Fixes #4235 --- .../java/org/apache/accumulo/core/Constants.java | 1 + .../org/apache/accumulo/core/conf/Property.java | 3 ++- .../MiniAccumuloClusterClasspathTest.java | 1 - .../minicluster/MiniAccumuloClusterTest.java | 4 ++-- .../accumulo/tserver/BulkFailedCopyProcessor.java | 1 + .../org/apache/accumulo/tserver/TabletServer.java | 8 +++---- .../org/apache/accumulo/tserver/log/LogSorter.java | 28 +++++++++++----------- 7 files changed, 24 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 47f7c06ad1..00b1a2fd18 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -88,6 +88,7 @@ public class Constants { public static final String ZNEXT_FILE = "/next_file"; + // TODO: Remove when Property.TSERV_WORKQ_THREADS is removed public static final String ZBULK_FAILED_COPYQ = "/bulk_failed_copyq"; public static final String ZHDFS_RESERVATIONS = "/hdfs_reservations"; diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index f594984a77..1fa04490fb 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -811,9 +811,10 
@@ public enum Property { + " that begin with 'table.file' can be used here. For example, to set the compression" + " of the sorted recovery files to snappy use 'tserver.wal.sort.file.compress.type=snappy'.", "2.1.0"), + @Deprecated(since = "2.1.3") TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT, "The number of threads for the distributed work queue. These threads are" - + " used for copying failed bulk import RFiles.", + + " used for copying failed bulk import RFiles. This property will be removed when bulk import V1 is removed.", "1.4.2"), TSERV_WAL_SYNC("tserver.wal.sync", "true", PropertyType.BOOLEAN, "Use the SYNC_BLOCK create flag to sync WAL writes to disk. Prevents" diff --git a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java index c3c1af4a91..d18ffdd4f0 100644 --- a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java +++ b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java @@ -80,7 +80,6 @@ public class MiniAccumuloClusterClasspathTest extends WithTestNames { MiniAccumuloConfig config = new MiniAccumuloConfig(testDir, ROOT_PASSWORD).setJDWPEnabled(true); config.setZooKeeperPort(0); HashMap<String,String> site = new HashMap<>(); - site.put(Property.TSERV_WORKQ_THREADS.getKey(), "2"); site.put(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "cx1", jarFile.toURI().toString()); config.setSiteConfig(site); accumulo = new MiniAccumuloCluster(config); diff --git a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java index 61921fa238..913904ef1f 100644 --- a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java +++ 
b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java @@ -79,7 +79,7 @@ public class MiniAccumuloClusterTest extends WithTestNames { MiniAccumuloConfig config = new MiniAccumuloConfig(testDir, ROOT_PASSWORD).setJDWPEnabled(true); config.setZooKeeperPort(0); HashMap<String,String> site = new HashMap<>(); - site.put(Property.TSERV_WORKQ_THREADS.getKey(), "2"); + site.put(Property.TSERV_COMPACTION_WARN_TIME.getKey(), "5m"); config.setSiteConfig(site); accumulo = new MiniAccumuloCluster(config); accumulo.start(); @@ -194,7 +194,7 @@ public class MiniAccumuloClusterTest extends WithTestNames { // ensure what user passed in is what comes back assertEquals(0, accumulo.getConfig().getZooKeeperPort()); HashMap<String,String> site = new HashMap<>(); - site.put(Property.TSERV_WORKQ_THREADS.getKey(), "2"); + site.put(Property.TSERV_COMPACTION_WARN_TIME.getKey(), "5m"); assertEquals(site, accumulo.getConfig().getSiteConfig()); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java index ed0d410859..6870f45805 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; /** * Copy failed bulk imports. 
*/ +// TODO: Remove when Property.TSERV_WORKQ_THREADS is removed public class BulkFailedCopyProcessor implements Processor { private static final Logger log = LoggerFactory.getLogger(BulkFailedCopyProcessor.class); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 9c7bad0557..e41b99db97 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -227,8 +227,6 @@ public class TabletServer extends AbstractServer implements TabletHostingServer private TServer server; private volatile TServer replServer; - private DistributedWorkQueue bulkFailedCopyQ; - private String lockID; private volatile long lockSessionId = -1; @@ -792,10 +790,12 @@ public class TabletServer extends AbstractServer implements TabletHostingServer throw new RuntimeException(e); } + @SuppressWarnings("deprecation") ThreadPoolExecutor distWorkQThreadPool = ThreadPools.getServerThreadPools() .createExecutorService(getConfiguration(), Property.TSERV_WORKQ_THREADS, true); - bulkFailedCopyQ = + // TODO: Remove when Property.TSERV_WORKQ_THREADS is removed + DistributedWorkQueue bulkFailedCopyQ = new DistributedWorkQueue(getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ, getConfiguration(), getContext()); try { @@ -806,7 +806,7 @@ public class TabletServer extends AbstractServer implements TabletHostingServer } try { - logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool); + logSorter.startWatchingForRecoveryLogs(); } catch (Exception ex) { log.error("Error setting watches for recoveries"); throw new RuntimeException(ex); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index 68083170e9..8884f398a8 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@ -65,7 +65,6 @@ import com.google.common.annotations.VisibleForTesting; public class LogSorter { private static final Logger log = LoggerFactory.getLogger(LogSorter.class); - AccumuloConfiguration sortedLogConf; private final Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<>()); @@ -223,22 +222,20 @@ public class LogSorter { } } - ThreadPoolExecutor threadPool; private final ServerContext context; + private final AccumuloConfiguration conf; private final double walBlockSize; private final CryptoService cryptoService; + private final AccumuloConfiguration sortedLogConf; public LogSorter(ServerContext context, AccumuloConfiguration conf) { this.context = context; - this.sortedLogConf = extractSortedLogConfig(conf); - @SuppressWarnings("deprecation") - int threadPoolSize = conf.getCount(conf.resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, - Property.TSERV_RECOVERY_MAX_CONCURRENT)); - this.threadPool = ThreadPools.getServerThreadPools().createFixedThreadPool(threadPoolSize, - this.getClass().getName(), true); - this.walBlockSize = DfsLogger.getWalBlockSize(conf); + this.conf = conf; + this.sortedLogConf = extractSortedLogConfig(this.conf); + this.walBlockSize = DfsLogger.getWalBlockSize(this.conf); CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY); - this.cryptoService = context.getCryptoFactory().getService(env, conf.getAllCryptoProperties()); + this.cryptoService = + context.getCryptoFactory().getService(env, this.conf.getAllCryptoProperties()); } /** @@ -295,11 +292,14 @@ public class LogSorter { } } - public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) - throws KeeperException, InterruptedException { - this.threadPool = distWorkQThreadPool; + public void startWatchingForRecoveryLogs() throws KeeperException, 
InterruptedException { + @SuppressWarnings("deprecation") + int threadPoolSize = this.conf.getCount(this.conf + .resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, Property.TSERV_RECOVERY_MAX_CONCURRENT)); + ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools() + .createFixedThreadPool(threadPoolSize, this.getClass().getName(), true); new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, - context).startProcessing(new LogProcessor(), this.threadPool); + context).startProcessing(new LogProcessor(), threadPool); } public List<RecoveryStatus> getLogSorts() {