This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 12919d094b LogSorter was creating a ThreadPool and not using it (#4238)
12919d094b is described below

commit 12919d094bc33e1714f8e0cdf4cc1498e5df5ba8
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Feb 9 08:16:12 2024 -0500

    LogSorter was creating a ThreadPool and not using it (#4238)
    
    LogSorter was creating a ThreadPool based off of the property
    TSERV_WAL_SORT_MAX_CONCURRENT, but then the reference to the
    thread pool was being overwritten to a thread pool that is
    created to copy failed bulk import files.
    
    Fixes #4235
---
 .../java/org/apache/accumulo/core/Constants.java   |  1 +
 .../org/apache/accumulo/core/conf/Property.java    |  3 ++-
 .../MiniAccumuloClusterClasspathTest.java          |  1 -
 .../minicluster/MiniAccumuloClusterTest.java       |  4 ++--
 .../accumulo/tserver/BulkFailedCopyProcessor.java  |  1 +
 .../org/apache/accumulo/tserver/TabletServer.java  |  8 +++----
 .../org/apache/accumulo/tserver/log/LogSorter.java | 28 +++++++++++-----------
 7 files changed, 24 insertions(+), 22 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java 
b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 47f7c06ad1..00b1a2fd18 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -88,6 +88,7 @@ public class Constants {
 
   public static final String ZNEXT_FILE = "/next_file";
 
+  // TODO: Remove when Property.TSERV_WORKQ_THREADS is removed
   public static final String ZBULK_FAILED_COPYQ = "/bulk_failed_copyq";
 
   public static final String ZHDFS_RESERVATIONS = "/hdfs_reservations";
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f594984a77..1fa04490fb 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -811,9 +811,10 @@ public enum Property {
           + " that begin with 'table.file' can be used here. For example, to 
set the compression"
           + " of the sorted recovery files to snappy use 
'tserver.wal.sort.file.compress.type=snappy'.",
       "2.1.0"),
+  @Deprecated(since = "2.1.3")
   TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT,
       "The number of threads for the distributed work queue. These threads are"
-          + " used for copying failed bulk import RFiles.",
+          + " used for copying failed bulk import RFiles. This property will 
be removed when bulk import V1 is removed.",
       "1.4.2"),
   TSERV_WAL_SYNC("tserver.wal.sync", "true", PropertyType.BOOLEAN,
       "Use the SYNC_BLOCK create flag to sync WAL writes to disk. Prevents"
diff --git 
a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java
 
b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java
index c3c1af4a91..d18ffdd4f0 100644
--- 
a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java
+++ 
b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java
@@ -80,7 +80,6 @@ public class MiniAccumuloClusterClasspathTest extends 
WithTestNames {
     MiniAccumuloConfig config = new MiniAccumuloConfig(testDir, 
ROOT_PASSWORD).setJDWPEnabled(true);
     config.setZooKeeperPort(0);
     HashMap<String,String> site = new HashMap<>();
-    site.put(Property.TSERV_WORKQ_THREADS.getKey(), "2");
     site.put(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "cx1", 
jarFile.toURI().toString());
     config.setSiteConfig(site);
     accumulo = new MiniAccumuloCluster(config);
diff --git 
a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java
 
b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java
index 61921fa238..913904ef1f 100644
--- 
a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java
+++ 
b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java
@@ -79,7 +79,7 @@ public class MiniAccumuloClusterTest extends WithTestNames {
     MiniAccumuloConfig config = new MiniAccumuloConfig(testDir, 
ROOT_PASSWORD).setJDWPEnabled(true);
     config.setZooKeeperPort(0);
     HashMap<String,String> site = new HashMap<>();
-    site.put(Property.TSERV_WORKQ_THREADS.getKey(), "2");
+    site.put(Property.TSERV_COMPACTION_WARN_TIME.getKey(), "5m");
     config.setSiteConfig(site);
     accumulo = new MiniAccumuloCluster(config);
     accumulo.start();
@@ -194,7 +194,7 @@ public class MiniAccumuloClusterTest extends WithTestNames {
     // ensure what user passed in is what comes back
     assertEquals(0, accumulo.getConfig().getZooKeeperPort());
     HashMap<String,String> site = new HashMap<>();
-    site.put(Property.TSERV_WORKQ_THREADS.getKey(), "2");
+    site.put(Property.TSERV_COMPACTION_WARN_TIME.getKey(), "5m");
     assertEquals(site, accumulo.getConfig().getSiteConfig());
   }
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
index ed0d410859..6870f45805 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Copy failed bulk imports.
  */
+// TODO: Remove when Property.TSERV_WORKQ_THREADS is removed
 public class BulkFailedCopyProcessor implements Processor {
 
   private static final Logger log = 
LoggerFactory.getLogger(BulkFailedCopyProcessor.class);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 9c7bad0557..e41b99db97 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -227,8 +227,6 @@ public class TabletServer extends AbstractServer implements 
TabletHostingServer
   private TServer server;
   private volatile TServer replServer;
 
-  private DistributedWorkQueue bulkFailedCopyQ;
-
   private String lockID;
   private volatile long lockSessionId = -1;
 
@@ -792,10 +790,12 @@ public class TabletServer extends AbstractServer 
implements TabletHostingServer
       throw new RuntimeException(e);
     }
 
+    @SuppressWarnings("deprecation")
     ThreadPoolExecutor distWorkQThreadPool = ThreadPools.getServerThreadPools()
         .createExecutorService(getConfiguration(), 
Property.TSERV_WORKQ_THREADS, true);
 
-    bulkFailedCopyQ =
+    // TODO: Remove when Property.TSERV_WORKQ_THREADS is removed
+    DistributedWorkQueue bulkFailedCopyQ =
         new DistributedWorkQueue(getContext().getZooKeeperRoot() + 
Constants.ZBULK_FAILED_COPYQ,
             getConfiguration(), getContext());
     try {
@@ -806,7 +806,7 @@ public class TabletServer extends AbstractServer implements 
TabletHostingServer
     }
 
     try {
-      logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool);
+      logSorter.startWatchingForRecoveryLogs();
     } catch (Exception ex) {
       log.error("Error setting watches for recoveries");
       throw new RuntimeException(ex);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 68083170e9..8884f398a8 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -65,7 +65,6 @@ import com.google.common.annotations.VisibleForTesting;
 public class LogSorter {
 
   private static final Logger log = LoggerFactory.getLogger(LogSorter.class);
-  AccumuloConfiguration sortedLogConf;
 
   private final Map<String,LogProcessor> currentWork = 
Collections.synchronizedMap(new HashMap<>());
 
@@ -223,22 +222,20 @@ public class LogSorter {
     }
   }
 
-  ThreadPoolExecutor threadPool;
   private final ServerContext context;
+  private final AccumuloConfiguration conf;
   private final double walBlockSize;
   private final CryptoService cryptoService;
+  private final AccumuloConfiguration sortedLogConf;
 
   public LogSorter(ServerContext context, AccumuloConfiguration conf) {
     this.context = context;
-    this.sortedLogConf = extractSortedLogConfig(conf);
-    @SuppressWarnings("deprecation")
-    int threadPoolSize = 
conf.getCount(conf.resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT,
-        Property.TSERV_RECOVERY_MAX_CONCURRENT));
-    this.threadPool = 
ThreadPools.getServerThreadPools().createFixedThreadPool(threadPoolSize,
-        this.getClass().getName(), true);
-    this.walBlockSize = DfsLogger.getWalBlockSize(conf);
+    this.conf = conf;
+    this.sortedLogConf = extractSortedLogConfig(this.conf);
+    this.walBlockSize = DfsLogger.getWalBlockSize(this.conf);
     CryptoEnvironment env = new 
CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY);
-    this.cryptoService = context.getCryptoFactory().getService(env, 
conf.getAllCryptoProperties());
+    this.cryptoService =
+        context.getCryptoFactory().getService(env, 
this.conf.getAllCryptoProperties());
   }
 
   /**
@@ -295,11 +292,14 @@ public class LogSorter {
     }
   }
 
-  public void startWatchingForRecoveryLogs(ThreadPoolExecutor 
distWorkQThreadPool)
-      throws KeeperException, InterruptedException {
-    this.threadPool = distWorkQThreadPool;
+  public void startWatchingForRecoveryLogs() throws KeeperException, 
InterruptedException {
+    @SuppressWarnings("deprecation")
+    int threadPoolSize = this.conf.getCount(this.conf
+        .resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, 
Property.TSERV_RECOVERY_MAX_CONCURRENT));
+    ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools()
+        .createFixedThreadPool(threadPoolSize, this.getClass().getName(), 
true);
     new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, 
sortedLogConf,
-        context).startProcessing(new LogProcessor(), this.threadPool);
+        context).startProcessing(new LogProcessor(), threadPool);
   }
 
   public List<RecoveryStatus> getLogSorts() {

Reply via email to