This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit ab996d0d26df4b2ed2c1874289d5b785ecbe2686 Merge: a0012351c5 41d9e3a483 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Sep 5 20:38:47 2024 +0000 Merge branch '3.1' .../org/apache/accumulo/core/conf/Property.java | 15 +- .../apache/accumulo/core/util/cache/Caches.java | 3 +- .../tserver/TabletServerResourceManager.java | 26 ++- .../tserver/memory/LargestFirstMemoryManager.java | 16 +- .../tserver/memory/TabletMemoryReport.java | 11 +- .../memory/LargestFirstMemoryManagerTest.java | 43 ++++- .../accumulo/test/ScanServerMaxLatencyIT.java | 208 +++++++++++++++++++++ 7 files changed, 311 insertions(+), 11 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java index 5767f8b508,0000000000..67dc8f5bb5 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java +++ b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java @@@ -1,109 -1,0 +1,110 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.util.cache; + +import static com.google.common.base.Suppliers.memoize; + +import java.util.function.Supplier; + +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Caffeine; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.binder.cache.CaffeineStatsCounter; + +public class Caches implements MetricsProducer { + + public enum CacheName { + BULK_IMPORT_FILE_LENGTHS, + CLASSLOADERS, + COMBINER_LOGGED_MSGS, + COMPACTIONS_COMPLETED, + COMPACTION_CONFIGS, + COMPACTOR_COUNTS, + COMPACTION_DIR_CACHE, + COMPACTION_DISPATCHERS, + COMPACTION_SERVICE_ID, + COMPACTOR_GROUP_ID, + COMPRESSION_ALGORITHM, + CRYPT_PASSWORDS, + HOST_REGEX_BALANCER_TABLE_REGEX, + INSTANCE_ID, + NAMESPACE_ID, + NAMESPACE_CONFIGS, + PROP_CACHE, + RECOVERY_MANAGER_PATH_CACHE, + SCAN_SERVER_TABLET_METADATA, + SERVICE_ENVIRONMENT_TABLE_CONFIGS, + SPACE_AWARE_VOLUME_CHOICE, + SPLITTER_FILES, + SPLITTER_STARTING, + SPLITTER_UNSPLITTABLE, + TABLE_CONFIGS, + TABLE_ID, + TABLE_PARENT_CONFIGS, + TABLE_ZOO_HELPER_CACHE, + TSRM_FILE_LENGTHS, + TINYLFU_BLOCK_CACHE, - VOLUME_HDFS_CONFIGS ++ VOLUME_HDFS_CONFIGS, ++ MINC_AGE + } + + private static final Logger LOG = LoggerFactory.getLogger(Caches.class); + private static final Supplier<Caches> CACHES = memoize(() -> new Caches()); + + public static Caches getInstance() { + return CACHES.get(); + } + + private MeterRegistry registry = null; + + private Caches() {} + + @Override + public void registerMetrics(MeterRegistry registry) { + this.registry = registry; + } + + private boolean setupMicrometerMetrics(Caffeine<Object,Object> cacheBuilder, String name) { + if (registry != null) { + try { + cacheBuilder.recordStats(() -> new CaffeineStatsCounter(registry, name)); + LOG.trace("Metrics enabled for {} cache.", name); + return true; + } catch (IllegalStateException e) { + // 
recordStats was already called by the cacheBuilder. + } + } + return false; + } + + public Caffeine<Object,Object> createNewBuilder(CacheName name, boolean emitMetricsIfEnabled) { + Caffeine<Object,Object> cacheBuilder = Caffeine.newBuilder(); + boolean metricsConfigured = false; + if (emitMetricsIfEnabled) { + metricsConfigured = setupMicrometerMetrics(cacheBuilder, name.name()); + } + LOG.trace("Caffeine builder created for {}, metrics enabled: {}", name, metricsConfigured); + return cacheBuilder; + } + +} diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 2644484a63,689b52718e..fe8f33da5e --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@@ -76,7 -79,7 +77,8 @@@ import org.apache.accumulo.core.spi.sca import org.apache.accumulo.core.spi.scan.ScanPrioritizer; import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher; import org.apache.accumulo.core.trace.TraceUtil; + import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.ServerContext; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java index 40a85110fe,d9fa972b92..12a0dc98f9 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/memory/LargestFirstMemoryManager.java @@@ -29,10 -30,13 +30,13 @@@ import org.apache.accumulo.core.conf.Pr import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.state.tables.TableState; ++import org.apache.accumulo.core.util.cache.Caches; import 
org.apache.accumulo.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; + /** * The LargestFirstMemoryManager attempts to keep memory between 80% and 90% full. It adapts over * time the point at which it should start a compaction based on how full memory gets between @@@ -56,6 -60,8 +60,9 @@@ public class LargestFirstMemoryManager private double compactionThreshold; private long maxObserved; private final HashMap<TableId,Long> mincIdleThresholds = new HashMap<>(); + private final Cache<TableId,Long> mincAgeThresholds = - Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); ++ Caches.getInstance().createNewBuilder(Caches.CacheName.MINC_AGE, false) ++ .expireAfterWrite(5, TimeUnit.MINUTES).build(); private ServerContext context = null; private static class TabletInfo { diff --cc test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java index 0000000000,3e4ad77b17..21737bdede mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java @@@ -1,0 -1,203 +1,208 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.test; + + import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.EVENTUAL; + import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.IMMEDIATE; + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertTrue; + + import java.security.SecureRandom; + import java.util.ArrayList; + import java.util.List; + import java.util.Map; + import java.util.concurrent.Callable; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.Future; + import java.util.concurrent.TimeUnit; + + import org.apache.accumulo.core.client.Accumulo; + import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.BatchWriter; + import org.apache.accumulo.core.client.MutationsRejectedException; + import org.apache.accumulo.core.client.ScannerBase; + import org.apache.accumulo.core.client.admin.NewTableConfiguration; ++import org.apache.accumulo.core.client.admin.TabletAvailability; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.Mutation; + import org.apache.accumulo.core.util.Timer; -import org.apache.accumulo.minicluster.ServerType; + import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; + import org.apache.accumulo.test.functional.ConfigurableMacBase; + import org.apache.accumulo.test.util.Wait; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.io.Text; + import org.junit.jupiter.api.Test; + + public class ScanServerMaxLatencyIT extends ConfigurableMacBase { + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "2s"); + } + + @Test + public void testMaxLatency() throws 
Exception { + final String[] tables = this.getUniqueNames(4); + final String table1 = tables[0]; + final String table2 = tables[1]; + final String table3 = tables[2]; + final String table4 = tables[3]; + - getCluster().getConfig().setNumScanServers(1); - getCluster().getClusterControl().startAllServers(ServerType.SCAN_SERVER); - + ExecutorService executor = Executors.newCachedThreadPool(); + try (var client = Accumulo.newClient().from(getClientProperties()).build()) { + + Wait.waitFor(() -> !client.instanceOperations().getScanServers().isEmpty()); + + var ntc = new NewTableConfiguration(); + ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_MAXAGE.getKey(), "2s")); ++ ntc.withInitialTabletAvailability(TabletAvailability.HOSTED); + client.tableOperations().create(table1, ntc); - client.tableOperations().create(table2); ++ ntc = new NewTableConfiguration(); ++ ntc.withInitialTabletAvailability(TabletAvailability.HOSTED); ++ client.tableOperations().create(table2, ntc); + ntc = new NewTableConfiguration(); + ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_IDLETIME.getKey(), "2s")); ++ ntc.withInitialTabletAvailability(TabletAvailability.HOSTED); + client.tableOperations().create(table3, ntc); + ntc = new NewTableConfiguration(); + ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_MAXAGE.getKey(), "3s")); ++ ntc.withInitialTabletAvailability(TabletAvailability.HOSTED); + client.tableOperations().create(table4, ntc); + + Timer timer = Timer.startNew(); + + // Write to table4 once, this is different than the other tables that are continually being + // written to. table4 should minor compact 3 seconds after this write. 
+ writeElapsed(new SecureRandom(), client, table4, timer); + boolean sawDataInTable4 = false; + + List<Future<Void>> futures = new ArrayList<>(); + futures.add(executor.submit(createWriterTask(client, table1, timer))); + futures.add(executor.submit(createWriterTask(client, table2, timer))); + futures.add(executor.submit(createWriterTask(client, table3, timer))); + + // wait for some data to be written + Wait.waitFor(() -> readMaxElapsed(client, IMMEDIATE, table1) > 0 + && readMaxElapsed(client, IMMEDIATE, table2) > 0 + && readMaxElapsed(client, IMMEDIATE, table3) > 0); + + long lastMaxSeen = -1; + int changes = 0; + + List<Long> deltas = new ArrayList<>(); + + while (changes < 4) { + Thread.sleep(250); + var currElapsed = timer.elapsed(TimeUnit.MILLISECONDS); + var maxElapsedInTable = readMaxElapsed(client, EVENTUAL, table1); + + if (maxElapsedInTable > 0 && maxElapsedInTable != lastMaxSeen) { + log.info("new max elapsed seen {} {}", lastMaxSeen, maxElapsedInTable); + changes++; + lastMaxSeen = maxElapsedInTable; + } + + if (maxElapsedInTable > 0) { + // This is the difference in elapsed time written to the table vs the most recent elapsed + // time. + deltas.add(currElapsed - maxElapsedInTable); + } + + // The other table does not have the setting to minor compact based on age, so should never + // see any data for it from the scan server. + assertEquals(-1, readMaxElapsed(client, EVENTUAL, table2)); + // The background thread is writing to this table every 100ms so it should not be considered + // idle and therefore should not minor compact.
+ assertEquals(-1, readMaxElapsed(client, EVENTUAL, table3)); + + if (!sawDataInTable4 && readMaxElapsed(client, EVENTUAL, table4) != -1) { - assertTrue(timer.elapsed(TimeUnit.MILLISECONDS) > 3000 - && timer.elapsed(TimeUnit.MILLISECONDS) < 6000); ++ ++ assertTrue( ++ timer.elapsed(TimeUnit.MILLISECONDS) > 3000 ++ && timer.elapsed(TimeUnit.MILLISECONDS) < 6000, ++ "elapsed:" + timer.elapsed(TimeUnit.MILLISECONDS)); + sawDataInTable4 = true; + } + } + + assertTrue(sawDataInTable4); + + var stats = deltas.stream().mapToLong(l -> l).summaryStatistics(); + log.info("Delta stats : {}", stats); + // Should usually see data within 4 seconds, but not always because the timing configs are + // when things should start to happen and not when they are guaranteed to finish. Would expect + // the average to be less than 4 seconds and the max less than 8 seconds. These numbers may + // not hold if running test on a heavily loaded machine. + assertTrue(stats.getAverage() > 500 && stats.getAverage() < 4000); + assertTrue(stats.getMax() < 8000); + assertTrue(stats.getCount() > 9); + + // The write tasks should still be running unless they experienced an exception. + assertTrue(futures.stream().noneMatch(Future::isDone)); + + executor.shutdownNow(); + executor.awaitTermination(600, TimeUnit.SECONDS); + + assertEquals(-1, readMaxElapsed(client, EVENTUAL, table2)); + // Now that nothing is writing it's expected that max read by an immediate scan will see any + // data an eventual scan would see.
+ assertTrue( + readMaxElapsed(client, IMMEDIATE, table1) >= readMaxElapsed(client, EVENTUAL, table1)); + } + } + + private long readMaxElapsed(AccumuloClient client, ScannerBase.ConsistencyLevel consistency, + String table) throws Exception { + try (var scanner = client.createScanner(table)) { + scanner.setConsistencyLevel(consistency); + scanner.fetchColumn(new Text("elapsed"), new Text("nanos")); + return scanner.stream().mapToLong(e -> Long.parseLong(e.getValue().toString(), 10)).max() + .orElse(-1); + } + } + + private static void writeElapsed(SecureRandom random, AccumuloClient client, String table, + Timer timer) throws Exception { + try (var writer = client.createBatchWriter(table)) { + writeElapsed(random, timer, writer); + } + } + + private static Callable<Void> createWriterTask(AccumuloClient client, String table, Timer timer) { + SecureRandom random = new SecureRandom(); + return () -> { + try (var writer = client.createBatchWriter(table)) { + while (true) { + writeElapsed(random, timer, writer); + writer.flush(); + Thread.sleep(100); + } + } + }; + } + + private static void writeElapsed(SecureRandom random, Timer timer, BatchWriter writer) + throws MutationsRejectedException { + var elapsed = timer.elapsed(TimeUnit.MILLISECONDS); + Mutation m = new Mutation(Long.toHexString(random.nextLong())); + m.put("elapsed", "nanos", "" + elapsed); + writer.addMutation(m); + } + }