This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 64273e31c17b861bda4eac0d38049fb6a9090b3d Merge: 217e59f3b8 041e4c1612 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed May 28 16:30:55 2025 +0000 Merge branch '2.1' .../admin/compaction/CompressionConfigurer.java | 9 +++-- .../accumulo/core/lock/ServiceLockSupport.java | 22 +++++------- .../java/org/apache/accumulo/core/util/Halt.java | 41 +++++++++++++--------- .../org/apache/accumulo/server/AbstractServer.java | 2 +- .../accumulo/server/manager/LiveTServerSet.java | 5 +-- .../accumulo/server/mem/LowMemoryDetector.java | 2 +- .../apache/accumulo/server/rpc/ServerAddress.java | 2 +- .../org/apache/accumulo/server/util/Admin.java | 6 ++-- .../accumulo/tserver/TabletClientHandler.java | 11 +++--- .../org/apache/accumulo/tserver/TabletServer.java | 3 +- .../accumulo/tserver/log/TabletServerLogger.java | 14 ++++---- .../accumulo/tserver/tablet/MinorCompactor.java | 2 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 2 +- 13 files changed, 59 insertions(+), 62 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java index d4e0cd0b58,e4514e0808..7b8cac777c --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java @@@ -79,26 -53,24 +79,24 @@@ public class ServiceLockSupport @Override public void lostLock(LockLossReason reason) { if (shutdownComplete.get()) { - LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", - serviceName, reason); + LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", server, + reason); } else { - Halt.halt(server + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1); - Halt.halt(-1, serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!"); ++ Halt.halt(-1, server + " lock in zookeeper lost (reason = " + reason + "), exiting!"); } } @Override public void unableToMonitorLockNode(final Exception e) { - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor {} lock node", server, e)); - - Halt.halt(-1, "FATAL: No longer able to monitor " + serviceName + " lock node", e); ++ Halt.halt(-1, "FATAL: No longer able to monitor " + server + " lock node", e); } @Override public synchronized void acquiredLock() { - LOG.debug("Acquired {} lock", serviceName); + LOG.debug("Acquired {} lock", server); if (acquiredLock || failedToAcquireLock) { - Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); + Halt.halt(-1, "Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock); } acquiredLock = true; @@@ -111,9 -83,9 +109,9 @@@ if (e instanceof NoAuthException) { String msg = - "Failed to acquire " + serviceName + " lock due to incorrect ZooKeeper authentication."; + "Failed to acquire " + server + " lock due to incorrect ZooKeeper authentication."; LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(msg, -1); + Halt.halt(-1, msg); } if (acquiredLock) { @@@ -167,19 -139,17 +165,17 @@@ @Override public void lostLock(final LockLossReason reason) { if (shutdownComplete.get()) { - LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", - serviceName, reason); + LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", server, + reason); } else { - Halt.halt(1, () -> { - LOG.error("{} lost lock (reason = {}), exiting.", server, reason); - lostLockAction.accept(server); - }); - Halt.halt(1, serviceName + " lost lock (reason = " + reason + "), exiting.", - () -> lostLockAction.accept(serviceName)); ++ Halt.halt(1, server + " lost lock (reason = " + reason + "), exiting.", ++ () -> lostLockAction.accept(server)); } } @Override public void unableToMonitorLockNode(final Exception e) { - Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, exiting.", server, e)); - Halt.halt(1, "Lost ability to monitor " + serviceName + " lock, exiting.", e); ++ Halt.halt(1, "Lost ability to monitor " + server + " lock, exiting.", e); } } diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index fda21912c3,1757e29580..8468af2812 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@@ -503,33 -453,20 +503,30 @@@ public class LiveTServerSet implements break; } } - if (zPath == null) { + if (resourceGroup.isEmpty() || address.isEmpty()) { return; } - current.remove(zPath); - currentInstances.remove(server); - - log.info("Removing zookeeper lock for {}", server); - String fullpath = context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + zPath; - try { - context.getZooReaderWriter().recursiveDelete(fullpath, SKIP); - } catch (Exception e) { - String msg = "error removing tablet server lock"; - Halt.halt(-1, msg, e); + current.remove(address.orElseThrow().toString()); + + ResourceGroupPredicate rgPredicate = resourceGroup.map(rg -> { + ResourceGroupPredicate rgp = rg2 -> rg.equals(rg2); + return rgp; + }).orElse(rg -> true); + AddressSelector addrPredicate = + address.map(AddressSelector::exact).orElse(AddressSelector.all()); + Set<ServiceLockPath> paths = + context.getServerPaths().getTabletServer(rgPredicate, addrPredicate, false); + if (paths.isEmpty() || paths.size() > 1) { + log.error("Zero or many zookeeper entries match input arguments."); + } else { + ServiceLockPath slp = paths.iterator().next(); + log.info("Removing zookeeper lock for {}", slp); + try { + context.getZooSession().asReaderWriter().recursiveDelete(slp.toString(), SKIP); + } catch (Exception e) { - String msg = "error removing tablet server lock"; - // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility - log.error("FATAL: {}", msg, e); - Halt.halt(msg, -1); ++ Halt.halt(-1, "error removing tablet server lock", e); + } + context.getZooCache().clear(slp.toString()); } - getZooCache().clear(fullpath); } } diff --cc server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java index c24eefe4ae,0000000000..8d745e7774 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java +++ b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java @@@ -1,206 -1,0 +1,206 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.mem; + +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.server.ServerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LowMemoryDetector { + + private static class LowMemDetectorState { + private long lastMemorySize = 0; + private int lowMemCount = 0; + private long lastMemoryCheckTime = 0; + private boolean runningLowOnMemory = false; + } + + private static final Logger LOG = LoggerFactory.getLogger(LowMemoryDetector.class); + + @FunctionalInterface + public interface Action { + void execute(); + } + + public enum DetectionScope { + MINC, MAJC, SCAN + } + + private final HashMap<String,Long> prevGcTime = new HashMap<>(); + private final Lock memCheckTimeLock = new ReentrantLock(); + private final AtomicReference<LowMemDetectorState> state = + new AtomicReference<>(new LowMemDetectorState()); + + public long getIntervalMillis(AccumuloConfiguration conf) { + return conf.getTimeInMillis(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL); + } + + public boolean isRunningLowOnMemory() { + return state.get().runningLowOnMemory; + } + + /** + * @param context server context + * @param scope whether this is being checked in the context of scan or compact code + * @param isUserTable boolean set true if the table being scanned / compacted is a user table. No + * action is taken for system tables. + * @param action Action to perform when this method returns true + * @return true if server running low on memory + */ + public boolean isRunningLowOnMemory(ServerContext context, DetectionScope scope, + Supplier<Boolean> isUserTable, Action action) { + if (isUserTable.get()) { + Property p; + switch (scope) { + case SCAN: + p = Property.GENERAL_LOW_MEM_SCAN_PROTECTION; + break; + case MINC: + p = Property.GENERAL_LOW_MEM_MINC_PROTECTION; + break; + case MAJC: + p = Property.GENERAL_LOW_MEM_MAJC_PROTECTION; + break; + default: + throw new IllegalArgumentException("Unknown scope: " + scope); + } + boolean isEnabled = context.getConfiguration().getBoolean(p); + // Only incur the penalty of accessing the volatile variable when enabled for this scope + if (isEnabled && state.get().runningLowOnMemory) { + action.execute(); + return true; + } + } + return false; + } + + public synchronized void logGCInfo(AccumuloConfiguration conf) { + + double freeMemoryPercentage = conf.getFraction(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD); + + memCheckTimeLock.lock(); + try { + LowMemDetectorState localState = state.get(); + final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + + List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); + + StringBuilder sb = new StringBuilder("gc"); + + boolean sawChange = false; + + long maxIncreaseInCollectionTime = 0; + for (GarbageCollectorMXBean gcBean : gcmBeans) { + Long prevTime = prevGcTime.get(gcBean.getName()); + long pt = 0; + if (prevTime != null) { + pt = prevTime; + } + + long time = gcBean.getCollectionTime(); + + if (time - pt != 0) { + sawChange = true; + } + + long increaseInCollectionTime = time - pt; + sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, + increaseInCollectionTime / 1000.0)); + maxIncreaseInCollectionTime = + Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime); + prevGcTime.put(gcBean.getName(), time); + } + + Runtime rt = Runtime.getRuntime(); + final long maxConfiguredMemory = rt.maxMemory(); + final long allocatedMemory = rt.totalMemory(); + final long allocatedFreeMemory = rt.freeMemory(); + final long freeMemory = maxConfiguredMemory - (allocatedMemory - allocatedFreeMemory); + final long lowMemoryThreshold = (long) (maxConfiguredMemory * freeMemoryPercentage); + LOG.trace("Memory info: max={}, allocated={}, free={}, free threshold={}", + maxConfiguredMemory, allocatedMemory, freeMemory, lowMemoryThreshold); + + if (freeMemory < lowMemoryThreshold) { + localState.lowMemCount++; + if (localState.lowMemCount > 3 && !localState.runningLowOnMemory) { + localState.runningLowOnMemory = true; + LOG.warn("Running low on memory: max={}, allocated={}, free={}, free threshold={}", + maxConfiguredMemory, allocatedMemory, freeMemory, lowMemoryThreshold); + } + } else { + // If we were running low on memory, but are not any longer, than log at warn + // so that it shows up in the logs + if (localState.runningLowOnMemory) { + LOG.warn("Recovered from low memory condition"); + } else { + LOG.trace("Not running low on memory"); + } + localState.runningLowOnMemory = false; + localState.lowMemCount = 0; + } + + if (freeMemory != localState.lastMemorySize) { + sawChange = true; + } + + sb.append(String.format(" freemem=%,d(%+,d) totalmem=%,d", freeMemory, + (freeMemory - localState.lastMemorySize), rt.totalMemory())); + + if (sawChange) { + LOG.debug(sb.toString()); + } + + final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); + if (localState.lastMemoryCheckTime > 0 && localState.lastMemoryCheckTime < now) { + final long diff = now - localState.lastMemoryCheckTime; + if (diff > keepAliveTimeout + 1000) { + LOG.warn(String.format( + "GC pause checker not called in a timely" + + " fashion. Expected every %.1f seconds but was %.1f seconds since last check", + keepAliveTimeout / 1000., diff / 1000.)); + } + localState.lastMemoryCheckTime = now; + return; + } + + if (maxIncreaseInCollectionTime > keepAliveTimeout) { - Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1); ++ Halt.halt(-1, "Garbage collection may be interfering with lock keep-alive. Halting."); + } + + localState.lastMemorySize = freeMemory; + localState.lastMemoryCheckTime = now; + } finally { + memCheckTimeLock.unlock(); + } + } + +} diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java index 3924bea115,2b06387915..1e93593bbc --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java @@@ -45,22 -40,4 +45,22 @@@ public class ServerAddress public HostAndPort getAddress() { return address; } + + public void startThriftServer(String threadName) { + Threads.createThread(threadName, () -> { + try { + server.serve(); + } catch (Error e) { - Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1); ++ Halt.halt(1, "Unexpected error in TThreadPoolServer " + e + ", halting.", e); + } + }).start(); + + while (!server.isServing()) { + // Wait for the thread to start and for the TServer to start + // serving events + UtilWaitThread.sleep(10); + Preconditions.checkState(!server.getShouldStop()); + } + + } } diff --cc server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 26027918b0,e3fb627dba..b01ffcb82f --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@@ -364,40 -265,6 +364,40 @@@ public class Admin implements KeywordEx @Parameter(names = {"-s", "--state"}, description = "<state>... Print transactions in the state(s) {NEW, IN_PROGRESS, FAILED_IN_PROGRESS, FAILED, SUCCESSFUL}") List<String> states = new ArrayList<>(); + + @Parameter(names = {"-t", "--type"}, + description = "<type>... Print transactions of fate instance type(s) {USER, META}") + List<String> instanceTypes = new ArrayList<>(); + } + + class AdminLockWatcher implements ServiceLock.AccumuloLockWatcher { + @Override + public void lostLock(ServiceLock.LockLossReason reason) { + String msg = "Admin lost lock: " + reason.toString(); + if (reason == ServiceLock.LockLossReason.LOCK_DELETED) { - Halt.halt(msg, 0); ++ Halt.halt(0, msg); + } else { - Halt.halt(msg, 1); ++ Halt.halt(1, msg); + } + } + + @Override + public void unableToMonitorLockNode(Exception e) { + String msg = "Admin unable to monitor lock: " + e.getMessage(); + log.warn(msg); - Halt.halt(msg, 1); ++ Halt.halt(1, msg); + } + + @Override + public void acquiredLock() { + lockAcquiredLatch.countDown(); + log.debug("Acquired ZooKeeper lock for Admin"); + } + + @Override + public void failedToAcquireLock(Exception e) { + log.warn("Failed to acquire ZooKeeper lock for Admin, msg: " + e.getMessage()); + } } @Parameters(commandDescription = "show service status") diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 06ee31cf65,deb5b8df2c..45ea70fc1e --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@@ -962,11 -1155,9 +962,9 @@@ public class TabletClientHandler implem log.trace("Got {} message from user: {}", request, credentials.getPrincipal()); if (server.getLock() != null && server.getLock().wasLockAcquired() && !server.getLock().isLocked()) { - Halt.halt(1, () -> { - log.info("Tablet server no longer holds lock during checkPermission() : {}, exiting", - request); - context.getLowMemoryDetector().logGCInfo(server.getConfiguration()); - }); + Halt.halt(1, + "Tablet server no longer holds lock during checkPermission() : " + request + ", exiting", - () -> server.getGcLogger().logGCInfo(server.getConfiguration())); ++ () -> context.getLowMemoryDetector().logGCInfo(server.getConfiguration())); } if (lock != null) { @@@ -1142,9 -1342,8 +1140,8 @@@ checkPermission(context, server, credentials, lock, "halt"); - Halt.halt(0, () -> { - log.info("Manager requested tablet server halt"); + Halt.halt(0, "Manager requested tablet server halt", () -> { - server.gcLogger.logGCInfo(server.getConfiguration()); + context.getLowMemoryDetector().logGCInfo(server.getConfiguration()); server.requestStop(); try { server.getLock().unlock();