ACCUMULO-3509: Make cleanup stateful to minimize blocking
By returning a state (true/false) from the cleanup method, the change avoids blocking on a scan session that is being swept. If session cleanup blocks because a ScanSession is still being read, the sweep may block until the ScanBatch for that ScanSession returns.
The change uses a simple semaphore (purely because I like the word) and attempts to acquire it without waiting indefinitely. If acquisition fails, cleanup returns false and the Session is put back into the queue to be cleaned up on a later sweep. Closes apache/accumulo#62 Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/46ad8368 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/46ad8368 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/46ad8368 Branch: refs/heads/1.7 Commit: 46ad8368e160c56c03571b467f8ae603c50992f6 Parents: 567f52f Author: phrocker <marc.par...@gmail.com> Authored: Mon Jan 4 10:59:28 2016 -0500 Committer: Josh Elser <els...@apache.org> Committed: Mon Jan 11 21:24:05 2016 -0500 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 6 +- .../org/apache/accumulo/tserver/Tablet.java | 67 +++++-- .../apache/accumulo/tserver/TabletServer.java | 81 +++++++-- .../test/functional/ScanSessionTimeOutIT.java | 15 +- .../test/functional/SessionBlockVerifyIT.java | 176 +++++++++++++++++++ 5 files changed, 305 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 632bb59..9243494 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -225,7 +225,11 @@ public enum Property { + " tserver.walog.max.size >= this property."), TSERV_MEM_MGMT("tserver.memory.manager", "org.apache.accumulo.server.tabletserver.LargestFirstMemoryManager", PropertyType.CLASSNAME, "An implementation of 
MemoryManger that accumulo will use."), - TSERV_SESSION_MAXIDLE("tserver.session.idle.max", "1m", PropertyType.TIMEDURATION, "maximum idle time for a session"), + TSERV_SESSION_MAXIDLE("tserver.session.idle.max", "1m", PropertyType.TIMEDURATION, "When a tablet server's SimpleTimer thread triggers to check " + + "idle sessions, this configurable option will be used to evaluate scan sessions to determine if they can be closed due to inactivity"), + TSERV_UPDATE_SESSION_MAXIDLE("tserver.session.update.idle.max", "1m", PropertyType.TIMEDURATION, + "When a tablet server's SimpleTimer thread triggers to check " + + "idle sessions, this configurable option will be used to evaluate update sessions to determine if they can be closed due to inactivity"), TSERV_READ_AHEAD_MAXCONCURRENT("tserver.readahead.concurrent.max", "16", PropertyType.COUNT, "The maximum number of concurrent read ahead that will execute. This effectively" + " limits the number of long running scans that can run concurrently per tserver."), http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java index efed665..3f00c0b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java @@ -39,6 +39,8 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -1761,33 +1763,47 @@ public class Tablet { private ScanDataSource isolatedDataSource; private 
boolean sawException = false; private boolean scanClosed = false; + /** + * A fair semaphore of one is used since explicitly know the access pattern will be one thread to read and another to call close if the session becomes + * idle. Since we're explicitly preventing re-entrance, we're currently using a Sempahore. If at any point we decide read needs to be re-entrant, we can + * switch to a Reentrant lock. + */ + private Semaphore scannerSemaphore; Scanner(Range range, ScanOptions options) { this.range = range; this.options = options; + scannerSemaphore = new Semaphore(1, true); } - synchronized ScanBatch read() throws IOException, TabletClosedException { + ScanBatch read() throws IOException, TabletClosedException { - if (sawException) - throw new IllegalStateException("Tried to use scanner after exception occurred."); - - if (scanClosed) - throw new IllegalStateException("Tried to use scanner after it was closed."); + ScanDataSource dataSource = null; Batch results = null; - ScanDataSource dataSource; + try { - if (options.isolated) { - if (isolatedDataSource == null) - isolatedDataSource = new ScanDataSource(options); - dataSource = isolatedDataSource; - } else { - dataSource = new ScanDataSource(options); - } + try { + scannerSemaphore.acquire(); + } catch (InterruptedException e) { + sawException = true; + } - try { + // sawException may have occurred within close, so we cannot assume that an interrupted exception was its cause + if (sawException) + throw new IllegalStateException("Tried to use scanner after exception occurred."); + + if (scanClosed) + throw new IllegalStateException("Tried to use scanner after it was closed."); + + if (options.isolated) { + if (isolatedDataSource == null) + isolatedDataSource = new ScanDataSource(options); + dataSource = isolatedDataSource; + } else { + dataSource = new ScanDataSource(options); + } SortedKeyValueIterator<Key,Value> iter; @@ -1834,9 +1850,9 @@ public class Tablet { } finally { // code in finally block because 
always want // to return mapfiles, even when exception is thrown - if (!options.isolated) + if (null != dataSource && !options.isolated) dataSource.close(false); - else if (dataSource.fileManager != null) + else if (null != dataSource && dataSource.fileManager != null) dataSource.fileManager.detach(); synchronized (Tablet.this) { @@ -1846,19 +1862,32 @@ public class Tablet { queryBytes += results.numBytes; } } + + scannerSemaphore.release(); } } // close and read are synchronized because can not call close on the data source while it is in use // this cloud lead to the case where file iterators that are in use by a thread are returned // to the pool... this would be bad - void close() { + boolean close() { options.interruptFlag.set(true); - synchronized (this) { + boolean obtainedLock = false; + try { + obtainedLock = scannerSemaphore.tryAcquire(10, TimeUnit.MILLISECONDS); + if (!obtainedLock) + return false; + scanClosed = true; if (isolatedDataSource != null) isolatedDataSource.close(false); + } catch (InterruptedException e) { + return false; + } finally { + if (obtainedLock) + scannerSemaphore.release(); } + return true; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 29cf0d3..b7aaf06 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -237,6 +237,8 @@ import org.apache.thrift.server.TServer; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import 
com.google.common.net.HostAndPort; enum ScanRunState { @@ -386,25 +388,30 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu String client = TServerUtils.clientAddress.get(); public boolean reserved; - public void cleanup() {} + public boolean cleanup() { + return true; + } } private static class SessionManager { SecureRandom random; Map<Long,Session> sessions; - long maxIdle; + private long maxIdle; + private long maxUpdateIdle; + private List<Session> idleSessions = new ArrayList<Session>(); + private final Long expiredSessionMarker = new Long(-1); SessionManager(AccumuloConfiguration conf) { random = new SecureRandom(); sessions = new HashMap<Long,Session>(); - + maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE); maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); Runnable r = new Runnable() { @Override public void run() { - sweep(maxIdle); + sweep(maxIdle, maxUpdateIdle); } }; @@ -506,14 +513,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu return session; } - private void sweep(long maxIdle) { + private void sweep(final long maxIdle, final long maxUpdateIdle) { ArrayList<Session> sessionsToCleanup = new ArrayList<Session>(); synchronized (this) { Iterator<Session> iter = sessions.values().iterator(); while (iter.hasNext()) { Session session = iter.next(); + long configuredIdle = maxIdle; + if (session instanceof UpdateSession) { + configuredIdle = maxUpdateIdle; + } long idleTime = System.currentTimeMillis() - session.lastAccessTime; - if (idleTime > maxIdle && !session.reserved) { + if (idleTime > configuredIdle && !session.reserved) { log.info("Closing idle session from user=" + session.user + ", client=" + session.client + ", idle=" + idleTime + "ms"); iter.remove(); sessionsToCleanup.add(session); @@ -521,10 +532,21 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } } - // do clean up outside of lock - for (Session 
session : sessionsToCleanup) { - session.cleanup(); + // do clean up outside of lock for TabletServer in a synchronized block for simplicity vice a synchronized list + + synchronized (idleSessions) { + + sessionsToCleanup.addAll(idleSessions); + + idleSessions.clear(); + + // perform cleanup for all of the sessions + for (Session session : sessionsToCleanup) { + if (!session.cleanup()) + idleSessions.add(session); + } } + } synchronized void removeIfNotAccessed(final long sessionId, final long delay) { @@ -556,7 +578,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() { Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>(); - for (Entry<Long,Session> entry : sessions.entrySet()) { + Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>(); + + synchronized (idleSessions) { + /** + * Add sessions so that get the list returned in the active scans call + */ + for (Session session : idleSessions) { + copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); + } + } + + for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) { Session session = entry.getValue(); @SuppressWarnings("rawtypes") @@ -595,11 +628,20 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu public synchronized List<ActiveScan> getActiveScans() { - ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>(); + final List<ActiveScan> activeScans = new ArrayList<ActiveScan>(); + final long ct = System.currentTimeMillis(); + final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>(); - long ct = System.currentTimeMillis(); + synchronized (idleSessions) { + /** + * Add sessions so that get the list returned in the active scans call + */ + for (Session session : idleSessions) { + 
copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); + } + } - for (Entry<Long,Session> entry : sessions.entrySet()) { + for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) { Session session = entry.getValue(); if (session instanceof ScanSession) { ScanSession ss = (ScanSession) session; @@ -841,8 +883,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu public AtomicBoolean interruptFlag; @Override - public void cleanup() { + public boolean cleanup() { interruptFlag.set(true); + return true; } } @@ -879,13 +922,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD; @Override - public void cleanup() { + public boolean cleanup() { try { if (nextBatchTask != null) nextBatchTask.cancel(true); } finally { if (scanner != null) - scanner.close(); + return scanner.close(); + else + return true; } } @@ -908,9 +953,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu public KeyExtent threadPoolExtent; @Override - public void cleanup() { + public boolean cleanup() { if (lookupTask != null) lookupTask.cancel(true); + // the cancellation should provide us the safety to return true here + return true; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java index 91fc9eb..6009462 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java @@ -49,7 +49,7 @@ public class ScanSessionTimeOutIT 
extends AccumuloClusterIT { @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_SESSION_MAXIDLE.getKey(), "3")); + cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString())); } @Override @@ -63,12 +63,21 @@ public class ScanSessionTimeOutIT extends AccumuloClusterIT { public void reduceSessionIdle() throws Exception { InstanceOperations ops = getConnector().instanceOperations(); sessionIdle = ops.getSystemConfiguration().get(Property.TSERV_SESSION_MAXIDLE.getKey()); - ops.setProperty(Property.TSERV_SESSION_MAXIDLE.getKey(), "3"); + ops.setProperty(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString()); log.info("Waiting for existing session idle time to expire"); Thread.sleep(AccumuloConfiguration.getTimeInMillis(sessionIdle)); log.info("Finished waiting"); } + /** + * Returns the max idle time as a string. + * + * @return new max idle time + */ + protected String getMaxIdleTimeString() { + return "3"; + } + @After public void resetSessionIdle() throws Exception { if (null != sessionIdle) { @@ -108,7 +117,7 @@ public class ScanSessionTimeOutIT extends AccumuloClusterIT { } - private void verify(Iterator<Entry<Key,Value>> iter, int start, int stop) throws Exception { + protected void verify(Iterator<Entry<Key,Value>> iter, int start, int stop) throws Exception { for (int i = start; i < stop; i++) { Text er = new Text(String.format("%08d", i)); http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java b/test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java new file mode 100644 index 0000000..05f304b --- /dev/null +++ 
b/test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.ActiveScan; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import 
org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; + +/** + * Verify that we have resolved blocking issue by ensuring that we have not lost scan sessions which we know to currently be running + */ +public class SessionBlockVerifyIT extends ScanSessionTimeOutIT { + private static final Logger log = LoggerFactory.getLogger(SessionBlockVerifyIT.class); + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + Map<String,String> siteConfig = cfg.getSiteConfig(); + cfg.setNumTservers(1); + siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString()); + siteConfig.put(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey(), "11"); + cfg.setSiteConfig(siteConfig); + } + + @Override + protected int defaultTimeoutSeconds() { + return 60; + } + + @Override + protected String getMaxIdleTimeString() { + return "1s"; + } + + ExecutorService service = Executors.newFixedThreadPool(10); + + @Test + public void run() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + + for (int i = 0; i < 1000; i++) { + Mutation m = new Mutation(new Text(String.format("%08d", i))); + for (int j = 0; j < 3; j++) + m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8))); + + bw.addMutation(m); + } + + bw.close(); + + Scanner scanner = c.createScanner(tableName, new Authorizations()); + scanner.setReadaheadThreshold(20000); + scanner.setRange(new Range(String.format("%08d", 0), String.format("%08d", 1000))); + + // test by making 
a slow iterator and then a couple of fast ones. + // when then checking we shouldn't have any running except the slow iterator + IteratorSetting setting = new IteratorSetting(21, SlowIterator.class); + SlowIterator.setSeekSleepTime(setting, Long.MAX_VALUE); + SlowIterator.setSleepTime(setting, Long.MAX_VALUE); + scanner.addScanIterator(setting); + + final Iterator<Entry<Key,Value>> slow = scanner.iterator(); + + final List<Future<Boolean>> callables = new ArrayList<Future<Boolean>>(); + final CountDownLatch latch = new CountDownLatch(10); + for (int i = 0; i < 10; i++) { + Future<Boolean> callable = service.submit(new Callable<Boolean>() { + public Boolean call() { + latch.countDown(); + while (slow.hasNext()) { + + slow.next(); + } + return slow.hasNext(); + } + }); + callables.add(callable); + } + + latch.await(); + + log.info("Starting SessionBlockVerifyIT"); + + // let's add more for good measure. + for (int i = 0; i < 2; i++) { + Scanner scanner2 = c.createScanner(tableName, new Authorizations()); + + scanner2.setRange(new Range(String.format("%08d", 0), String.format("%08d", 1000))); + + scanner2.setBatchSize(1); + Iterator<Entry<Key,Value>> iter = scanner2.iterator(); + // call super's verify mechanism + verify(iter, 0, 1000); + + } + + int sessionsFound = 0; + // we have configured 1 tserver, so we can grab the one and only + String tserver = Iterables.getOnlyElement(c.instanceOperations().getTabletServers()); + + final List<ActiveScan> scans = c.instanceOperations().getActiveScans(tserver); + + for (ActiveScan scan : scans) { + // only here to minimize chance of seeing meta extent scans + + if (tableName.equals(scan.getTable()) && scan.getSsiList().size() > 0) { + assertEquals("Not the expected iterator", 1, scan.getSsiList().size()); + assertTrue("Not the expected iterator", scan.getSsiList().iterator().next().contains("SlowIterator")); + sessionsFound++; + } + + } + + /** + * The message below indicates the problem that we experience within 
ACCUMULO-3509. The issue manifests as a blockage in the Scanner synchronization that + * prevent us from making the close call against it. Since the close blocks until a read is finished, we ultimately have a block within the sweep of + * SessionManager. As a result never reap subsequent idle sessions AND we will orphan the sessionsToCleanup in the sweep, leading to an inaccurate count + * within sessionsFound. + */ + assertEquals("Must have ten sessions. Failure indicates a synchronization block within the sweep mechanism", 10, sessionsFound); + for (Future<Boolean> callable : callables) { + callable.cancel(true); + } + service.shutdown(); + } + +}