This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 171fdcd7f539da7b4ee0c011b5d546d3af6bde96
Merge: 113e495dbf 12919d094b
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Feb 9 14:29:37 2024 +0000

    Merge branch '2.1'

 .../java/org/apache/accumulo/core/Constants.java   |  2 -
 .../org/apache/accumulo/core/conf/Property.java    |  4 +-
 .../MiniAccumuloClusterClasspathTest.java          |  2 +-
 .../minicluster/MiniAccumuloClusterTest.java       |  4 +-
 .../accumulo/tserver/BulkFailedCopyProcessor.java  | 81 ----------------------
 .../org/apache/accumulo/tserver/TabletServer.java  | 20 +-----
 .../org/apache/accumulo/tserver/log/LogSorter.java | 25 ++++---
 7 files changed, 19 insertions(+), 119 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/Constants.java
index 3c8da35361,00b1a2fd18..66ac9144a8
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@@ -88,8 -88,9 +88,6 @@@ public class Constants 
  
    public static final String ZNEXT_FILE = "/next_file";
  
 -  // TODO: Remove when Property.TSERV_WORKQ_THREADS is removed
--  public static final String ZBULK_FAILED_COPYQ = "/bulk_failed_copyq";
--
    public static final String ZHDFS_RESERVATIONS = "/hdfs_reservations";
    public static final String ZRECOVERY = "/recovery";
  
diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 56f856410d,1fa04490fb..9acce283cd
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -721,9 -811,10 +721,11 @@@ public enum Property 
            + " that begin with 'table.file' can be used here. For example, to set the compression"
            + " of the sorted recovery files to snappy use 'tserver.wal.sort.file.compress.type=snappy'.",
        "2.1.0"),
+   @Deprecated(since = "2.1.3")
    TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT,
        "The number of threads for the distributed work queue. These threads are"
-           + " used for copying failed bulk import RFiles.",
 -          + " used for copying failed bulk import RFiles. This property will be removed when bulk import V1 is removed.",
++          + " used for copying failed bulk import RFiles. Note that as of version 3.1.0 this property"
++          + " is not used and will be removed in a future release.",
        "1.4.2"),
    TSERV_WAL_SYNC("tserver.wal.sync", "true", PropertyType.BOOLEAN,
        "Use the SYNC_BLOCK create flag to sync WAL writes to disk. Prevents"
diff --cc minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java
index 2ba1960701,d18ffdd4f0..2a5da748db
--- a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java
+++ b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java
@@@ -78,7 -80,7 +78,7 @@@ public class MiniAccumuloClusterClasspa
      MiniAccumuloConfig config = new MiniAccumuloConfig(testDir, ROOT_PASSWORD).setJDWPEnabled(true);
      config.setZooKeeperPort(0);
      HashMap<String,String> site = new HashMap<>();
-     site.put(Property.TSERV_WORKQ_THREADS.getKey(), "2");
 -    site.put(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "cx1", jarFile.toURI().toString());
++    site.put(Property.TSERV_WAL_MAX_SIZE.getKey(), "1G");
      config.setSiteConfig(site);
      accumulo = new MiniAccumuloCluster(config);
      accumulo.start();
diff --cc minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java
index 91f5b884c5,913904ef1f..d4a48de83a
--- a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java
+++ b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java
@@@ -81,7 -79,7 +81,7 @@@ public class MiniAccumuloClusterTest ex
      MiniAccumuloConfig config = new MiniAccumuloConfig(testDir, ROOT_PASSWORD).setJDWPEnabled(true);
      config.setZooKeeperPort(0);
      HashMap<String,String> site = new HashMap<>();
-     site.put(Property.TSERV_WORKQ_THREADS.getKey(), "2");
 -    site.put(Property.TSERV_COMPACTION_WARN_TIME.getKey(), "5m");
++    site.put(Property.TSERV_WAL_SORT_BUFFER_SIZE.getKey(), "15%");
      config.setSiteConfig(site);
      accumulo = new MiniAccumuloCluster(config);
      accumulo.start();
@@@ -198,7 -194,7 +198,7 @@@
      // ensure what user passed in is what comes back
      assertEquals(0, accumulo.getConfig().getZooKeeperPort());
      HashMap<String,String> site = new HashMap<>();
-     site.put(Property.TSERV_WORKQ_THREADS.getKey(), "2");
 -    site.put(Property.TSERV_COMPACTION_WARN_TIME.getKey(), "5m");
++    site.put(Property.TSERV_WAL_SORT_BUFFER_SIZE.getKey(), "15%");
      assertEquals(site, accumulo.getConfig().getSiteConfig());
    }
  
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
index ed0d410859,6870f45805..0000000000
deleted file mode 100644,100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java
+++ /dev/null
@@@ -1,81 -1,82 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *   https://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied.  See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--package org.apache.accumulo.tserver;
--
--import static java.nio.charset.StandardCharsets.UTF_8;
--
--import java.io.IOException;
--
--import org.apache.accumulo.server.ServerContext;
--import org.apache.accumulo.server.fs.VolumeManager;
--import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
--import org.apache.hadoop.fs.FileSystem;
--import org.apache.hadoop.fs.FileUtil;
--import org.apache.hadoop.fs.Path;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--/**
-- * Copy failed bulk imports.
-- */
 -// TODO: Remove when Property.TSERV_WORKQ_THREADS is removed
