This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit ab996d0d26df4b2ed2c1874289d5b785ecbe2686
Merge: a0012351c5 41d9e3a483
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Sep 5 20:38:47 2024 +0000

    Merge branch '3.1'

 .../org/apache/accumulo/core/conf/Property.java    |  15 +-
 .../apache/accumulo/core/util/cache/Caches.java    |   3 +-
 .../tserver/TabletServerResourceManager.java       |  26 ++-
 .../tserver/memory/LargestFirstMemoryManager.java  |  16 +-
 .../tserver/memory/TabletMemoryReport.java         |  11 +-
 .../memory/LargestFirstMemoryManagerTest.java      |  43 ++++-
 .../accumulo/test/ScanServerMaxLatencyIT.java      | 208 +++++++++++++++++++++
 7 files changed, 311 insertions(+), 11 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
index 5767f8b508,0000000000..67dc8f5bb5
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
@@@ -1,109 -1,0 +1,110 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.util.cache;
 +
 +import static com.google.common.base.Suppliers.memoize;
 +
 +import java.util.function.Supplier;
 +
 +import org.apache.accumulo.core.metrics.MetricsProducer;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.github.benmanes.caffeine.cache.Caffeine;
 +
 +import io.micrometer.core.instrument.MeterRegistry;
 +import io.micrometer.core.instrument.binder.cache.CaffeineStatsCounter;
 +
 +public class Caches implements MetricsProducer {
 +
 +  public enum CacheName {
 +    BULK_IMPORT_FILE_LENGTHS,
 +    CLASSLOADERS,
 +    COMBINER_LOGGED_MSGS,
 +    COMPACTIONS_COMPLETED,
 +    COMPACTION_CONFIGS,
 +    COMPACTOR_COUNTS,
 +    COMPACTION_DIR_CACHE,
 +    COMPACTION_DISPATCHERS,
 +    COMPACTION_SERVICE_ID,
 +    COMPACTOR_GROUP_ID,
 +    COMPRESSION_ALGORITHM,
 +    CRYPT_PASSWORDS,
 +    HOST_REGEX_BALANCER_TABLE_REGEX,
 +    INSTANCE_ID,
 +    NAMESPACE_ID,
 +    NAMESPACE_CONFIGS,
 +    PROP_CACHE,
 +    RECOVERY_MANAGER_PATH_CACHE,
 +    SCAN_SERVER_TABLET_METADATA,
 +    SERVICE_ENVIRONMENT_TABLE_CONFIGS,
 +    SPACE_AWARE_VOLUME_CHOICE,
 +    SPLITTER_FILES,
 +    SPLITTER_STARTING,
 +    SPLITTER_UNSPLITTABLE,
 +    TABLE_CONFIGS,
 +    TABLE_ID,
 +    TABLE_PARENT_CONFIGS,
 +    TABLE_ZOO_HELPER_CACHE,
 +    TSRM_FILE_LENGTHS,
 +    TINYLFU_BLOCK_CACHE,
-     VOLUME_HDFS_CONFIGS
++    VOLUME_HDFS_CONFIGS,
++    MINC_AGE
 +  }
 +
 +  private static final Logger LOG = LoggerFactory.getLogger(Caches.class);
 +  private static final Supplier<Caches> CACHES = memoize(() -> new Caches());
 +
 +  public static Caches getInstance() {
 +    return CACHES.get();
 +  }
 +
 +  private MeterRegistry registry = null;
 +
 +  private Caches() {}
 +
 +  @Override
 +  public void registerMetrics(MeterRegistry registry) {
 +    this.registry = registry;
 +  }
 +
 +  private boolean setupMicrometerMetrics(Caffeine<Object,Object> 
cacheBuilder, String name) {
 +    if (registry != null) {
 +      try {
 +        cacheBuilder.recordStats(() -> new CaffeineStatsCounter(registry, 
name));
 +        LOG.trace("Metrics enabled for {} cache.", name);
 +        return true;
 +      } catch (IllegalStateException e) {
 +        // recordStats was already called by the cacheBuilder.
 +      }
 +    }
 +    return false;
 +  }
 +
 +  public Caffeine<Object,Object> createNewBuilder(CacheName name, boolean 
emitMetricsIfEnabled) {
 +    Caffeine<Object,Object> cacheBuilder = Caffeine.newBuilder();
 +    boolean metricsConfigured = false;
 +    if (emitMetricsIfEnabled) {
 +      metricsConfigured = setupMicrometerMetrics(cacheBuilder, name.name());
 +    }
 +    LOG.trace("Caffeine builder created for {}, metrics enabled: {}", name, 
metricsConfigured);
 +    return cacheBuilder;
 +  }
 +
 +}
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 2644484a63,689b52718e..fe8f33da5e
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@@ -76,7 -79,7 +77,8 @@@ import org.apache.accumulo.core.spi.sca
  import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
  import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
  import org.apache.accumulo.core.trace.TraceUtil;
