Merge branch '1.6' into 1.7 Conflicts: server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/65628282 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/65628282 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/65628282 Branch: refs/heads/1.7 Commit: 656282825ad0eb4ee51052e71492a3d3fd5c1f02 Parents: 642add8 46ad836 Author: Josh Elser <els...@apache.org> Authored: Tue Jan 12 00:24:24 2016 -0500 Committer: Josh Elser <els...@apache.org> Committed: Tue Jan 12 00:24:24 2016 -0500 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 6 +- .../tserver/session/ConditionalSession.java | 3 +- .../tserver/session/MultiScanSession.java | 4 +- .../accumulo/tserver/session/ScanSession.java | 6 +- .../accumulo/tserver/session/Session.java | 4 +- .../tserver/session/SessionManager.java | 61 ++++++- .../apache/accumulo/tserver/tablet/Scanner.java | 68 +++++-- .../test/functional/ScanSessionTimeOutIT.java | 15 +- .../test/functional/SessionBlockVerifyIT.java | 176 +++++++++++++++++++ 9 files changed, 306 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java index cd5e617,0000000..138f558 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java @@@ -1,44 
-1,0 +1,45 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.session; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.client.Durability; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.thrift.TCredentials; + +public class ConditionalSession extends Session { + public final TCredentials credentials; + public final Authorizations auths; + public final String tableId; + public final AtomicBoolean interruptFlag = new AtomicBoolean(); + public final Durability durability; + + public ConditionalSession(TCredentials credentials, Authorizations authorizations, String tableId, Durability durability) { + super(credentials); + this.credentials = credentials; + this.auths = authorizations; + this.tableId = tableId; + this.durability = durability; + } + + @Override - public void cleanup() { ++ public boolean cleanup() { + interruptFlag.set(true); ++ return true; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java ---------------------------------------------------------------------- diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java index b326e10,0000000..2fd590c mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java @@@ -1,63 -1,0 +1,65 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver.session; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.data.thrift.IterInfo; +import org.apache.accumulo.core.data.thrift.MultiScanResult; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.tserver.scan.ScanTask; + +public class MultiScanSession extends Session { + public final KeyExtent threadPoolExtent; + public final HashSet<Column> columnSet = new HashSet<Column>(); + public final Map<KeyExtent,List<Range>> queries; + public final List<IterInfo> ssiList; + public final Map<String,Map<String,String>> ssio; + public final Authorizations auths; + + // stats + public int numRanges; + public int numTablets; + public int numEntries; + public long totalLookupTime; + + public volatile ScanTask<MultiScanResult> lookupTask; + + public MultiScanSession(TCredentials credentials, KeyExtent threadPoolExtent, Map<KeyExtent,List<Range>> queries, List<IterInfo> ssiList, + Map<String,Map<String,String>> ssio, Authorizations authorizations) { + super(credentials); + this.queries = queries; + this.ssiList = ssiList; + this.ssio = ssio; + this.auths = authorizations; + this.threadPoolExtent = threadPoolExtent; + } + + @Override - public void cleanup() { ++ public boolean cleanup() { + if (lookupTask != null) + lookupTask.cancel(true); ++ // the cancellation should provide us the safety to return true here ++ return true; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java index 
d5b0027,0000000..743f4d3 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java @@@ -1,70 -1,0 +1,72 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver.session; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.data.thrift.IterInfo; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.util.Stat; +import org.apache.accumulo.tserver.scan.ScanTask; +import org.apache.accumulo.tserver.tablet.ScanBatch; +import org.apache.accumulo.tserver.tablet.Scanner; + +public class ScanSession extends Session { + public final Stat nbTimes = new Stat(); + public final KeyExtent extent; + public final Set<Column> columnSet; + public final List<IterInfo> ssiList; + public final Map<String,Map<String,String>> ssio; + public final Authorizations auths; + public final AtomicBoolean interruptFlag = new AtomicBoolean(); + public long entriesReturned = 0; + public long batchCount = 0; + public volatile ScanTask<ScanBatch> nextBatchTask; + public Scanner scanner; + public final long readaheadThreshold; + + public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, + Authorizations authorizations, long readaheadThreshold) { + super(credentials); + this.extent = extent; + this.columnSet = columnSet; + this.ssiList = ssiList; + this.ssio = ssio; + this.auths = authorizations; + this.readaheadThreshold = readaheadThreshold; + } + + @Override - public void cleanup() { ++ public boolean cleanup() { + try { + if (nextBatchTask != null) + nextBatchTask.cancel(true); + } finally { + if (scanner != null) - scanner.close(); ++ return scanner.close(); ++ else ++ return true; + } + } + +} 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java index a561166,0000000..1d2d88d mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java @@@ -1,43 -1,0 +1,45 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver.session; + +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.server.rpc.TServerUtils; + +public class Session { + public final String client; + long lastAccessTime; + public long startTime; + boolean reserved; + private final TCredentials credentials; + + Session(TCredentials credentials) { + this.credentials = credentials; + this.client = TServerUtils.clientAddress.get(); + } + + public String getUser() { + return credentials.getPrincipal(); + } + + public TCredentials getCredentials() { + return credentials; + } + - public void cleanup() {} ++ public boolean cleanup() { ++ return true; ++ } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index 83d87b3,0000000..1cd7fa6 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@@ -1,319 -1,0 +1,362 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.session; + +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.HashMap; ++import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; ++import java.util.Set; +import java.util.TimerTask; + +import org.apache.accumulo.core.client.impl.Translator; +import org.apache.accumulo.core.client.impl.Translators; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.thrift.MultiScanResult; +import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; +import org.apache.accumulo.core.tabletserver.thrift.ScanState; +import org.apache.accumulo.core.tabletserver.thrift.ScanType; +import org.apache.accumulo.core.util.MapCounter; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.accumulo.tserver.scan.ScanRunState; +import org.apache.accumulo.tserver.scan.ScanTask; +import org.apache.accumulo.tserver.tablet.ScanBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + ++import com.google.common.collect.Iterables; ++import com.google.common.collect.Maps; ++ +public class SessionManager { + private static final Logger log = LoggerFactory.getLogger(SessionManager.class); + + private final SecureRandom random = new SecureRandom(); + private final Map<Long,Session> sessions = new HashMap<Long,Session>(); + private final long maxIdle; ++ private final long maxUpdateIdle; ++ private final List<Session> idleSessions = new ArrayList<Session>(); ++ private final Long expiredSessionMarker = new Long(-1); + private final AccumuloConfiguration aconf; + + public SessionManager(AccumuloConfiguration conf) { + aconf = conf; ++ maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE); + maxIdle = 
conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); + + Runnable r = new Runnable() { + @Override + public void run() { - sweep(maxIdle); ++ sweep(maxIdle, maxUpdateIdle); + } + }; + + SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000)); + } + + public synchronized long createSession(Session session, boolean reserve) { + long sid = random.nextLong(); + + while (sessions.containsKey(sid)) { + sid = random.nextLong(); + } + + sessions.put(sid, session); + + session.reserved = reserve; + + session.startTime = session.lastAccessTime = System.currentTimeMillis(); + + return sid; + } + + public long getMaxIdleTime() { + return maxIdle; + } + + /** + * while a session is reserved, it cannot be canceled or removed + */ + + public synchronized Session reserveSession(long sessionId) { + Session session = sessions.get(sessionId); + if (session != null) { + if (session.reserved) + throw new IllegalStateException(); + session.reserved = true; + } + + return session; + + } + + public synchronized Session reserveSession(long sessionId, boolean wait) { + Session session = sessions.get(sessionId); + if (session != null) { + while (wait && session.reserved) { + try { + wait(1000); + } catch (InterruptedException e) { + throw new RuntimeException(); + } + } + + if (session.reserved) + throw new IllegalStateException(); + session.reserved = true; + } + + return session; + + } + + public synchronized void unreserveSession(Session session) { + if (!session.reserved) + throw new IllegalStateException(); + notifyAll(); + session.reserved = false; + session.lastAccessTime = System.currentTimeMillis(); + } + + public synchronized void unreserveSession(long sessionId) { + Session session = getSession(sessionId); + if (session != null) + unreserveSession(session); + } + + public synchronized Session getSession(long sessionId) { + Session session = sessions.get(sessionId); + if (session != null) + session.lastAccessTime = System.currentTimeMillis(); + return session; + 
} + + public Session removeSession(long sessionId) { + return removeSession(sessionId, false); + } + + public Session removeSession(long sessionId, boolean unreserve) { + Session session = null; + synchronized (this) { + session = sessions.remove(sessionId); + if (unreserve && session != null) + unreserveSession(session); + } + + // do clean up out side of lock.. + if (session != null) + session.cleanup(); + + return session; + } + - private void sweep(long maxIdle) { ++ private void sweep(final long maxIdle, final long maxUpdateIdle) { + List<Session> sessionsToCleanup = new ArrayList<Session>(); + synchronized (this) { + Iterator<Session> iter = sessions.values().iterator(); + while (iter.hasNext()) { + Session session = iter.next(); ++ long configuredIdle = maxIdle; ++ if (session instanceof UpdateSession) { ++ configuredIdle = maxUpdateIdle; ++ } + long idleTime = System.currentTimeMillis() - session.lastAccessTime; - if (idleTime > maxIdle && !session.reserved) { ++ if (idleTime > configuredIdle && !session.reserved) { + log.info("Closing idle session from user=" + session.getUser() + ", client=" + session.client + ", idle=" + idleTime + "ms"); + iter.remove(); + sessionsToCleanup.add(session); + } + } + } + - // do clean up outside of lock - for (Session session : sessionsToCleanup) { - session.cleanup(); ++ // do clean up outside of lock for TabletServer in a synchronized block for simplicity vice a synchronized list ++ ++ synchronized (idleSessions) { ++ ++ sessionsToCleanup.addAll(idleSessions); ++ ++ idleSessions.clear(); ++ ++ // perform cleanup for all of the sessions ++ for (Session session : sessionsToCleanup) { ++ if (!session.cleanup()) ++ idleSessions.add(session); ++ } + } + } + + public synchronized void removeIfNotAccessed(final long sessionId, final long delay) { + Session session = sessions.get(sessionId); + if (session != null) { + final long removeTime = session.lastAccessTime; + TimerTask r = new TimerTask() { + @Override + public void 
run() { + Session sessionToCleanup = null; + synchronized (SessionManager.this) { + Session session2 = sessions.get(sessionId); + if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) { + log.info("Closing not accessed session from user=" + session2.getUser() + ", client=" + session2.client + ", duration=" + delay + "ms"); + sessions.remove(sessionId); + sessionToCleanup = session2; + } + } + + // call clean up outside of lock + if (sessionToCleanup != null) + sessionToCleanup.cleanup(); + } + }; + + SimpleTimer.getInstance(aconf).schedule(r, delay); + } + } + + public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() { + Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>(); ++ Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>(); ++ ++ synchronized (idleSessions) { ++ /** ++ * Add sessions so that get the list returned in the active scans call ++ */ ++ for (Session session : idleSessions) { ++ copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); ++ } ++ } ++ + for (Entry<Long,Session> entry : sessions.entrySet()) { + + Session session = entry.getValue(); + @SuppressWarnings("rawtypes") + ScanTask nbt = null; + String tableID = null; + + if (session instanceof ScanSession) { + ScanSession ss = (ScanSession) session; + nbt = ss.nextBatchTask; + tableID = ss.extent.getTableId().toString(); + } else if (session instanceof MultiScanSession) { + MultiScanSession mss = (MultiScanSession) session; + nbt = mss.lookupTask; + tableID = mss.threadPoolExtent.getTableId().toString(); + } + + if (nbt == null) + continue; + + ScanRunState srs = nbt.getScanRunState(); + + if (srs == ScanRunState.FINISHED) + continue; + + MapCounter<ScanRunState> stateCounts = counts.get(tableID); + if (stateCounts == null) { + stateCounts = new MapCounter<ScanRunState>(); + counts.put(tableID, stateCounts); + } + + stateCounts.increment(srs, 1); + } + + 
return counts; + } + + public synchronized List<ActiveScan> getActiveScans() { + - List<ActiveScan> activeScans = new ArrayList<ActiveScan>(); ++ final List<ActiveScan> activeScans = new ArrayList<ActiveScan>(); ++ final long ct = System.currentTimeMillis(); ++ final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>(); + - long ct = System.currentTimeMillis(); ++ synchronized (idleSessions) { ++ /** ++ * Add sessions so that get the list returned in the active scans call ++ */ ++ for (Session session : idleSessions) { ++ copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); ++ } ++ } + - for (Entry<Long,Session> entry : sessions.entrySet()) { ++ for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) { + Session session = entry.getValue(); + if (session instanceof ScanSession) { + ScanSession ss = (ScanSession) session; + + ScanState state = ScanState.RUNNING; + + ScanTask<ScanBatch> nbt = ss.nextBatchTask; + if (nbt == null) { + state = ScanState.IDLE; + } else { + switch (nbt.getScanRunState()) { + case QUEUED: + state = ScanState.QUEUED; + break; + case FINISHED: + state = ScanState.IDLE; + break; + case RUNNING: + default: + /* do nothing */ + break; + } + } + + ActiveScan activeScan = new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, + ScanType.SINGLE, state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, + ss.auths.getAuthorizationsBB()); + + // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in ActiveScan constructor + activeScan.setScanId(entry.getKey()); + activeScans.add(activeScan); + + } else if (session instanceof MultiScanSession) { + MultiScanSession mss = (MultiScanSession) session; + + ScanState state = ScanState.RUNNING; + + ScanTask<MultiScanResult> nbt = mss.lookupTask; + if (nbt == null) { + state = ScanState.IDLE; 
+ } else { + switch (nbt.getScanRunState()) { + case QUEUED: + state = ScanState.QUEUED; + break; + case FINISHED: + state = ScanState.IDLE; + break; + case RUNNING: + default: + /* do nothing */ + break; + } + } + + activeScans.add(new ActiveScan(mss.client, mss.getUser(), mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime, + ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths + .getAuthorizationsBB())); + } + } + + return activeScans; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index 790a352,0000000..c96c75a mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@@ -1,137 -1,0 +1,167 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver.tablet; + +import java.io.IOException; +import java.util.ArrayList; ++import java.util.concurrent.Semaphore; ++import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IterationInterruptedException; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Scanner { + private static final Logger log = LoggerFactory.getLogger(Scanner.class); + + private final Tablet tablet; + private final ScanOptions options; + private Range range; + private SortedKeyValueIterator<Key,Value> isolatedIter; + private ScanDataSource isolatedDataSource; + private boolean sawException = false; + private boolean scanClosed = false; ++ /** ++ * A fair semaphore of one is used since we explicitly know the access pattern will be one thread to read and another to call close if the session becomes idle. ++ * Since we're explicitly preventing re-entrance, we're currently using a Semaphore. If at any point we decide read needs to be re-entrant, we can switch to a ++ * Reentrant lock.
++ */ ++ private Semaphore scannerSemaphore; + + Scanner(Tablet tablet, Range range, ScanOptions options) { + this.tablet = tablet; + this.range = range; + this.options = options; ++ this.scannerSemaphore = new Semaphore(1, true); + } + - public synchronized ScanBatch read() throws IOException, TabletClosedException { ++ public ScanBatch read() throws IOException, TabletClosedException { + - if (sawException) - throw new IllegalStateException("Tried to use scanner after exception occurred."); - - if (scanClosed) - throw new IllegalStateException("Tried to use scanner after it was closed."); ++ ScanDataSource dataSource = null; + + Batch results = null; + - ScanDataSource dataSource; ++ try { + - if (options.isIsolated()) { - if (isolatedDataSource == null) - isolatedDataSource = new ScanDataSource(tablet, options); - dataSource = isolatedDataSource; - } else { - dataSource = new ScanDataSource(tablet, options); - } ++ try { ++ scannerSemaphore.acquire(); ++ } catch (InterruptedException e) { ++ sawException = true; ++ } + - try { ++ // sawException may have occurred within close, so we cannot assume that an interrupted exception was its cause ++ if (sawException) ++ throw new IllegalStateException("Tried to use scanner after exception occurred."); ++ ++ if (scanClosed) ++ throw new IllegalStateException("Tried to use scanner after it was closed."); ++ ++ if (options.isIsolated()) { ++ if (isolatedDataSource == null) ++ isolatedDataSource = new ScanDataSource(tablet, options); ++ dataSource = isolatedDataSource; ++ } else { ++ dataSource = new ScanDataSource(tablet, options); ++ } + + SortedKeyValueIterator<Key,Value> iter; + + if (options.isIsolated()) { + if (isolatedIter == null) + isolatedIter = new SourceSwitchingIterator(dataSource, true); + else + isolatedDataSource.reattachFileManager(); + iter = isolatedIter; + } else { + iter = new SourceSwitchingIterator(dataSource, false); + } + + results = tablet.nextBatch(iter, range, options.getNum(), 
options.getColumnSet()); + + if (results.getResults() == null) { + range = null; + return new ScanBatch(new ArrayList<KVEntry>(), false); + } else if (results.getContinueKey() == null) { + return new ScanBatch(results.getResults(), false); + } else { + range = new Range(results.getContinueKey(), !results.isSkipContinueKey(), range.getEndKey(), range.isEndKeyInclusive()); + return new ScanBatch(results.getResults(), true); + } + + } catch (IterationInterruptedException iie) { + sawException = true; + if (tablet.isClosed()) + throw new TabletClosedException(iie); + else + throw iie; + } catch (IOException ioe) { + if (tablet.shutdownInProgress()) { + log.debug("IOException while shutdown in progress ", ioe); + throw new TabletClosedException(ioe); // assume IOException was caused by execution of HDFS shutdown hook + } + + sawException = true; + dataSource.close(true); + throw ioe; + } catch (RuntimeException re) { + sawException = true; + throw re; + } finally { + // code in finally block because always want + // to return mapfiles, even when exception is thrown - if (!options.isIsolated()) { ++ if (null != dataSource && !options.isIsolated()) { + dataSource.close(false); - } else { ++ } else if (null != dataSource) { + dataSource.detachFileManager(); + } + + if (results != null && results.getResults() != null) + tablet.updateQueryStats(results.getResults().size(), results.getNumBytes()); ++ ++ scannerSemaphore.release(); + } + } + + // close and read are synchronized because can not call close on the data source while it is in use + // this could lead to the case where file iterators that are in use by a thread are returned + // to the pool... 
this would be bad - public void close() { ++ public boolean close() { + options.getInterruptFlag().set(true); - synchronized (this) { ++ ++ boolean obtainedLock = false; ++ try { ++ obtainedLock = scannerSemaphore.tryAcquire(10, TimeUnit.MILLISECONDS); ++ if (!obtainedLock) ++ return false; ++ + scanClosed = true; + if (isolatedDataSource != null) + isolatedDataSource.close(false); ++ } catch (InterruptedException e) { ++ return false; ++ } finally { ++ if (obtainedLock) ++ scannerSemaphore.release(); + } ++ return true; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java index daf781f,6009462..cb5bc18 --- a/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java @@@ -49,9 -49,7 +49,9 @@@ public class ScanSessionTimeOutIT exten @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString())); + Map<String,String> siteConfig = cfg.getSiteConfig(); - siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), "3"); ++ siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString()); + cfg.setSiteConfig(siteConfig); } @Override