Merge branch '1.6' into 1.7

Conflicts:
        server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
        server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
        test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/65628282
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/65628282
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/65628282

Branch: refs/heads/1.7
Commit: 656282825ad0eb4ee51052e71492a3d3fd5c1f02
Parents: 642add8 46ad836
Author: Josh Elser <els...@apache.org>
Authored: Tue Jan 12 00:24:24 2016 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Tue Jan 12 00:24:24 2016 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   6 +-
 .../tserver/session/ConditionalSession.java     |   3 +-
 .../tserver/session/MultiScanSession.java       |   4 +-
 .../accumulo/tserver/session/ScanSession.java   |   6 +-
 .../accumulo/tserver/session/Session.java       |   4 +-
 .../tserver/session/SessionManager.java         |  61 ++++++-
 .../apache/accumulo/tserver/tablet/Scanner.java |  68 +++++--
 .../test/functional/ScanSessionTimeOutIT.java   |  15 +-
 .../test/functional/SessionBlockVerifyIT.java   | 176 +++++++++++++++++++
 9 files changed, 306 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java
index cd5e617,0000000..138f558
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java
@@@ -1,44 -1,0 +1,45 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.session;
 +
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.client.Durability;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +
 +public class ConditionalSession extends Session {
 +  public final TCredentials credentials;
 +  public final Authorizations auths;
 +  public final String tableId;
 +  public final AtomicBoolean interruptFlag = new AtomicBoolean();
 +  public final Durability durability;
 +
 +  public ConditionalSession(TCredentials credentials, Authorizations 
authorizations, String tableId, Durability durability) {
 +    super(credentials);
 +    this.credentials = credentials;
 +    this.auths = authorizations;
 +    this.tableId = tableId;
 +    this.durability = durability;
 +  }
 +
 +  @Override
-   public void cleanup() {
++  public boolean cleanup() {
 +    interruptFlag.set(true);
++    return true;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index b326e10,0000000..2fd590c
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@@ -1,63 -1,0 +1,65 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.session;
 +
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.data.Column;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.impl.KeyExtent;
 +import org.apache.accumulo.core.data.thrift.IterInfo;
 +import org.apache.accumulo.core.data.thrift.MultiScanResult;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.tserver.scan.ScanTask;
 +
 +public class MultiScanSession extends Session {
 +  public final KeyExtent threadPoolExtent;
 +  public final HashSet<Column> columnSet = new HashSet<Column>();
 +  public final Map<KeyExtent,List<Range>> queries;
 +  public final List<IterInfo> ssiList;
 +  public final Map<String,Map<String,String>> ssio;
 +  public final Authorizations auths;
 +
 +  // stats
 +  public int numRanges;
 +  public int numTablets;
 +  public int numEntries;
 +  public long totalLookupTime;
 +
 +  public volatile ScanTask<MultiScanResult> lookupTask;
 +
 +  public MultiScanSession(TCredentials credentials, KeyExtent 
threadPoolExtent, Map<KeyExtent,List<Range>> queries, List<IterInfo> ssiList,
 +      Map<String,Map<String,String>> ssio, Authorizations authorizations) {
 +    super(credentials);
 +    this.queries = queries;
 +    this.ssiList = ssiList;
 +    this.ssio = ssio;
 +    this.auths = authorizations;
 +    this.threadPoolExtent = threadPoolExtent;
 +  }
 +
 +  @Override
-   public void cleanup() {
++  public boolean cleanup() {
 +    if (lookupTask != null)
 +      lookupTask.cancel(true);
++    // the cancellation should provide us the safety to return true here
++    return true;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index d5b0027,0000000..743f4d3
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@@ -1,70 -1,0 +1,72 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.session;
 +
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.data.Column;
 +import org.apache.accumulo.core.data.impl.KeyExtent;
 +import org.apache.accumulo.core.data.thrift.IterInfo;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.util.Stat;
 +import org.apache.accumulo.tserver.scan.ScanTask;
 +import org.apache.accumulo.tserver.tablet.ScanBatch;
 +import org.apache.accumulo.tserver.tablet.Scanner;
 +
 +public class ScanSession extends Session {
 +  public final Stat nbTimes = new Stat();
 +  public final KeyExtent extent;
 +  public final Set<Column> columnSet;
 +  public final List<IterInfo> ssiList;
 +  public final Map<String,Map<String,String>> ssio;
 +  public final Authorizations auths;
 +  public final AtomicBoolean interruptFlag = new AtomicBoolean();
 +  public long entriesReturned = 0;
 +  public long batchCount = 0;
 +  public volatile ScanTask<ScanBatch> nextBatchTask;
 +  public Scanner scanner;
 +  public final long readaheadThreshold;
 +
 +  public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> 
columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
 +      Authorizations authorizations, long readaheadThreshold) {
 +    super(credentials);
 +    this.extent = extent;
 +    this.columnSet = columnSet;
 +    this.ssiList = ssiList;
 +    this.ssio = ssio;
 +    this.auths = authorizations;
 +    this.readaheadThreshold = readaheadThreshold;
 +  }
 +
 +  @Override
-   public void cleanup() {
++  public boolean cleanup() {
 +    try {
 +      if (nextBatchTask != null)
 +        nextBatchTask.cancel(true);
 +    } finally {
 +      if (scanner != null)
-         scanner.close();
++        return scanner.close();
++      else
++        return true;
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index a561166,0000000..1d2d88d
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@@ -1,43 -1,0 +1,45 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.session;
 +
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.server.rpc.TServerUtils;
 +
 +public class Session {
 +  public final String client;
 +  long lastAccessTime;
 +  public long startTime;
 +  boolean reserved;
 +  private final TCredentials credentials;
 +
 +  Session(TCredentials credentials) {
 +    this.credentials = credentials;
 +    this.client = TServerUtils.clientAddress.get();
 +  }
 +
 +  public String getUser() {
 +    return credentials.getPrincipal();
 +  }
 +
 +  public TCredentials getCredentials() {
 +    return credentials;
 +  }
 +
-   public void cleanup() {}
++  public boolean cleanup() {
++    return true;
++  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 83d87b3,0000000..1cd7fa6
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@@ -1,319 -1,0 +1,362 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.session;
 +
 +import java.security.SecureRandom;
 +import java.util.ArrayList;
 +import java.util.HashMap;
++import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
++import java.util.Set;
 +import java.util.TimerTask;
 +
 +import org.apache.accumulo.core.client.impl.Translator;
 +import org.apache.accumulo.core.client.impl.Translators;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.thrift.MultiScanResult;
 +import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 +import org.apache.accumulo.core.tabletserver.thrift.ScanState;
 +import org.apache.accumulo.core.tabletserver.thrift.ScanType;
 +import org.apache.accumulo.core.util.MapCounter;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.tserver.scan.ScanRunState;
 +import org.apache.accumulo.tserver.scan.ScanTask;
 +import org.apache.accumulo.tserver.tablet.ScanBatch;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import com.google.common.collect.Iterables;
++import com.google.common.collect.Maps;
++
 +public class SessionManager {
 +  private static final Logger log = 
LoggerFactory.getLogger(SessionManager.class);
 +
 +  private final SecureRandom random = new SecureRandom();
 +  private final Map<Long,Session> sessions = new HashMap<Long,Session>();
 +  private final long maxIdle;
++  private final long maxUpdateIdle;
++  private final List<Session> idleSessions = new ArrayList<Session>();
++  private final Long expiredSessionMarker = new Long(-1);
 +  private final AccumuloConfiguration aconf;
 +
 +  public SessionManager(AccumuloConfiguration conf) {
 +    aconf = conf;
++    maxUpdateIdle = 
conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
 +    maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 +
 +    Runnable r = new Runnable() {
 +      @Override
 +      public void run() {
-         sweep(maxIdle);
++        sweep(maxIdle, maxUpdateIdle);
 +      }
 +    };
 +
 +    SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
 +  }
 +
 +  public synchronized long createSession(Session session, boolean reserve) {
 +    long sid = random.nextLong();
 +
 +    while (sessions.containsKey(sid)) {
 +      sid = random.nextLong();
 +    }
 +
 +    sessions.put(sid, session);
 +
 +    session.reserved = reserve;
 +
 +    session.startTime = session.lastAccessTime = System.currentTimeMillis();
 +
 +    return sid;
 +  }
 +
 +  public long getMaxIdleTime() {
 +    return maxIdle;
 +  }
 +
 +  /**
 +   * while a session is reserved, it cannot be canceled or removed
 +   */
 +
 +  public synchronized Session reserveSession(long sessionId) {
 +    Session session = sessions.get(sessionId);
 +    if (session != null) {
 +      if (session.reserved)
 +        throw new IllegalStateException();
 +      session.reserved = true;
 +    }
 +
 +    return session;
 +
 +  }
 +
 +  public synchronized Session reserveSession(long sessionId, boolean wait) {
 +    Session session = sessions.get(sessionId);
 +    if (session != null) {
 +      while (wait && session.reserved) {
 +        try {
 +          wait(1000);
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException();
 +        }
 +      }
 +
 +      if (session.reserved)
 +        throw new IllegalStateException();
 +      session.reserved = true;
 +    }
 +
 +    return session;
 +
 +  }
 +
 +  public synchronized void unreserveSession(Session session) {
 +    if (!session.reserved)
 +      throw new IllegalStateException();
 +    notifyAll();
 +    session.reserved = false;
 +    session.lastAccessTime = System.currentTimeMillis();
 +  }
 +
 +  public synchronized void unreserveSession(long sessionId) {
 +    Session session = getSession(sessionId);
 +    if (session != null)
 +      unreserveSession(session);
 +  }
 +
 +  public synchronized Session getSession(long sessionId) {
 +    Session session = sessions.get(sessionId);
 +    if (session != null)
 +      session.lastAccessTime = System.currentTimeMillis();
 +    return session;
 +  }
 +
 +  public Session removeSession(long sessionId) {
 +    return removeSession(sessionId, false);
 +  }
 +
 +  public Session removeSession(long sessionId, boolean unreserve) {
 +    Session session = null;
 +    synchronized (this) {
 +      session = sessions.remove(sessionId);
 +      if (unreserve && session != null)
 +        unreserveSession(session);
 +    }
 +
 +    // do clean up out side of lock..
 +    if (session != null)
 +      session.cleanup();
 +
 +    return session;
 +  }
 +
-   private void sweep(long maxIdle) {
++  private void sweep(final long maxIdle, final long maxUpdateIdle) {
 +    List<Session> sessionsToCleanup = new ArrayList<Session>();
 +    synchronized (this) {
 +      Iterator<Session> iter = sessions.values().iterator();
 +      while (iter.hasNext()) {
 +        Session session = iter.next();
++        long configuredIdle = maxIdle;
++        if (session instanceof UpdateSession) {
++          configuredIdle = maxUpdateIdle;
++        }
 +        long idleTime = System.currentTimeMillis() - session.lastAccessTime;
-         if (idleTime > maxIdle && !session.reserved) {
++        if (idleTime > configuredIdle && !session.reserved) {
 +          log.info("Closing idle session from user=" + session.getUser() + ", 
client=" + session.client + ", idle=" + idleTime + "ms");
 +          iter.remove();
 +          sessionsToCleanup.add(session);
 +        }
 +      }
 +    }
 +
-     // do clean up outside of lock
-     for (Session session : sessionsToCleanup) {
-       session.cleanup();
++    // do clean up outside of lock for TabletServer in a synchronized block 
for simplicity vice a synchronized list
++
++    synchronized (idleSessions) {
++
++      sessionsToCleanup.addAll(idleSessions);
++
++      idleSessions.clear();
++
++      // perform cleanup for all of the sessions
++      for (Session session : sessionsToCleanup) {
++        if (!session.cleanup())
++          idleSessions.add(session);
++      }
 +    }
 +  }
 +
 +  public synchronized void removeIfNotAccessed(final long sessionId, final 
long delay) {
 +    Session session = sessions.get(sessionId);
 +    if (session != null) {
 +      final long removeTime = session.lastAccessTime;
 +      TimerTask r = new TimerTask() {
 +        @Override
 +        public void run() {
 +          Session sessionToCleanup = null;
 +          synchronized (SessionManager.this) {
 +            Session session2 = sessions.get(sessionId);
 +            if (session2 != null && session2.lastAccessTime == removeTime && 
!session2.reserved) {
 +              log.info("Closing not accessed session from user=" + 
session2.getUser() + ", client=" + session2.client + ", duration=" + delay + 
"ms");
 +              sessions.remove(sessionId);
 +              sessionToCleanup = session2;
 +            }
 +          }
 +
 +          // call clean up outside of lock
 +          if (sessionToCleanup != null)
 +            sessionToCleanup.cleanup();
 +        }
 +      };
 +
 +      SimpleTimer.getInstance(aconf).schedule(r, delay);
 +    }
 +  }
 +
 +  public synchronized Map<String,MapCounter<ScanRunState>> 
getActiveScansPerTable() {
 +    Map<String,MapCounter<ScanRunState>> counts = new 
HashMap<String,MapCounter<ScanRunState>>();
++    Set<Entry<Long,Session>> copiedIdleSessions = new 
HashSet<Entry<Long,Session>>();
++
++    synchronized (idleSessions) {
++      /**
++       * Add sessions so that get the list returned in the active scans call
++       */
++      for (Session session : idleSessions) {
++        copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, 
session));
++      }
++    }
++
 +    for (Entry<Long,Session> entry : sessions.entrySet()) {
 +
 +      Session session = entry.getValue();
 +      @SuppressWarnings("rawtypes")
 +      ScanTask nbt = null;
 +      String tableID = null;
 +
 +      if (session instanceof ScanSession) {
 +        ScanSession ss = (ScanSession) session;
 +        nbt = ss.nextBatchTask;
 +        tableID = ss.extent.getTableId().toString();
 +      } else if (session instanceof MultiScanSession) {
 +        MultiScanSession mss = (MultiScanSession) session;
 +        nbt = mss.lookupTask;
 +        tableID = mss.threadPoolExtent.getTableId().toString();
 +      }
 +
 +      if (nbt == null)
 +        continue;
 +
 +      ScanRunState srs = nbt.getScanRunState();
 +
 +      if (srs == ScanRunState.FINISHED)
 +        continue;
 +
 +      MapCounter<ScanRunState> stateCounts = counts.get(tableID);
 +      if (stateCounts == null) {
 +        stateCounts = new MapCounter<ScanRunState>();
 +        counts.put(tableID, stateCounts);
 +      }
 +
 +      stateCounts.increment(srs, 1);
 +    }
 +
 +    return counts;
 +  }
 +
 +  public synchronized List<ActiveScan> getActiveScans() {
 +
-     List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
++    final List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
++    final long ct = System.currentTimeMillis();
++    final Set<Entry<Long,Session>> copiedIdleSessions = new 
HashSet<Entry<Long,Session>>();
 +
-     long ct = System.currentTimeMillis();
++    synchronized (idleSessions) {
++      /**
++       * Add sessions so that get the list returned in the active scans call
++       */
++      for (Session session : idleSessions) {
++        copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, 
session));
++      }
++    }
 +
-     for (Entry<Long,Session> entry : sessions.entrySet()) {
++    for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), 
copiedIdleSessions)) {
 +      Session session = entry.getValue();
 +      if (session instanceof ScanSession) {
 +        ScanSession ss = (ScanSession) session;
 +
 +        ScanState state = ScanState.RUNNING;
 +
 +        ScanTask<ScanBatch> nbt = ss.nextBatchTask;
 +        if (nbt == null) {
 +          state = ScanState.IDLE;
 +        } else {
 +          switch (nbt.getScanRunState()) {
 +            case QUEUED:
 +              state = ScanState.QUEUED;
 +              break;
 +            case FINISHED:
 +              state = ScanState.IDLE;
 +              break;
 +            case RUNNING:
 +            default:
 +              /* do nothing */
 +              break;
 +          }
 +        }
 +
 +        ActiveScan activeScan = new ActiveScan(ss.client, ss.getUser(), 
ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime,
 +            ScanType.SINGLE, state, ss.extent.toThrift(), 
Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio,
 +            ss.auths.getAuthorizationsBB());
 +
 +        // scanId added by ACCUMULO-2641 is an optional thrift argument and 
not available in ActiveScan constructor
 +        activeScan.setScanId(entry.getKey());
 +        activeScans.add(activeScan);
 +
 +      } else if (session instanceof MultiScanSession) {
 +        MultiScanSession mss = (MultiScanSession) session;
 +
 +        ScanState state = ScanState.RUNNING;
 +
 +        ScanTask<MultiScanResult> nbt = mss.lookupTask;
 +        if (nbt == null) {
 +          state = ScanState.IDLE;
 +        } else {
 +          switch (nbt.getScanRunState()) {
 +            case QUEUED:
 +              state = ScanState.QUEUED;
 +              break;
 +            case FINISHED:
 +              state = ScanState.IDLE;
 +              break;
 +            case RUNNING:
 +            default:
 +              /* do nothing */
 +              break;
 +          }
 +        }
 +
 +        activeScans.add(new ActiveScan(mss.client, mss.getUser(), 
mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - 
mss.lastAccessTime,
 +            ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), 
Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, 
mss.auths
 +                .getAuthorizationsBB()));
 +      }
 +    }
 +
 +    return activeScans;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
index 790a352,0000000..c96c75a
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
@@@ -1,137 -1,0 +1,167 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.tablet;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
++import java.util.concurrent.Semaphore;
++import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IterationInterruptedException;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class Scanner {
 +  private static final Logger log = LoggerFactory.getLogger(Scanner.class);
 +
 +  private final Tablet tablet;
 +  private final ScanOptions options;
 +  private Range range;
 +  private SortedKeyValueIterator<Key,Value> isolatedIter;
 +  private ScanDataSource isolatedDataSource;
 +  private boolean sawException = false;
 +  private boolean scanClosed = false;
++  /**
++   * A fair semaphore of one is used since explicitly know the access pattern 
will be one thread to read and another to call close if the session becomes 
idle.
++   * Since we're explicitly preventing re-entrance, we're currently using a 
Sempahore. If at any point we decide read needs to be re-entrant, we can switch 
to a
++   * Reentrant lock.
++   */
++  private Semaphore scannerSemaphore;
 +
 +  Scanner(Tablet tablet, Range range, ScanOptions options) {
 +    this.tablet = tablet;
 +    this.range = range;
 +    this.options = options;
++    this.scannerSemaphore = new Semaphore(1, true);
 +  }
 +
-   public synchronized ScanBatch read() throws IOException, 
TabletClosedException {
++  public ScanBatch read() throws IOException, TabletClosedException {
 +
-     if (sawException)
-       throw new IllegalStateException("Tried to use scanner after exception 
occurred.");
- 
-     if (scanClosed)
-       throw new IllegalStateException("Tried to use scanner after it was 
closed.");
++    ScanDataSource dataSource = null;
 +
 +    Batch results = null;
 +
-     ScanDataSource dataSource;
++    try {
 +
-     if (options.isIsolated()) {
-       if (isolatedDataSource == null)
-         isolatedDataSource = new ScanDataSource(tablet, options);
-       dataSource = isolatedDataSource;
-     } else {
-       dataSource = new ScanDataSource(tablet, options);
-     }
++      try {
++        scannerSemaphore.acquire();
++      } catch (InterruptedException e) {
++        sawException = true;
++      }
 +
-     try {
++      // sawException may have occurred within close, so we cannot assume 
that an interrupted exception was its cause
++      if (sawException)
++        throw new IllegalStateException("Tried to use scanner after exception 
occurred.");
++
++      if (scanClosed)
++        throw new IllegalStateException("Tried to use scanner after it was 
closed.");
++
++      if (options.isIsolated()) {
++        if (isolatedDataSource == null)
++          isolatedDataSource = new ScanDataSource(tablet, options);
++        dataSource = isolatedDataSource;
++      } else {
++        dataSource = new ScanDataSource(tablet, options);
++      }
 +
 +      SortedKeyValueIterator<Key,Value> iter;
 +
 +      if (options.isIsolated()) {
 +        if (isolatedIter == null)
 +          isolatedIter = new SourceSwitchingIterator(dataSource, true);
 +        else
 +          isolatedDataSource.reattachFileManager();
 +        iter = isolatedIter;
 +      } else {
 +        iter = new SourceSwitchingIterator(dataSource, false);
 +      }
 +
 +      results = tablet.nextBatch(iter, range, options.getNum(), 
options.getColumnSet());
 +
 +      if (results.getResults() == null) {
 +        range = null;
 +        return new ScanBatch(new ArrayList<KVEntry>(), false);
 +      } else if (results.getContinueKey() == null) {
 +        return new ScanBatch(results.getResults(), false);
 +      } else {
 +        range = new Range(results.getContinueKey(), 
!results.isSkipContinueKey(), range.getEndKey(), range.isEndKeyInclusive());
 +        return new ScanBatch(results.getResults(), true);
 +      }
 +
 +    } catch (IterationInterruptedException iie) {
 +      sawException = true;
 +      if (tablet.isClosed())
 +        throw new TabletClosedException(iie);
 +      else
 +        throw iie;
 +    } catch (IOException ioe) {
 +      if (tablet.shutdownInProgress()) {
 +        log.debug("IOException while shutdown in progress ", ioe);
 +        throw new TabletClosedException(ioe); // assume IOException was 
caused by execution of HDFS shutdown hook
 +      }
 +
 +      sawException = true;
 +      dataSource.close(true);
 +      throw ioe;
 +    } catch (RuntimeException re) {
 +      sawException = true;
 +      throw re;
 +    } finally {
 +      // code in finally block because always want
 +      // to return mapfiles, even when exception is thrown
-       if (!options.isIsolated()) {
++      if (null != dataSource && !options.isIsolated()) {
 +        dataSource.close(false);
-       } else {
++      } else if (null != dataSource) {
 +        dataSource.detachFileManager();
 +      }
 +
 +      if (results != null && results.getResults() != null)
 +        tablet.updateQueryStats(results.getResults().size(), 
results.getNumBytes());
++
++      scannerSemaphore.release();
 +    }
 +  }
 +
 +  // close and read are synchronized because can not call close on the data 
source while it is in use
 +  // this could lead to the case where file iterators that are in use by a 
thread are returned
 +  // to the pool... this would be bad
-   public void close() {
++  public boolean close() {
 +    options.getInterruptFlag().set(true);
-     synchronized (this) {
++
++    boolean obtainedLock = false;
++    try {
++      obtainedLock = scannerSemaphore.tryAcquire(10, TimeUnit.MILLISECONDS);
++      if (!obtainedLock)
++        return false;
++
 +      scanClosed = true;
 +      if (isolatedDataSource != null)
 +        isolatedDataSource.close(false);
++    } catch (InterruptedException e) {
++      return false;
++    } finally {
++      if (obtainedLock)
++        scannerSemaphore.release();
 +    }
++    return true;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/65628282/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
index daf781f,6009462..cb5bc18
--- a/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
@@@ -49,9 -49,7 +49,9 @@@ public class ScanSessionTimeOutIT exten
  
    @Override
    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
 -    
cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_SESSION_MAXIDLE.getKey(),
 getMaxIdleTimeString()));
 +    Map<String,String> siteConfig = cfg.getSiteConfig();
-     siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), "3");
++    siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), 
getMaxIdleTimeString());
 +    cfg.setSiteConfig(siteConfig);
    }
  
    @Override

Reply via email to