+ import org.apache.accumulo.core.util.Timer;
 +import org.apache.accumulo.core.util.cache.Caches.CacheName;
  import org.apache.accumulo.core.util.threads.ThreadPools;
  import org.apache.accumulo.core.util.threads.Threads;
  import org.apache.accumulo.server.ServerContext;
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java
index 40a85110fe,d9fa972b92..12a0dc98f9
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java
@@@ -29,10 -30,13 +30,13 @@@ import org.apache.accumulo.core.conf.Pr
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.dataImpl.KeyExtent;
  import org.apache.accumulo.core.manager.state.tables.TableState;
++import org.apache.accumulo.core.util.cache.Caches;
  import org.apache.accumulo.server.ServerContext;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
+ import com.github.benmanes.caffeine.cache.Cache;
 -import com.github.benmanes.caffeine.cache.Caffeine;
+ 
  /**
   * The LargestFirstMemoryManager attempts to keep memory between 80% and 90% full. It adapts over
   * time the point at which it should start a compaction based on how full memory gets between
@@@ -56,6 -60,8 +60,9 @@@ public class LargestFirstMemoryManager 
    private double compactionThreshold;
    private long maxObserved;
    private final HashMap<TableId,Long> mincIdleThresholds = new HashMap<>();
+   private final Cache<TableId,Long> mincAgeThresholds =
 -      Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
++      Caches.getInstance().createNewBuilder(Caches.CacheName.MINC_AGE, false)
++          .expireAfterWrite(5, TimeUnit.MINUTES).build();
    private ServerContext context = null;
  
    private static class TabletInfo {
diff --cc test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java
index 0000000000,3e4ad77b17..21737bdede
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java
@@@ -1,0 -1,203 +1,208 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.test;
+ 
+ import static 
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.EVENTUAL;
+ import static 
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.IMMEDIATE;
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
+ 
+ import java.security.SecureRandom;
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.apache.accumulo.core.client.Accumulo;
+ import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.BatchWriter;
+ import org.apache.accumulo.core.client.MutationsRejectedException;
+ import org.apache.accumulo.core.client.ScannerBase;
+ import org.apache.accumulo.core.client.admin.NewTableConfiguration;
++import org.apache.accumulo.core.client.admin.TabletAvailability;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.util.Timer;
 -import org.apache.accumulo.minicluster.ServerType;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+ import org.apache.accumulo.test.functional.ConfigurableMacBase;
+ import org.apache.accumulo.test.util.Wait;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.io.Text;
+ import org.junit.jupiter.api.Test;
+ 
+ public class ScanServerMaxLatencyIT extends ConfigurableMacBase {
+ 
+   @Override
+   protected void configure(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+     cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "2s");
+   }
+ 
+   @Test
+   public void testMaxLatency() throws Exception {
+     final String[] tables = this.getUniqueNames(4);
+     final String table1 = tables[0];
+     final String table2 = tables[1];
+     final String table3 = tables[2];
+     final String table4 = tables[3];
+ 
 -    getCluster().getConfig().setNumScanServers(1);
 -    getCluster().getClusterControl().startAllServers(ServerType.SCAN_SERVER);
 -
+     ExecutorService executor = Executors.newCachedThreadPool();
+     try (var client = 
Accumulo.newClient().from(getClientProperties()).build()) {
+ 
+       Wait.waitFor(() -> 
!client.instanceOperations().getScanServers().isEmpty());
+ 
+       var ntc = new NewTableConfiguration();
+       ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_MAXAGE.getKey(), 
"2s"));
++      ntc.withInitialTabletAvailability(TabletAvailability.HOSTED);
+       client.tableOperations().create(table1, ntc);
 -      client.tableOperations().create(table2);
++      ntc = new NewTableConfiguration();
++      ntc.withInitialTabletAvailability(TabletAvailability.HOSTED);
++      client.tableOperations().create(table2, ntc);
+       ntc = new NewTableConfiguration();
+       ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_IDLETIME.getKey(), 
"2s"));
++      ntc.withInitialTabletAvailability(TabletAvailability.HOSTED);
+       client.tableOperations().create(table3, ntc);
+       ntc = new NewTableConfiguration();
+       ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_MAXAGE.getKey(), 
"3s"));
++      ntc.withInitialTabletAvailability(TabletAvailability.HOSTED);
+       client.tableOperations().create(table4, ntc);
+ 
+       Timer timer = Timer.startNew();
+ 
+       // Write to table4 once, this is different than the other tables that are continually being
+       // written to. table4 should minor compact 3 seconds after this write.
+       writeElapsed(new SecureRandom(), client, table4, timer);
+       boolean sawDataInTable4 = false;
+ 
+       List<Future<Void>> futures = new ArrayList<>();
+       futures.add(executor.submit(createWriterTask(client, table1, timer)));
+       futures.add(executor.submit(createWriterTask(client, table2, timer)));
+       futures.add(executor.submit(createWriterTask(client, table3, timer)));
+ 
+       // wait for some data to be written
+       Wait.waitFor(() -> readMaxElapsed(client, IMMEDIATE, table1) > 0
+           && readMaxElapsed(client, IMMEDIATE, table2) > 0
+           && readMaxElapsed(client, IMMEDIATE, table3) > 0);
+ 
+       long lastMaxSeen = -1;
+       int changes = 0;
+ 
+       List<Long> deltas = new ArrayList<>();
+ 
+       while (changes < 4) {
+         Thread.sleep(250);
+         var currElapsed = timer.elapsed(TimeUnit.MILLISECONDS);
+         var maxElapsedInTable = readMaxElapsed(client, EVENTUAL, table1);
+ 
+         if (maxElapsedInTable > 0 && maxElapsedInTable != lastMaxSeen) {
+           log.info("new max elapsed seen {} {}", lastMaxSeen, 
maxElapsedInTable);
+           changes++;
+           lastMaxSeen = maxElapsedInTable;
+         }
+ 
+         if (maxElapsedInTable > 0) {
+           // This is difference in elapsed time written to the table vs the most recent elapsed
+           // time.
+           deltas.add(currElapsed - maxElapsedInTable);
+         }
+ 
+         // The other table does not have the setting to minor compact based on age, so should never
+         // see any data for it from the scan server.
+         assertEquals(-1, readMaxElapsed(client, EVENTUAL, table2));
+         // The background thread is writing to this table every 100ms so it should not be considered
+         // idle and therefor should not minor compact.
+         assertEquals(-1, readMaxElapsed(client, EVENTUAL, table3));
+ 
+         if (!sawDataInTable4 && readMaxElapsed(client, EVENTUAL, table4) != 
-1) {
 -          assertTrue(timer.elapsed(TimeUnit.MILLISECONDS) > 3000
 -              && timer.elapsed(TimeUnit.MILLISECONDS) < 6000);
++
++          assertTrue(
++              timer.elapsed(TimeUnit.MILLISECONDS) > 3000
++                  && timer.elapsed(TimeUnit.MILLISECONDS) < 6000,
++              "elapsed:" + timer.elapsed(TimeUnit.MILLISECONDS));
+           sawDataInTable4 = true;
+         }
+       }
+ 
+       assertTrue(sawDataInTable4);
+ 
+       var stats = deltas.stream().mapToLong(l -> l).summaryStatistics();
+       log.info("Delta stats : {}", stats);
+       // Should usually see data within 4 seconds, but not always because the timings config are
+       // when things should start to happen and not when they are guaranteed to finish. Would expect
+       // the average to be less than 4 seconds and the max less than 8 seconds. These numbers may
+       // not hold if running test on a heavily loaded machine.
+       assertTrue(stats.getAverage() > 500 && stats.getAverage() < 4000);
+       assertTrue(stats.getMax() < 8000);
+       assertTrue(stats.getCount() > 9);
+ 
+       // The write task should still be running unless they experienced an exception.
+       assertTrue(futures.stream().noneMatch(Future::isDone));
+ 
+       executor.shutdownNow();
+       executor.awaitTermination(600, TimeUnit.SECONDS);
+ 
+       assertEquals(-1, readMaxElapsed(client, EVENTUAL, table2));
+       // Now that nothing is writing its expected that max read by an immediate scan will see any
+       // data an eventual scan would see.
+       assertTrue(
+           readMaxElapsed(client, IMMEDIATE, table1) >= readMaxElapsed(client, 
EVENTUAL, table1));
+     }
+   }
+ 
+   private long readMaxElapsed(AccumuloClient client, 
ScannerBase.ConsistencyLevel consistency,
+       String table) throws Exception {
+     try (var scanner = client.createScanner(table)) {
+       scanner.setConsistencyLevel(consistency);
+       scanner.fetchColumn(new Text("elapsed"), new Text("nanos"));
+       return scanner.stream().mapToLong(e -> 
Long.parseLong(e.getValue().toString(), 10)).max()
+           .orElse(-1);
+     }
+   }
+ 
+   private static void writeElapsed(SecureRandom random, AccumuloClient 
client, String table,
+       Timer timer) throws Exception {
+     try (var writer = client.createBatchWriter(table)) {
+       writeElapsed(random, timer, writer);
+     }
+   }
+ 
+   private static Callable<Void> createWriterTask(AccumuloClient client, 
String table, Timer timer) {
+     SecureRandom random = new SecureRandom();
+     return () -> {
+       try (var writer = client.createBatchWriter(table)) {
+         while (true) {
+           writeElapsed(random, timer, writer);
+           writer.flush();
+           Thread.sleep(100);
+         }
+       }
+     };
+   }
+ 
+   private static void writeElapsed(SecureRandom random, Timer timer, 
BatchWriter writer)
+       throws MutationsRejectedException {
+     var elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
+     Mutation m = new Mutation(Long.toHexString(random.nextLong()));
+     m.put("elapsed", "nanos", "" + elapsed);
+     writer.addMutation(m);
+   }
+ }

Reply via email to