--public class BulkFailedCopyProcessor implements Processor {
--
--  private static final Logger log = LoggerFactory.getLogger(BulkFailedCopyProcessor.class);
--
--  private ServerContext context;
--
--  BulkFailedCopyProcessor(ServerContext context) {
--    this.context = context;
--  }
--
--  @Override
--  public Processor newProcessor() {
--    return new BulkFailedCopyProcessor(context);
--  }
--
--  @Override
--  public void process(String workID, byte[] data) {
--
--    String[] paths = new String(data, UTF_8).split(",");
--
--    Path orig = new Path(paths[0]);
--    Path dest = new Path(paths[1]);
--    Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp");
--
--    VolumeManager vm = context.getVolumeManager();
--    try {
--      FileSystem origFs = vm.getFileSystemByPath(orig);
--      FileSystem destFs = vm.getFileSystemByPath(dest);
--
--      FileUtil.copy(origFs, orig, destFs, tmp, false, true, context.getHadoopConf());
--      destFs.rename(tmp, dest);
--      log.debug("copied {} to {}", orig, dest);
--    } catch (IOException ex) {
--      try {
--        FileSystem destFs = vm.getFileSystemByPath(dest);
--        destFs.create(dest).close();
--        log.warn(" marked " + dest + " failed", ex);
--      } catch (IOException e) {
--        log.error("Unable to create failure flag file " + dest, e);
--      }
--    }
--
--  }
--
--}
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index bde58d7ed2,e41b99db97..12227f2576
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -57,7 -57,7 +57,6 @@@ import java.util.concurrent.BlockingDeq
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.LinkedBlockingDeque;
  import java.util.concurrent.ScheduledFuture;
--import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicLong;
@@@ -105,11 -102,15 +104,10 @@@ import org.apache.accumulo.core.util.Ma
  import org.apache.accumulo.core.util.Pair;
  import org.apache.accumulo.core.util.Retry;
  import org.apache.accumulo.core.util.Retry.RetryFactory;
 -import org.apache.accumulo.core.util.ServerServices;
 -import org.apache.accumulo.core.util.ServerServices.Service;
  import org.apache.accumulo.core.util.UtilWaitThread;
--import org.apache.accumulo.core.util.threads.ThreadPools;
  import org.apache.accumulo.core.util.threads.Threads;
  import org.apache.accumulo.server.AbstractServer;
 -import org.apache.accumulo.server.GarbageCollectionLogger;
  import org.apache.accumulo.server.ServerContext;
 -import org.apache.accumulo.server.ServerOpts;
  import org.apache.accumulo.server.TabletLevel;
  import org.apache.accumulo.server.client.ClientServiceHandler;
  import org.apache.accumulo.server.compaction.CompactionWatcher;
@@@ -127,9 -127,10 +125,8 @@@ import org.apache.accumulo.server.rpc.T
  import org.apache.accumulo.server.security.SecurityOperation;
  import org.apache.accumulo.server.security.SecurityUtil;
  import org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyWatcher;
 -import org.apache.accumulo.server.util.FileSystemMonitor;
  import org.apache.accumulo.server.util.ServerBulkImportStatus;
  import org.apache.accumulo.server.util.time.RelativeTime;
--import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
  import org.apache.accumulo.server.zookeeper.TransactionWatcher;
  import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
  import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
@@@ -226,9 -225,8 +223,7 @@@ public class TabletServer extends Abstr
    private ServiceLock tabletServerLock;
  
    private TServer server;
 -  private volatile TServer replServer;
  
-   private DistributedWorkQueue bulkFailedCopyQ;
- 
    private String lockID;
    private volatile long lockSessionId = -1;
  
@@@ -743,21 -790,23 +738,8 @@@
        throw new RuntimeException(e);
      }
  
 -    @SuppressWarnings("deprecation")
--    ThreadPoolExecutor distWorkQThreadPool = ThreadPools.getServerThreadPools()
--        .createExecutorService(getConfiguration(), Property.TSERV_WORKQ_THREADS, true);
--
-     bulkFailedCopyQ =
 -    // TODO: Remove when Property.TSERV_WORKQ_THREADS is removed
 -    DistributedWorkQueue bulkFailedCopyQ =
--        new DistributedWorkQueue(getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ,
--            getConfiguration(), getContext());
--    try {
--      bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(getContext()),
--          distWorkQThreadPool);
--    } catch (Exception e1) {
--      throw new RuntimeException("Failed to start distributed work queue for copying ", e1);
--    }
--
      try {
-       logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool);
+       logSorter.startWatchingForRecoveryLogs();
      } catch (Exception ex) {
        log.error("Error setting watches for recoveries");
        throw new RuntimeException(ex);
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index acee0d3101,8884f398a8..33ce1989e4
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@@ -292,11 -292,14 +290,12 @@@ public class LogSorter 
      }
    }
  
-   public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool)
-       throws KeeperException, InterruptedException {
-     this.threadPool = distWorkQThreadPool;
+   public void startWatchingForRecoveryLogs() throws KeeperException, InterruptedException {
 -    @SuppressWarnings("deprecation")
 -    int threadPoolSize = this.conf.getCount(this.conf
 -        .resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, Property.TSERV_RECOVERY_MAX_CONCURRENT));
++    int threadPoolSize = this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
+     ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools()
+         .createFixedThreadPool(threadPoolSize, this.getClass().getName(), true);
      new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf,
-         context).startProcessing(new LogProcessor(), this.threadPool);
+         context).startProcessing(new LogProcessor(), threadPool);
    }
  
    public List<RecoveryStatus> getLogSorts() {

Reply via email to