This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 171fdcd7f539da7b4ee0c011b5d546d3af6bde96 Merge: 113e495dbf 12919d094b Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Feb 9 14:29:37 2024 +0000 Merge branch '2.1' .../java/org/apache/accumulo/core/Constants.java | 2 - .../org/apache/accumulo/core/conf/Property.java | 4 +- .../MiniAccumuloClusterClasspathTest.java | 2 +- .../minicluster/MiniAccumuloClusterTest.java | 4 +- .../accumulo/tserver/BulkFailedCopyProcessor.java | 81 ---------------------- .../org/apache/accumulo/tserver/TabletServer.java | 20 +----- .../org/apache/accumulo/tserver/log/LogSorter.java | 25 ++++--- 7 files changed, 19 insertions(+), 119 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/Constants.java index 3c8da35361,00b1a2fd18..66ac9144a8 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@@ -88,8 -88,9 +88,6 @@@ public class Constants public static final String ZNEXT_FILE = "/next_file"; - // TODO: Remove when Property.TSERV_WORKQ_THREADS is removed -- public static final String ZBULK_FAILED_COPYQ = "/bulk_failed_copyq"; -- public static final String ZHDFS_RESERVATIONS = "/hdfs_reservations"; public static final String ZRECOVERY = "/recovery"; diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index 56f856410d,1fa04490fb..9acce283cd --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -721,9 -811,10 +721,11 @@@ public enum Property + " that begin with 'table.file' can be used here. For example, to set the compression" + " of the sorted recovery files to snappy use 'tserver.wal.sort.file.compress.type=snappy'.", "2.1.0"), + @Deprecated(since = "2.1.3") TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT, "The number of threads for the distributed work queue. These threads are" - + " used for copying failed bulk import RFiles.", - + " used for copying failed bulk import RFiles. This property will be removed when bulk import V1 is removed.", ++ + " used for copying failed bulk import RFiles. Note that as of version 3.1.0 this property" ++ + " is not used and will be removed in a future release.", "1.4.2"), TSERV_WAL_SYNC("tserver.wal.sync", "true", PropertyType.BOOLEAN, "Use the SYNC_BLOCK create flag to sync WAL writes to disk. Prevents" diff --cc minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java index 2ba1960701,d18ffdd4f0..2a5da748db --- a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java +++ b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterClasspathTest.java @@@ -78,7 -80,7 +78,7 @@@ public class MiniAccumuloClusterClasspa MiniAccumuloConfig config = new MiniAccumuloConfig(testDir, ROOT_PASSWORD).setJDWPEnabled(true); config.setZooKeeperPort(0); HashMap<String,String> site = new HashMap<>(); - site.put(Property.TSERV_WORKQ_THREADS.getKey(), "2"); - site.put(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "cx1", jarFile.toURI().toString()); ++ site.put(Property.TSERV_WAL_MAX_SIZE.getKey(), "1G"); config.setSiteConfig(site); accumulo = new MiniAccumuloCluster(config); accumulo.start(); diff --cc minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java index 91f5b884c5,913904ef1f..d4a48de83a --- a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java +++ b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java @@@ -81,7 -79,7 +81,7 @@@ public class MiniAccumuloClusterTest ex MiniAccumuloConfig config = new MiniAccumuloConfig(testDir, ROOT_PASSWORD).setJDWPEnabled(true); config.setZooKeeperPort(0); HashMap<String,String> site = new HashMap<>(); - site.put(Property.TSERV_WORKQ_THREADS.getKey(), "2"); - site.put(Property.TSERV_COMPACTION_WARN_TIME.getKey(), "5m"); ++ site.put(Property.TSERV_WAL_SORT_BUFFER_SIZE.getKey(), "15%"); config.setSiteConfig(site); accumulo = new MiniAccumuloCluster(config); accumulo.start(); @@@ -198,7 -194,7 +198,7 @@@ // ensure what user passed in is what comes back assertEquals(0, accumulo.getConfig().getZooKeeperPort()); HashMap<String,String> site = new HashMap<>(); - site.put(Property.TSERV_WORKQ_THREADS.getKey(), "2"); - site.put(Property.TSERV_COMPACTION_WARN_TIME.getKey(), "5m"); ++ site.put(Property.TSERV_WAL_SORT_BUFFER_SIZE.getKey(), "15%"); assertEquals(site, accumulo.getConfig().getSiteConfig()); } diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java index ed0d410859,6870f45805..0000000000 deleted file mode 100644,100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java +++ /dev/null @@@ -1,81 -1,82 +1,0 @@@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one -- * or more contributor license agreements. See the NOTICE file -- * distributed with this work for additional information -- * regarding copyright ownership. The ASF licenses this file -- * to you under the Apache License, Version 2.0 (the -- * "License"); you may not use this file except in compliance -- * with the License. You may obtain a copy of the License at -- * -- * https://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, -- * software distributed under the License is distributed on an -- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -- * KIND, either express or implied. See the License for the -- * specific language governing permissions and limitations -- * under the License. -- */ --package org.apache.accumulo.tserver; -- --import static java.nio.charset.StandardCharsets.UTF_8; -- --import java.io.IOException; -- --import org.apache.accumulo.server.ServerContext; --import org.apache.accumulo.server.fs.VolumeManager; --import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor; --import org.apache.hadoop.fs.FileSystem; --import org.apache.hadoop.fs.FileUtil; --import org.apache.hadoop.fs.Path; --import org.slf4j.Logger; --import org.slf4j.LoggerFactory; -- --/** -- * Copy failed bulk imports. -- */ -// TODO: Remove when Property.TSERV_WORKQ_THREADS is removed --public class BulkFailedCopyProcessor implements Processor { -- -- private static final Logger log = LoggerFactory.getLogger(BulkFailedCopyProcessor.class); -- -- private ServerContext context; -- -- BulkFailedCopyProcessor(ServerContext context) { -- this.context = context; -- } -- -- @Override -- public Processor newProcessor() { -- return new BulkFailedCopyProcessor(context); -- } -- -- @Override -- public void process(String workID, byte[] data) { -- -- String[] paths = new String(data, UTF_8).split(","); -- -- Path orig = new Path(paths[0]); -- Path dest = new Path(paths[1]); -- Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp"); -- -- VolumeManager vm = context.getVolumeManager(); -- try { -- FileSystem origFs = vm.getFileSystemByPath(orig); -- FileSystem destFs = vm.getFileSystemByPath(dest); -- -- FileUtil.copy(origFs, orig, destFs, tmp, false, true, context.getHadoopConf()); -- destFs.rename(tmp, dest); -- log.debug("copied {} to {}", orig, dest); -- } catch (IOException ex) { -- try { -- FileSystem destFs = vm.getFileSystemByPath(dest); -- destFs.create(dest).close(); -- log.warn(" marked " + dest + " failed", ex); -- } catch (IOException e) { -- log.error("Unable to create failure flag file " + dest, e); -- } -- } -- -- } -- --} diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index bde58d7ed2,e41b99db97..12227f2576 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -57,7 -57,7 +57,6 @@@ import java.util.concurrent.BlockingDeq import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledFuture; --import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@@ -105,11 -102,15 +104,10 @@@ import org.apache.accumulo.core.util.Ma import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.Retry.RetryFactory; -import org.apache.accumulo.core.util.ServerServices; -import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.UtilWaitThread; --import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.AbstractServer; -import org.apache.accumulo.server.GarbageCollectionLogger; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.TabletLevel; import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.compaction.CompactionWatcher; @@@ -127,9 -127,10 +125,8 @@@ import org.apache.accumulo.server.rpc.T import org.apache.accumulo.server.security.SecurityOperation; import org.apache.accumulo.server.security.SecurityUtil; import org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyWatcher; -import org.apache.accumulo.server.util.FileSystemMonitor; import org.apache.accumulo.server.util.ServerBulkImportStatus; import org.apache.accumulo.server.util.time.RelativeTime; --import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager; import org.apache.accumulo.tserver.TabletStatsKeeper.Operation; @@@ -226,9 -225,8 +223,7 @@@ public class TabletServer extends Abstr private ServiceLock tabletServerLock; private TServer server; - private volatile TServer replServer; - private DistributedWorkQueue bulkFailedCopyQ; - private String lockID; private volatile long lockSessionId = -1; @@@ -743,21 -790,23 +738,8 @@@ throw new RuntimeException(e); } - @SuppressWarnings("deprecation") -- ThreadPoolExecutor distWorkQThreadPool = ThreadPools.getServerThreadPools() -- .createExecutorService(getConfiguration(), Property.TSERV_WORKQ_THREADS, true); -- - bulkFailedCopyQ = - // TODO: Remove when Property.TSERV_WORKQ_THREADS is removed - DistributedWorkQueue bulkFailedCopyQ = -- new DistributedWorkQueue(getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ, -- getConfiguration(), getContext()); -- try { -- bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(getContext()), -- distWorkQThreadPool); -- } catch (Exception e1) { -- throw new RuntimeException("Failed to start distributed work queue for copying ", e1); -- } -- try { - logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool); + logSorter.startWatchingForRecoveryLogs(); } catch (Exception ex) { log.error("Error setting watches for recoveries"); throw new RuntimeException(ex); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index acee0d3101,8884f398a8..33ce1989e4 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@@ -292,11 -292,14 +290,12 @@@ public class LogSorter } } - public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) - throws KeeperException, InterruptedException { - this.threadPool = distWorkQThreadPool; + public void startWatchingForRecoveryLogs() throws KeeperException, InterruptedException { - @SuppressWarnings("deprecation") - int threadPoolSize = this.conf.getCount(this.conf - .resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, Property.TSERV_RECOVERY_MAX_CONCURRENT)); ++ int threadPoolSize = this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT); + ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools() + .createFixedThreadPool(threadPoolSize, this.getClass().getName(), true); new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, - context).startProcessing(new LogProcessor(), this.threadPool); + context).startProcessing(new LogProcessor(), threadPool); } public List<RecoveryStatus> getLogSorts() {