This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 64273e31c17b861bda4eac0d38049fb6a9090b3d
Merge: 217e59f3b8 041e4c1612
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed May 28 16:30:55 2025 +0000

    Merge branch '2.1'

 .../admin/compaction/CompressionConfigurer.java    |  9 +++--
 .../accumulo/core/lock/ServiceLockSupport.java     | 22 +++++-------
 .../java/org/apache/accumulo/core/util/Halt.java   | 41 +++++++++++++---------
 .../org/apache/accumulo/server/AbstractServer.java |  2 +-
 .../accumulo/server/manager/LiveTServerSet.java    |  5 +--
 .../accumulo/server/mem/LowMemoryDetector.java     |  2 +-
 .../apache/accumulo/server/rpc/ServerAddress.java  |  2 +-
 .../org/apache/accumulo/server/util/Admin.java     |  6 ++--
 .../accumulo/tserver/TabletClientHandler.java      | 11 +++---
 .../org/apache/accumulo/tserver/TabletServer.java  |  3 +-
 .../accumulo/tserver/log/TabletServerLogger.java   | 14 ++++----
 .../accumulo/tserver/tablet/MinorCompactor.java    |  2 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  2 +-
 13 files changed, 59 insertions(+), 62 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
index d4e0cd0b58,e4514e0808..7b8cac777c
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
@@@ -79,26 -53,24 +79,24 @@@ public class ServiceLockSupport 
      @Override
      public void lostLock(LockLossReason reason) {
        if (shutdownComplete.get()) {
 -        LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.",
 -            serviceName, reason);
 +        LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.", server,
 +            reason);
        } else {
-         Halt.halt(server + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
 -        Halt.halt(-1, serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!");
++        Halt.halt(-1, server + " lock in zookeeper lost (reason = " + reason + "), exiting!");
        }
      }
  
      @Override
      public void unableToMonitorLockNode(final Exception e) {
-       // ACCUMULO-3651 Changed level to error and added FATAL to message for 
slf4j compatibility
-       Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor {} lock 
node", server, e));
- 
 -      Halt.halt(-1, "FATAL: No longer able to monitor " + serviceName + " 
lock node", e);
++      Halt.halt(-1, "FATAL: No longer able to monitor " + server + " lock 
node", e);
      }
  
      @Override
      public synchronized void acquiredLock() {
 -      LOG.debug("Acquired {} lock", serviceName);
 +      LOG.debug("Acquired {} lock", server);
  
        if (acquiredLock || failedToAcquireLock) {
-         Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + 
failedToAcquireLock, -1);
+         Halt.halt(-1, "Zoolock in unexpected state AL " + acquiredLock + " " 
+ failedToAcquireLock);
        }
  
        acquiredLock = true;
@@@ -111,9 -83,9 +109,9 @@@
  
        if (e instanceof NoAuthException) {
          String msg =
 -            "Failed to acquire " + serviceName + " lock due to incorrect 
ZooKeeper authentication.";
 +            "Failed to acquire " + server + " lock due to incorrect ZooKeeper 
authentication.";
          LOG.error("{} Ensure instance.secret is consistent across Accumulo 
configuration", msg, e);
-         Halt.halt(msg, -1);
+         Halt.halt(-1, msg);
        }
  
        if (acquiredLock) {
@@@ -167,19 -139,17 +165,17 @@@
      @Override
      public void lostLock(final LockLossReason reason) {
        if (shutdownComplete.get()) {
 -        LOG.warn("{} lost lock (reason = {}), not halting because shutdown is 
complete.",
 -            serviceName, reason);
 +        LOG.warn("{} lost lock (reason = {}), not halting because shutdown is 
complete.", server,
 +            reason);
        } else {
-         Halt.halt(1, () -> {
-           LOG.error("{} lost lock (reason = {}), exiting.", server, reason);
-           lostLockAction.accept(server);
-         });
 -        Halt.halt(1, serviceName + " lost lock (reason = " + reason + "), 
exiting.",
 -            () -> lostLockAction.accept(serviceName));
++        Halt.halt(1, server + " lost lock (reason = " + reason + "), 
exiting.",
++            () -> lostLockAction.accept(server));
        }
      }
  
      @Override
      public void unableToMonitorLockNode(final Exception e) {
-       Halt.halt(1, () -> LOG.error("Lost ability to monitor {} lock, 
exiting.", server, e));
 -      Halt.halt(1, "Lost ability to monitor " + serviceName + " lock, 
exiting.", e);
++      Halt.halt(1, "Lost ability to monitor " + server + " lock, exiting.", 
e);
      }
  
    }
diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index fda21912c3,1757e29580..8468af2812
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@@ -503,33 -453,20 +503,30 @@@ public class LiveTServerSet implements 
          break;
        }
      }
 -    if (zPath == null) {
 +    if (resourceGroup.isEmpty() || address.isEmpty()) {
        return;
      }
 -    current.remove(zPath);
 -    currentInstances.remove(server);
 -
 -    log.info("Removing zookeeper lock for {}", server);
 -    String fullpath = context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" 
+ zPath;
 -    try {
 -      context.getZooReaderWriter().recursiveDelete(fullpath, SKIP);
 -    } catch (Exception e) {
 -      String msg = "error removing tablet server lock";
 -      Halt.halt(-1, msg, e);
 +    current.remove(address.orElseThrow().toString());
 +
 +    ResourceGroupPredicate rgPredicate = resourceGroup.map(rg -> {
 +      ResourceGroupPredicate rgp = rg2 -> rg.equals(rg2);
 +      return rgp;
 +    }).orElse(rg -> true);
 +    AddressSelector addrPredicate =
 +        address.map(AddressSelector::exact).orElse(AddressSelector.all());
 +    Set<ServiceLockPath> paths =
 +        context.getServerPaths().getTabletServer(rgPredicate, addrPredicate, 
false);
 +    if (paths.isEmpty() || paths.size() > 1) {
 +      log.error("Zero or many zookeeper entries match input arguments.");
 +    } else {
 +      ServiceLockPath slp = paths.iterator().next();
 +      log.info("Removing zookeeper lock for {}", slp);
 +      try {
 +        context.getZooSession().asReaderWriter().recursiveDelete(slp.toString(), SKIP);
 +      } catch (Exception e) {
-         String msg = "error removing tablet server lock";
-         // ACCUMULO-3651 Changed level to error and added FATAL to message 
for slf4j compatibility
-         log.error("FATAL: {}", msg, e);
-         Halt.halt(msg, -1);
++        Halt.halt(-1, "error removing tablet server lock", e);
 +      }
 +      context.getZooCache().clear(slp.toString());
      }
 -    getZooCache().clear(fullpath);
    }
  }
diff --cc server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
index c24eefe4ae,0000000000..8d745e7774
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
@@@ -1,206 -1,0 +1,206 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.mem;
 +
 +import java.lang.management.GarbageCollectorMXBean;
 +import java.lang.management.ManagementFactory;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.locks.Lock;
 +import java.util.concurrent.locks.ReentrantLock;
 +import java.util.function.Supplier;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.util.Halt;
 +import org.apache.accumulo.server.ServerContext;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class LowMemoryDetector {
 +
 +  private static class LowMemDetectorState {
 +    private long lastMemorySize = 0;
 +    private int lowMemCount = 0;
 +    private long lastMemoryCheckTime = 0;
 +    private boolean runningLowOnMemory = false;
 +  }
 +
 +  private static final Logger LOG = 
LoggerFactory.getLogger(LowMemoryDetector.class);
 +
 +  @FunctionalInterface
 +  public interface Action {
 +    void execute();
 +  }
 +
 +  public enum DetectionScope {
 +    MINC, MAJC, SCAN
 +  }
 +
 +  private final HashMap<String,Long> prevGcTime = new HashMap<>();
 +  private final Lock memCheckTimeLock = new ReentrantLock();
 +  private final AtomicReference<LowMemDetectorState> state =
 +      new AtomicReference<>(new LowMemDetectorState());
 +
 +  public long getIntervalMillis(AccumuloConfiguration conf) {
 +    return conf.getTimeInMillis(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL);
 +  }
 +
 +  public boolean isRunningLowOnMemory() {
 +    return state.get().runningLowOnMemory;
 +  }
 +
 +  /**
 +   * @param context server context
 +   * @param scope whether this is being checked in the context of scan or 
compact code
 +   * @param isUserTable boolean set true if the table being scanned / 
compacted is a user table. No
 +   *        action is taken for system tables.
 +   * @param action Action to perform when this method returns true
 +   * @return true if server running low on memory
 +   */
 +  public boolean isRunningLowOnMemory(ServerContext context, DetectionScope 
scope,
 +      Supplier<Boolean> isUserTable, Action action) {
 +    if (isUserTable.get()) {
 +      Property p;
 +      switch (scope) {
 +        case SCAN:
 +          p = Property.GENERAL_LOW_MEM_SCAN_PROTECTION;
 +          break;
 +        case MINC:
 +          p = Property.GENERAL_LOW_MEM_MINC_PROTECTION;
 +          break;
 +        case MAJC:
 +          p = Property.GENERAL_LOW_MEM_MAJC_PROTECTION;
 +          break;
 +        default:
 +          throw new IllegalArgumentException("Unknown scope: " + scope);
 +      }
 +      boolean isEnabled = context.getConfiguration().getBoolean(p);
 +      // Only incur the penalty of accessing the volatile variable when 
enabled for this scope
 +      if (isEnabled && state.get().runningLowOnMemory) {
 +        action.execute();
 +        return true;
 +      }
 +    }
 +    return false;
 +  }
 +
 +  public synchronized void logGCInfo(AccumuloConfiguration conf) {
 +
 +    double freeMemoryPercentage = 
conf.getFraction(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD);
 +
 +    memCheckTimeLock.lock();
 +    try {
 +      LowMemDetectorState localState = state.get();
 +      final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
 +
 +      List<GarbageCollectorMXBean> gcmBeans = 
ManagementFactory.getGarbageCollectorMXBeans();
 +
 +      StringBuilder sb = new StringBuilder("gc");
 +
 +      boolean sawChange = false;
 +
 +      long maxIncreaseInCollectionTime = 0;
 +      for (GarbageCollectorMXBean gcBean : gcmBeans) {
 +        Long prevTime = prevGcTime.get(gcBean.getName());
 +        long pt = 0;
 +        if (prevTime != null) {
 +          pt = prevTime;
 +        }
 +
 +        long time = gcBean.getCollectionTime();
 +
 +        if (time - pt != 0) {
 +          sawChange = true;
 +        }
 +
 +        long increaseInCollectionTime = time - pt;
 +        sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), 
time / 1000.0,
 +            increaseInCollectionTime / 1000.0));
 +        maxIncreaseInCollectionTime =
 +            Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime);
 +        prevGcTime.put(gcBean.getName(), time);
 +      }
 +
 +      Runtime rt = Runtime.getRuntime();
 +      final long maxConfiguredMemory = rt.maxMemory();
 +      final long allocatedMemory = rt.totalMemory();
 +      final long allocatedFreeMemory = rt.freeMemory();
 +      final long freeMemory = maxConfiguredMemory - (allocatedMemory - 
allocatedFreeMemory);
 +      final long lowMemoryThreshold = (long) (maxConfiguredMemory * 
freeMemoryPercentage);
 +      LOG.trace("Memory info: max={}, allocated={}, free={}, free 
threshold={}",
 +          maxConfiguredMemory, allocatedMemory, freeMemory, 
lowMemoryThreshold);
 +
 +      if (freeMemory < lowMemoryThreshold) {
 +        localState.lowMemCount++;
 +        if (localState.lowMemCount > 3 && !localState.runningLowOnMemory) {
 +          localState.runningLowOnMemory = true;
 +          LOG.warn("Running low on memory: max={}, allocated={}, free={}, 
free threshold={}",
 +              maxConfiguredMemory, allocatedMemory, freeMemory, 
lowMemoryThreshold);
 +        }
 +      } else {
 +        // If we were running low on memory, but are not any longer, than log 
at warn
 +        // so that it shows up in the logs
 +        if (localState.runningLowOnMemory) {
 +          LOG.warn("Recovered from low memory condition");
 +        } else {
 +          LOG.trace("Not running low on memory");
 +        }
 +        localState.runningLowOnMemory = false;
 +        localState.lowMemCount = 0;
 +      }
 +
 +      if (freeMemory != localState.lastMemorySize) {
 +        sawChange = true;
 +      }
 +
 +      sb.append(String.format(" freemem=%,d(%+,d) totalmem=%,d", freeMemory,
 +          (freeMemory - localState.lastMemorySize), rt.totalMemory()));
 +
 +      if (sawChange) {
 +        LOG.debug(sb.toString());
 +      }
 +
 +      final long keepAliveTimeout = 
conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
 +      if (localState.lastMemoryCheckTime > 0 && 
localState.lastMemoryCheckTime < now) {
 +        final long diff = now - localState.lastMemoryCheckTime;
 +        if (diff > keepAliveTimeout + 1000) {
 +          LOG.warn(String.format(
 +              "GC pause checker not called in a timely"
 +                  + " fashion. Expected every %.1f seconds but was %.1f 
seconds since last check",
 +              keepAliveTimeout / 1000., diff / 1000.));
 +        }
 +        localState.lastMemoryCheckTime = now;
 +        return;
 +      }
 +
 +      if (maxIncreaseInCollectionTime > keepAliveTimeout) {
-         Halt.halt("Garbage collection may be interfering with lock 
keep-alive. Halting.", -1);
++        Halt.halt(-1, "Garbage collection may be interfering with lock 
keep-alive. Halting.");
 +      }
 +
 +      localState.lastMemorySize = freeMemory;
 +      localState.lastMemoryCheckTime = now;
 +    } finally {
 +      memCheckTimeLock.unlock();
 +    }
 +  }
 +
 +}
diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java
index 3924bea115,2b06387915..1e93593bbc
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java
@@@ -45,22 -40,4 +45,22 @@@ public class ServerAddress 
    public HostAndPort getAddress() {
      return address;
    }
 +
 +  public void startThriftServer(String threadName) {
 +    Threads.createThread(threadName, () -> {
 +      try {
 +        server.serve();
 +      } catch (Error e) {
-         Halt.halt("Unexpected error in TThreadPoolServer " + e + ", 
halting.", 1);
++        Halt.halt(1, "Unexpected error in TThreadPoolServer " + e + ", 
halting.", e);
 +      }
 +    }).start();
 +
 +    while (!server.isServing()) {
 +      // Wait for the thread to start and for the TServer to start
 +      // serving events
 +      UtilWaitThread.sleep(10);
 +      Preconditions.checkState(!server.getShouldStop());
 +    }
 +
 +  }
  }
diff --cc server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index 26027918b0,e3fb627dba..b01ffcb82f
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@@ -364,40 -265,6 +364,40 @@@ public class Admin implements KeywordEx
      @Parameter(names = {"-s", "--state"},
          description = "<state>... Print transactions in the state(s) {NEW, 
IN_PROGRESS, FAILED_IN_PROGRESS, FAILED, SUCCESSFUL}")
      List<String> states = new ArrayList<>();
 +
 +    @Parameter(names = {"-t", "--type"},
 +        description = "<type>... Print transactions of fate instance type(s) 
{USER, META}")
 +    List<String> instanceTypes = new ArrayList<>();
 +  }
 +
 +  class AdminLockWatcher implements ServiceLock.AccumuloLockWatcher {
 +    @Override
 +    public void lostLock(ServiceLock.LockLossReason reason) {
 +      String msg = "Admin lost lock: " + reason.toString();
 +      if (reason == ServiceLock.LockLossReason.LOCK_DELETED) {
-         Halt.halt(msg, 0);
++        Halt.halt(0, msg);
 +      } else {
-         Halt.halt(msg, 1);
++        Halt.halt(1, msg);
 +      }
 +    }
 +
 +    @Override
 +    public void unableToMonitorLockNode(Exception e) {
 +      String msg = "Admin unable to monitor lock: " + e.getMessage();
 +      log.warn(msg);
-       Halt.halt(msg, 1);
++      Halt.halt(1, msg);
 +    }
 +
 +    @Override
 +    public void acquiredLock() {
 +      lockAcquiredLatch.countDown();
 +      log.debug("Acquired ZooKeeper lock for Admin");
 +    }
 +
 +    @Override
 +    public void failedToAcquireLock(Exception e) {
 +      log.warn("Failed to acquire ZooKeeper lock for Admin, msg: " + 
e.getMessage());
 +    }
    }
  
    @Parameters(commandDescription = "show service status")
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 06ee31cf65,deb5b8df2c..45ea70fc1e
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@@ -962,11 -1155,9 +962,9 @@@ public class TabletClientHandler implem
      log.trace("Got {} message from user: {}", request, 
credentials.getPrincipal());
      if (server.getLock() != null && server.getLock().wasLockAcquired()
          && !server.getLock().isLocked()) {
-       Halt.halt(1, () -> {
-         log.info("Tablet server no longer holds lock during checkPermission() 
: {}, exiting",
-             request);
-         context.getLowMemoryDetector().logGCInfo(server.getConfiguration());
-       });
+       Halt.halt(1,
+           "Tablet server no longer holds lock during checkPermission() : " + 
request + ", exiting",
 -          () -> server.getGcLogger().logGCInfo(server.getConfiguration()));
++          () -> 
context.getLowMemoryDetector().logGCInfo(server.getConfiguration()));
      }
  
      if (lock != null) {
@@@ -1142,9 -1342,8 +1140,8 @@@
  
      checkPermission(context, server, credentials, lock, "halt");
  
-     Halt.halt(0, () -> {
-       log.info("Manager requested tablet server halt");
+     Halt.halt(0, "Manager requested tablet server halt", () -> {
 -      server.gcLogger.logGCInfo(server.getConfiguration());
 +      context.getLowMemoryDetector().logGCInfo(server.getConfiguration());
        server.requestStop();
        try {
          server.getLock().unlock();

Reply via email to