This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 3ab133cef088f23c277d1c65d375a1b5aea1a560 Merge: 85418b8f5e 88c2f7bbb6 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Aug 27 22:27:24 2024 +0000 Merge branch '3.1' .../TabletServerBatchReaderIterator.java | 20 +- .../accumulo/core/metrics/MetricsProducer.java | 8 + .../spi/scan/ConfigurableScanServerSelector.java | 13 +- .../scan/ConfigurableScanServerSelectorTest.java | 34 +- .../org/apache/accumulo/tserver/ScanServer.java | 1 + .../org/apache/accumulo/tserver/TabletServer.java | 1 + .../accumulo/tserver/ThriftScanClientHandler.java | 10 +- .../tserver/metrics/TabletServerScanMetrics.java | 13 +- .../apache/accumulo/tserver/scan/LookupTask.java | 2 +- .../accumulo/tserver/scan/NextBatchTask.java | 2 +- .../accumulo/tserver/scan/ScanParameters.java | 10 + .../org/apache/accumulo/tserver/scan/ScanTask.java | 93 ++++- .../accumulo/tserver/session/ScanSession.java | 69 +++- .../apache/accumulo/tserver/session/Session.java | 14 +- .../accumulo/tserver/session/SessionManager.java | 67 +++- .../accumulo/tserver/tablet/ScanDataSource.java | 4 + .../org/apache/accumulo/tserver/tablet/Tablet.java | 39 ++- .../tserver/session/SessionManagerTest.java | 42 +++ .../test/ScanServerGroupConfigurationIT.java | 2 + .../accumulo/test/ScanServerIT_NoServers.java | 46 ++- .../org/apache/accumulo/test/ZombieScanIT.java | 387 +++++++++++++++++++++ .../test/functional/ScanSessionTimeOutIT.java | 19 +- .../apache/accumulo/test/functional/ScannerIT.java | 81 +++++ 23 files changed, 909 insertions(+), 68 deletions(-) diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index f22e8d60a2,964ddedb4b..b1454ec38e --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@@ -405,9 -404,11 +405,10 @@@ public class ScanServer extends Abstrac } MetricsInfo metricsInfo = getContext().getMetricsInfo(); - metricsInfo.addServiceTags(getApplicationName(), clientAddress); - metricsInfo.addCommonTags(List.of(Tag.of("resource.group", groupName))); + metricsInfo.addServiceTags(getApplicationName(), clientAddress, getResourceGroup()); scanMetrics = new TabletServerScanMetrics(); + sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads); scanServerMetrics = new ScanServerMetrics(tabletMetadataCache); blockCacheMetrics = new BlockCacheMetrics(resourceManager.getIndexCache(), resourceManager.getDataCache(), resourceManager.getSummaryCache()); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 89e166c97d,41c6409063..cb8f211fe2 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -583,7 -709,9 +583,8 @@@ public class TabletServer extends Abstr metrics = new TabletServerMetrics(this); updateMetrics = new TabletServerUpdateMetrics(); scanMetrics = new TabletServerScanMetrics(); + sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads); mincMetrics = new TabletServerMinCMetrics(); - ceMetrics = new CompactionExecutorsMetrics(); pausedMetrics = new PausedCompactionMetrics(); blockCacheMetrics = new BlockCacheMetrics(this.resourceManager.getIndexCache(), this.resourceManager.getDataCache(), this.resourceManager.getSummaryCache()); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java index 210adfc082,96a33f50b1..d23c60fe34 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java @@@ -34,8 -38,11 +38,11 @@@ public abstract class ScanTask<T> imple protected final TabletHostingServer server; protected AtomicBoolean interruptFlag; protected ArrayBlockingQueue<Object> resultQueue; - protected AtomicInteger state; - private AtomicReference<ScanRunState> runState; + protected final AtomicInteger state; - protected final AtomicReference<ScanRunState> runState; ++ private final AtomicReference<ScanRunState> runState; + + private Thread scanThread = null; + private final Lock scanThreadLock = new ReentrantLock(); private static final int INITIAL = 1; private static final int ADDED = 2; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 6efbeb2b2b,0193b46c93..a9b571078e --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@@ -974,9 -1111,20 +998,18 @@@ public class Tablet extends TabletBase // close data files getTabletResources().close(); - if (completeClose) { - closeState = CloseState.COMPLETE; - } + closeState = CloseState.COMPLETE; } + private boolean disallowNewReservations(ScanParameters scanParameters) { + var scanSessId = scanParameters.getScanSessionId(); + if (scanSessId != null) { + return getTabletServer().getSessionManager().disallowNewReservations(scanSessId); + } else { + return true; + } + } + private void closeConsistencyCheck() { long num = tabletMemory.getMemTable().getNumEntries(); diff --cc test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java index 0000000000,e8c3a47a3e..9ebcc84902 mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java @@@ -1,0 -1,387 +1,387 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.test; + + import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans; + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertTrue; + + import java.util.ArrayList; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Objects; + import java.util.Set; + import java.util.TreeSet; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.Future; + import java.util.concurrent.Semaphore; + import java.util.stream.Collectors; + + import org.apache.accumulo.core.client.Accumulo; + import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.IteratorSetting; + import org.apache.accumulo.core.client.admin.NewTableConfiguration; + import org.apache.accumulo.core.clientImpl.ClientContext; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.Mutation; + import org.apache.accumulo.core.data.Range; + import org.apache.accumulo.core.iterators.WrappingIterator; + import org.apache.accumulo.core.metadata.schema.TabletMetadata; + import org.apache.accumulo.core.metrics.MetricsProducer; + import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory; + import org.apache.accumulo.minicluster.ServerType; + import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; + import org.apache.accumulo.test.functional.ConfigurableMacBase; + import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; + import org.apache.accumulo.test.metrics.TestStatsDSink; + import org.apache.accumulo.test.util.Wait; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.io.Text; + import org.junit.jupiter.api.AfterAll; + import org.junit.jupiter.api.BeforeAll; + import org.junit.jupiter.api.Test; + + public class ZombieScanIT extends ConfigurableMacBase { + + private static TestStatsDSink sink; + + @BeforeAll + public static void before() throws Exception { + sink = new TestStatsDSink(); + } + + @AfterAll + public static void after() throws Exception { + sink.close(); + } + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + super.configure(cfg, hadoopCoreSite); + + // Make sessions time out much more quickly. This will cause a session to be classified as a + // zombie scan much sooner. + cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "6s"); + cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED, "true"); + cfg.setProperty("general.custom.metrics.opts.logging.step", "1s"); + String clazzList = LoggingMeterRegistryFactory.class.getName() + "," + + TestStatsDRegistryFactory.class.getName(); + cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList); + Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", + TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); + cfg.setSystemProperties(sysProps); - cfg.setNumTservers(1); ++ cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1); + } + + /** + * An iterator that should get stuck forever when used + */ + public static class ZombieIterator extends WrappingIterator { + @Override + public boolean hasTop() { + // must call super.hasTop() before blocking as that will run accumulo code to setup iterator + boolean ht = super.hasTop(); + Semaphore semaphore = new Semaphore(10); + semaphore.acquireUninterruptibly(5); + // this should block forever + semaphore.acquireUninterruptibly(6); + return ht; + } + } + + /** + * An iterator that should get stuck but can be interrupted + */ + public static class StuckIterator extends WrappingIterator { + @Override + public boolean hasTop() { + try { + // must call super.hasTop() before blocking as that will run accumulo code to setup iterator + boolean ht = super.hasTop(); + Semaphore semaphore = new Semaphore(10); + semaphore.acquire(5); + // this should block forever + semaphore.acquire(6); + return ht; + } catch (InterruptedException ie) { + throw new IllegalStateException(ie); + } + } + } + + /** + * This test ensure that scans threads that run forever do not prevent tablets from unloading. + */ + @Test + public void testZombieScan() throws Exception { + + String table = getUniqueNames(1)[0]; + + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + + var splits = new TreeSet<Text>(); + splits.add(new Text("3")); + splits.add(new Text("5")); + splits.add(new Text("7")); + var ntc = new NewTableConfiguration().withSplits(splits); + c.tableOperations().create(table, ntc); + + try (var writer = c.createBatchWriter(table)) { + for (var row : List.of("2", "4", "6", "8")) { + Mutation m = new Mutation(row); + m.put("f", "q", "v"); + writer.addMutation(m); + } + } + + // Flush the data otherwise when the tablet attempts to close with an active scan reading from + // the in memory map it will wait for 15 seconds for the scan + c.tableOperations().flush(table, null, null, true); + + var executor = Executors.newCachedThreadPool(); + + // start two zombie scans that should never return using a normal scanner + List<Future<String>> futures = new ArrayList<>(); + for (var row : List.of("2", "4")) { + var future = executor.submit(() -> { + try (var scanner = c.createScanner(table)) { + IteratorSetting iter = new IteratorSetting(100, "Z", ZombieIterator.class); + scanner.addScanIterator(iter); + scanner.setRange(new Range(row)); + return scanner.stream().findFirst().map(e -> e.getKey().getRowData().toString()) + .orElse("none"); + } + }); + futures.add(future); + } + + // start two zombie scans that should never return using a batch scanner + for (var row : List.of("6", "8")) { + var future = executor.submit(() -> { + try (var scanner = c.createBatchScanner(table)) { + IteratorSetting iter = new IteratorSetting(100, "Z", ZombieIterator.class); + scanner.addScanIterator(iter); + scanner.setRanges(List.of(new Range(row))); + return scanner.stream().findFirst().map(e -> e.getKey().getRowData().toString()) + .orElse("none"); + } + }); + futures.add(future); + } + + // should eventually see the four zombie scans running against four tablets + Wait.waitFor(() -> countDistinctTabletsScans(table, c) == 4); + + assertEquals(1, c.instanceOperations().getTabletServers().size()); + + // Start 3 new tablet servers, this should cause the table to balance and the tablets with + // zombie scans to unload. The Zombie scans should not prevent the table from unloading. The + // scan threads will still be running on the old tablet servers. - getCluster().getConfig().setNumTservers(4); - getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); ++ getCluster().getConfig().getClusterServerConfiguration().setNumDefaultTabletServers(4); ++ getCluster().getClusterControl().start(ServerType.TABLET_SERVER, Map.of(), 4); + + // Wait for all tablets servers + Wait.waitFor(() -> c.instanceOperations().getTabletServers().size() == 4); + + // The table should eventually balance across the 4 tablet servers + Wait.waitFor(() -> countLocations(table, c) == 4); + + // The zombie scans should still be running + assertTrue(futures.stream().noneMatch(Future::isDone)); + + // Should be able to scan all the tablets at the new locations. + try (var scanner = c.createScanner(table)) { + var rows = scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(Collectors.toSet()); + assertEquals(Set.of("2", "4", "6", "8"), rows); + } + + try (var scanner = c.createBatchScanner(table)) { + scanner.setRanges(List.of(new Range())); + var rows = scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(Collectors.toSet()); + assertEquals(Set.of("2", "4", "6", "8"), rows); + } + + // The zombie scans should migrate with the tablets, taking up more scan threads in the + // system. + Set<String> tabletSeversWithZombieScans = new HashSet<>(); + for (String tserver : c.instanceOperations().getTabletServers()) { + if (c.instanceOperations().getActiveScans(tserver).stream() + .flatMap(activeScan -> activeScan.getSsiList().stream()) + .anyMatch(scanIters -> scanIters.contains(ZombieIterator.class.getName()))) { + tabletSeversWithZombieScans.add(tserver); + } + } + assertEquals(4, tabletSeversWithZombieScans.size()); + + executor.shutdownNow(); + } + + } + + /** + * Create some zombie scans and ensure metrics for them show up. + */ + @Test + public void testMetrics() throws Exception { + + Wait.waitFor(() -> { + var zsmc = getZombieScansMetric(); + return zsmc == -1 || zsmc == 0; + }); + + String table = getUniqueNames(1)[0]; + + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + + c.tableOperations().create(table); + + var executor = Executors.newCachedThreadPool(); + + // start four stuck scans that should never return data + List<Future<String>> futures = new ArrayList<>(); + for (var row : List.of("2", "4")) { + // start a scan with an iterator that gets stuck and can not be interrupted + futures.add(startStuckScan(c, table, executor, row, false)); + // start a scan with an iterator that gets stuck and can be interrupted + futures.add(startStuckScan(c, table, executor, row, true)); + } + + // start four stuck scans, using a batch scanner, that should never return data + for (var row : List.of("6", "8")) { + // start a scan with an iterator that gets stuck and can not be interrupted + futures.add(startStuckBatchScan(c, table, executor, row, false)); + // start a scan with an iterator that gets stuck and can be interrupted + futures.add(startStuckBatchScan(c, table, executor, row, true)); + } + + // should eventually see the eight stuck scans running + Wait.waitFor(() -> countActiveScans(c, table) == 8); + + // Cancel the scan threads. This will cause the sessions on the server side to timeout and + // become inactive. The stuck threads on the server side related to the timed out sessions + // will be interrupted. + Wait.waitFor(() -> { + futures.forEach(future -> future.cancel(true)); + return futures.stream().allMatch(Future::isDone); + }); + + // Four of the eight running scans should respond to thread interrupts and exit + Wait.waitFor(() -> countActiveScans(c, table) == 4); + + Wait.waitFor(() -> getZombieScansMetric() == 4); + + assertEquals(4, countActiveScans(c, table)); + + // start four more stuck scans with two that will ignore interrupts + futures.clear(); + futures.add(startStuckScan(c, table, executor, "0", false)); + futures.add(startStuckScan(c, table, executor, "0", true)); + futures.add(startStuckBatchScan(c, table, executor, "99", false)); + futures.add(startStuckBatchScan(c, table, executor, "0", true)); + + Wait.waitFor(() -> countActiveScans(c, table) == 8); + + // Cancel the client side scan threads. Should cause the server side threads to be + // interrupted. + Wait.waitFor(() -> { + futures.forEach(future -> future.cancel(true)); + return futures.stream().allMatch(Future::isDone); + }); + + // Two of the stuck threads should respond to interrupts on the server side and exit. + Wait.waitFor(() -> countActiveScans(c, table) == 6); + + Wait.waitFor(() -> getZombieScansMetric() == 6); + + assertEquals(6, countActiveScans(c, table)); + + executor.shutdownNow(); + } + + } + + private static long countLocations(String table, AccumuloClient client) throws Exception { + var ctx = (ClientContext) client; + var tableId = ctx.getTableId(table); + return ctx.getAmple().readTablets().forTable(tableId).build().stream() + .map(TabletMetadata::getLocation).filter(Objects::nonNull).distinct().count(); + } + + private static long countDistinctTabletsScans(String table, AccumuloClient client) + throws Exception { + var tservers = client.instanceOperations().getTabletServers(); + long count = 0; + for (String tserver : tservers) { + count += client.instanceOperations().getActiveScans(tserver).stream() + .filter(activeScan -> activeScan.getTable().equals(table)) + .map(activeScan -> activeScan.getTablet()).distinct().count(); + } + return count; + } + + private Future<String> startStuckScan(AccumuloClient c, String table, ExecutorService executor, + String row, boolean canInterrupt) { + return executor.submit(() -> { + try (var scanner = c.createScanner(table)) { + String className; + if (canInterrupt) { + className = StuckIterator.class.getName(); + } else { + className = ZombieIterator.class.getName(); + } + IteratorSetting iter = new IteratorSetting(100, "Z", className); + scanner.addScanIterator(iter); + scanner.setRange(new Range(row)); + return scanner.stream().findFirst().map(e -> e.getKey().getRowData().toString()) + .orElse("none"); + } + }); + } + + private Future<String> startStuckBatchScan(AccumuloClient c, String table, + ExecutorService executor, String row, boolean canInterrupt) { + return executor.submit(() -> { + try (var scanner = c.createBatchScanner(table)) { + String className; + if (canInterrupt) { + className = StuckIterator.class.getName(); + } else { + className = ZombieIterator.class.getName(); + } + + IteratorSetting iter = new IteratorSetting(100, "Z", className); + scanner.addScanIterator(iter); + scanner.setRanges(List.of(new Range(row))); + return scanner.stream().findFirst().map(e -> e.getKey().getRowData().toString()) + .orElse("none"); + } + }); + } + + private int getZombieScansMetric() { + return sink.getLines().stream().map(TestStatsDSink::parseStatsDMetric) + .filter(metric -> metric.getName().equals(MetricsProducer.METRICS_SCAN_ZOMBIE_THREADS)) + .mapToInt(metric -> Integer.parseInt(metric.getValue())).max().orElse(-1); + } + }