This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new dcbc5d606a Stops waiting on scans during tablet close. (#4819) dcbc5d606a is described below commit dcbc5d606ae5be4fa692ee5985640c42ef50cbf8 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Aug 27 14:58:17 2024 -0700 Stops waiting on scans during tablet close. (#4819) Before this change when a tablet was closing it would prevent new scans from starting and wait for running scans to finish. This could cause the tablet to become unavailable for long periods of time. After this change tablets will no longer wait on running scans to complete. They will set something to interrupt the thread and then disable the client scan session which prevents the client from ever seeing any data after the tablet is closed. Once all scan sessions are disabled the tablet will proceed to close. Disabling the scan session will cause the client side scanner to eventually switch to the new tablet server where the tablet is loaded. It's possible that threads may be left running on the tablet server therefore it will be important to also implement #4756 fixes #4757 --- .../accumulo/tserver/ThriftScanClientHandler.java | 4 + .../accumulo/tserver/scan/ScanParameters.java | 10 ++ .../apache/accumulo/tserver/session/Session.java | 1 + .../accumulo/tserver/session/SessionManager.java | 36 ++++- .../accumulo/tserver/tablet/ScanDataSource.java | 4 + .../org/apache/accumulo/tserver/tablet/Tablet.java | 39 ++++- .../tserver/session/SessionManagerTest.java | 42 ++++++ .../org/apache/accumulo/test/ZombieScanIT.java | 166 ++++++++++++++++++--- 8 files changed, 278 insertions(+), 24 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java index 6bacdcab0c..b127c8ba31 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java +++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java @@ -214,6 +214,8 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { long sid = server.getSessionManager().createSession(scanSession, true); + scanParams.setScanSessionId(sid); + ScanResult scanResult; try { scanResult = continueScan(tinfo, sid, scanSession, busyTimeout); @@ -440,6 +442,8 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { long sid = server.getSessionManager().createSession(mss, true); + scanParams.setScanSessionId(sid); + MultiScanResult result; try { result = continueMultiScan(sid, mss, busyTimeout); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java index 3ce3b4a24f..ff601fff1f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java @@ -43,6 +43,7 @@ public final class ScanParameters { private final SamplerConfiguration samplerConfig; private final long batchTimeOut; private final String classLoaderContext; + private volatile Long scanSessionId = null; private volatile ScanDispatch dispatch; public ScanParameters(int maxEntries, Authorizations authorizations, Set<Column> columnSet, @@ -106,6 +107,14 @@ public final class ScanParameters { return dispatch; } + public void setScanSessionId(long scanSessionId) { + this.scanSessionId = scanSessionId; + } + + public Long getScanSessionId() { + return scanSessionId; + } + @Override public String toString() { StringBuilder buf = new StringBuilder(); @@ -118,6 +127,7 @@ public final class ScanParameters { buf.append(", maxEntries=").append(this.maxEntries); buf.append(", num=").append(this.maxEntries); buf.append(", samplerConfig=").append(this.samplerConfig); + buf.append(", 
scanSessionId=").append(this.scanSessionId); buf.append("]"); return buf.toString(); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java index 3f170331c8..29247247c3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java @@ -34,6 +34,7 @@ public class Session { public long lastAccessTime; public long startTime; private State state = State.NEW; + boolean allowReservation = true; private final Timer stateChangeTimer = Timer.startNew(); private final TCredentials credentials; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index 76ecdb62ba..d32dbcd14c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -120,9 +120,9 @@ public class SessionManager { synchronized (session) { if (session.getState() == State.RESERVED) { throw new IllegalStateException( - "Attempted to reserved session that is already reserved " + sessionId); + "Attempted to reserve session that is already reserved " + sessionId); } - if (session.getState() == State.REMOVED) { + if (session.getState() == State.REMOVED || !session.allowReservation) { return null; } session.setState(State.RESERVED); @@ -138,7 +138,7 @@ public class SessionManager { if (session != null) { synchronized (session) { - if (session.getState() == State.REMOVED) { + if (session.getState() == State.REMOVED || !session.allowReservation) { return null; } @@ -154,7 +154,7 @@ public class SessionManager { throw new IllegalStateException( "Attempted to reserved session that is already reserved " + sessionId); } - if 
(session.getState() == State.REMOVED) { + if (session.getState() == State.REMOVED || !session.allowReservation) { return null; } session.setState(State.RESERVED); @@ -173,6 +173,7 @@ public class SessionManager { if (session.getState() != State.RESERVED) { throw new IllegalStateException("Cannon unreserve, state: " + session.getState()); } + session.notifyAll(); session.setState(State.UNRESERVED); session.lastAccessTime = System.currentTimeMillis(); @@ -253,6 +254,33 @@ public class SessionManager { return removed; } + /** + * Prevents a session from ever being reserved in the future. This method can be called + * concurrently when another thread has the session reserved w/o impacting the other thread. When + * the session is currently reserved by another thread that thread can unreserve as normal and + * after that this session can never be reserved again. Since the session can never be reserved + * after this call it will eventually age off and be cleaned up. + * + * @return true if the sessions is currently not reserved, false otherwise + */ + public boolean disallowNewReservations(long sessionId) { + var session = getSession(sessionId); + if (session == null) { + return true; + } + synchronized (session) { + if (session.allowReservation) { + // Prevent future reservations of this session. + session.allowReservation = false; + log.debug("disabled session {}", sessionId); + } + + // If nothing can reserve the session and it is not currently reserved then the session is + // disabled and will eventually be cleaned up. 
+ return session.getState() != State.RESERVED; + } + } + static void cleanup(BlockingQueue<Session> deferredCleanupQueue, Session session) { if (!session.cleanup()) { var retry = Retry.builder().infiniteRetries().retryAfter(25, MILLISECONDS) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index d6af088cbd..8b7aca78f7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -321,4 +321,8 @@ class ScanDataSource implements DataSource { .append("expectedDeletionCount", expectedDeletionCount).append("scanParams", scanParams) .toString(); } + + public ScanParameters getScanParameters() { + return scanParams; + } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index f74eb0912f..f229799f6d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1038,13 +1038,31 @@ public class Tablet extends TabletBase { activeScan.interrupt(); } + // create a copy so that it can be whittled down as client sessions are disabled + List<ScanDataSource> runningScans = new ArrayList<>(this.activeScans); + + runningScans.removeIf(scanDataSource -> { + boolean currentlyUnreserved = disallowNewReservations(scanDataSource.getScanParameters()); + if (currentlyUnreserved) { + log.debug("Disabled scan session in tablet close {} {}", extent, scanDataSource); + } + return currentlyUnreserved; + }); + long lastLogTime = System.nanoTime(); // wait for reads and writes to complete - while (writesInProgress > 0 || !activeScans.isEmpty()) { + while (writesInProgress > 0 || !runningScans.isEmpty()) { + 
runningScans.removeIf(scanDataSource -> { + boolean currentlyUnreserved = disallowNewReservations(scanDataSource.getScanParameters()); + if (currentlyUnreserved) { + log.debug("Disabled scan session in tablet close {} {}", extent, scanDataSource); + } + return currentlyUnreserved; + }); if (log.isDebugEnabled() && System.nanoTime() - lastLogTime > TimeUnit.SECONDS.toNanos(60)) { - for (ScanDataSource activeScan : activeScans) { + for (ScanDataSource activeScan : runningScans) { log.debug("Waiting on scan in completeClose {} {}", extent, activeScan); } @@ -1053,13 +1071,19 @@ public class Tablet extends TabletBase { try { log.debug("Waiting to completeClose for {}. {} writes {} scans", extent, writesInProgress, - activeScans.size()); + runningScans.size()); this.wait(50); } catch (InterruptedException e) { log.error("Interrupted waiting to completeClose for extent {}", extent, e); } } + // It is assumed that nothing new would have been added to activeScans since it was copied, so + // check that assumption. At this point activeScans should be empty or everything in it should + // be disabled. 
+ Preconditions.checkState(activeScans.stream() + .allMatch(scanDataSource -> disallowNewReservations(scanDataSource.getScanParameters()))); + getTabletMemory().waitForMinC(); if (saveState && getTabletMemory().getMemTable().getNumEntries() > 0) { @@ -1107,6 +1131,15 @@ public class Tablet extends TabletBase { } } + private boolean disallowNewReservations(ScanParameters scanParameters) { + var scanSessId = scanParameters.getScanSessionId(); + if (scanSessId != null) { + return getTabletServer().getSessionManager().disallowNewReservations(scanSessId); + } else { + return true; + } + } + private void closeConsistencyCheck() { long num = tabletMemory.getMemTable().getNumEntries(); diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionManagerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionManagerTest.java index be3626ea71..38565f0a04 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionManagerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionManagerTest.java @@ -18,12 +18,20 @@ */ package org.apache.accumulo.tserver.session; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.server.ServerContext; import org.junit.jupiter.api.Test; public class SessionManagerTest { @@ -106,4 +114,38 @@ public class SessionManagerTest { assertEquals(2, deferredCleanupQeue.stream().filter(s -> 
((TestSession) s).cleanupCount == 2).count()); } + + @Test + public void testDisallowNewReservation() { + var sessionManager = createSessionManager(); + + var sid = sessionManager.createSession(new TestSession(2), true); + + // this should prevent future reservation and return false because its currently reserved + assertFalse(sessionManager.disallowNewReservations(sid)); + + // should not have a problem un-reserving + sessionManager.unreserveSession(sid); + + // should not be able to reserve the session because reservations were disabled + assertNull(sessionManager.reserveSession(sid)); + assertNull(sessionManager.reserveSession(sid, false)); + + // should return true now that its not reserved + assertTrue(sessionManager.disallowNewReservations(sid)); + + sessionManager.removeSession(sid); + + // should return true for nonexistent session + assertTrue(sessionManager.disallowNewReservations(sid)); + } + + private SessionManager createSessionManager() { + ServerContext ctx = createMock(ServerContext.class); + expect(ctx.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); + var executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1); + expect(ctx.getScheduledExecutor()).andReturn(executor).anyTimes(); + replay(ctx); + return new SessionManager(ctx); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java index 25f5ab3195..e8c3a47a3e 100644 --- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java @@ -18,30 +18,43 @@ */ package org.apache.accumulo.test; +import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import 
java.util.Objects; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory; +import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; import org.apache.accumulo.test.metrics.TestStatsDSink; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -115,6 +128,117 @@ public class ZombieScanIT extends ConfigurableMacBase { } } + /** + * This test ensure that scans threads that run forever do not prevent tablets from unloading. 
+ */ + @Test + public void testZombieScan() throws Exception { + + String table = getUniqueNames(1)[0]; + + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + + var splits = new TreeSet<Text>(); + splits.add(new Text("3")); + splits.add(new Text("5")); + splits.add(new Text("7")); + var ntc = new NewTableConfiguration().withSplits(splits); + c.tableOperations().create(table, ntc); + + try (var writer = c.createBatchWriter(table)) { + for (var row : List.of("2", "4", "6", "8")) { + Mutation m = new Mutation(row); + m.put("f", "q", "v"); + writer.addMutation(m); + } + } + + // Flush the data otherwise when the tablet attempts to close with an active scan reading from + // the in memory map it will wait for 15 seconds for the scan + c.tableOperations().flush(table, null, null, true); + + var executor = Executors.newCachedThreadPool(); + + // start two zombie scans that should never return using a normal scanner + List<Future<String>> futures = new ArrayList<>(); + for (var row : List.of("2", "4")) { + var future = executor.submit(() -> { + try (var scanner = c.createScanner(table)) { + IteratorSetting iter = new IteratorSetting(100, "Z", ZombieIterator.class); + scanner.addScanIterator(iter); + scanner.setRange(new Range(row)); + return scanner.stream().findFirst().map(e -> e.getKey().getRowData().toString()) + .orElse("none"); + } + }); + futures.add(future); + } + + // start two zombie scans that should never return using a batch scanner + for (var row : List.of("6", "8")) { + var future = executor.submit(() -> { + try (var scanner = c.createBatchScanner(table)) { + IteratorSetting iter = new IteratorSetting(100, "Z", ZombieIterator.class); + scanner.addScanIterator(iter); + scanner.setRanges(List.of(new Range(row))); + return scanner.stream().findFirst().map(e -> e.getKey().getRowData().toString()) + .orElse("none"); + } + }); + futures.add(future); + } + + // should eventually see the four zombie scans running against four 
tablets + Wait.waitFor(() -> countDistinctTabletsScans(table, c) == 4); + + assertEquals(1, c.instanceOperations().getTabletServers().size()); + + // Start 3 new tablet servers, this should cause the table to balance and the tablets with + // zombie scans to unload. The Zombie scans should not prevent the table from unloading. The + // scan threads will still be running on the old tablet servers. + getCluster().getConfig().setNumTservers(4); + getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); + + // Wait for all tablets servers + Wait.waitFor(() -> c.instanceOperations().getTabletServers().size() == 4); + + // The table should eventually balance across the 4 tablet servers + Wait.waitFor(() -> countLocations(table, c) == 4); + + // The zombie scans should still be running + assertTrue(futures.stream().noneMatch(Future::isDone)); + + // Should be able to scan all the tablets at the new locations. + try (var scanner = c.createScanner(table)) { + var rows = scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(Collectors.toSet()); + assertEquals(Set.of("2", "4", "6", "8"), rows); + } + + try (var scanner = c.createBatchScanner(table)) { + scanner.setRanges(List.of(new Range())); + var rows = scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(Collectors.toSet()); + assertEquals(Set.of("2", "4", "6", "8"), rows); + } + + // The zombie scans should migrate with the tablets, taking up more scan threads in the + // system. 
+ Set<String> tabletSeversWithZombieScans = new HashSet<>(); + for (String tserver : c.instanceOperations().getTabletServers()) { + if (c.instanceOperations().getActiveScans(tserver).stream() + .flatMap(activeScan -> activeScan.getSsiList().stream()) + .anyMatch(scanIters -> scanIters.contains(ZombieIterator.class.getName()))) { + tabletSeversWithZombieScans.add(tserver); + } + } + assertEquals(4, tabletSeversWithZombieScans.size()); + + executor.shutdownNow(); + } + + } + /** * Create some zombie scans and ensure metrics for them show up. */ @@ -152,7 +276,7 @@ public class ZombieScanIT extends ConfigurableMacBase { } // should eventually see the eight stuck scans running - Wait.waitFor(() -> countScansForTable(table, c) == 8); + Wait.waitFor(() -> countActiveScans(c, table) == 8); // Cancel the scan threads. This will cause the sessions on the server side to timeout and // become inactive. The stuck threads on the server side related to the timed out sessions @@ -163,11 +287,11 @@ public class ZombieScanIT extends ConfigurableMacBase { }); // Four of the eight running scans should respond to thread interrupts and exit - Wait.waitFor(() -> countScansForTable(table, c) == 4); + Wait.waitFor(() -> countActiveScans(c, table) == 4); Wait.waitFor(() -> getZombieScansMetric() == 4); - assertEquals(4, countScansForTable(table, c)); + assertEquals(4, countActiveScans(c, table)); // start four more stuck scans with two that will ignore interrupts futures.clear(); @@ -176,7 +300,7 @@ public class ZombieScanIT extends ConfigurableMacBase { futures.add(startStuckBatchScan(c, table, executor, "99", false)); futures.add(startStuckBatchScan(c, table, executor, "0", true)); - Wait.waitFor(() -> countScansForTable(table, c) == 8); + Wait.waitFor(() -> countActiveScans(c, table) == 8); // Cancel the client side scan threads. Should cause the server side threads to be // interrupted. 
@@ -186,17 +310,36 @@ public class ZombieScanIT extends ConfigurableMacBase { }); // Two of the stuck threads should respond to interrupts on the server side and exit. - Wait.waitFor(() -> countScansForTable(table, c) == 6); + Wait.waitFor(() -> countActiveScans(c, table) == 6); Wait.waitFor(() -> getZombieScansMetric() == 6); - assertEquals(6, countScansForTable(table, c)); + assertEquals(6, countActiveScans(c, table)); executor.shutdownNow(); } } + private static long countLocations(String table, AccumuloClient client) throws Exception { + var ctx = (ClientContext) client; + var tableId = ctx.getTableId(table); + return ctx.getAmple().readTablets().forTable(tableId).build().stream() + .map(TabletMetadata::getLocation).filter(Objects::nonNull).distinct().count(); + } + + private static long countDistinctTabletsScans(String table, AccumuloClient client) + throws Exception { + var tservers = client.instanceOperations().getTabletServers(); + long count = 0; + for (String tserver : tservers) { + count += client.instanceOperations().getActiveScans(tserver).stream() + .filter(activeScan -> activeScan.getTable().equals(table)) + .map(activeScan -> activeScan.getTablet()).distinct().count(); + } + return count; + } + private Future<String> startStuckScan(AccumuloClient c, String table, ExecutorService executor, String row, boolean canInterrupt) { return executor.submit(() -> { @@ -241,15 +384,4 @@ public class ZombieScanIT extends ConfigurableMacBase { .filter(metric -> metric.getName().equals(MetricsProducer.METRICS_SCAN_ZOMBIE_THREADS)) .mapToInt(metric -> Integer.parseInt(metric.getValue())).max().orElse(-1); } - - private static long countScansForTable(String table, AccumuloClient client) throws Exception { - var tservers = client.instanceOperations().getTabletServers(); - long count = 0; - for (String tserver : tservers) { - count += client.instanceOperations().getActiveScans(tserver).stream() - .filter(activeScan -> activeScan.getTable().equals(table)).count(); - } 
- return count; - } - }