Merge branch '1.6'

Conflicts:
        
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4062b343
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4062b343
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4062b343

Branch: refs/heads/master
Commit: 4062b34369f00031f80feb33f34b8f5b91e5abbf
Parents: e7d0397 d470f05
Author: Josh Elser <els...@apache.org>
Authored: Thu Nov 20 13:18:38 2014 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Thu Nov 20 13:42:37 2014 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/tserver/session/SessionManager.java  | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4062b343/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
----------------------------------------------------------------------
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 571d85c,0000000..c9445c6
mode 100644,000000..100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@@ -1,309 -1,0 +1,313 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.session;
 +
 +import java.security.SecureRandom;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.TimerTask;
 +
 +import org.apache.accumulo.core.client.impl.Translator;
 +import org.apache.accumulo.core.client.impl.Translators;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.thrift.MultiScanResult;
 +import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 +import org.apache.accumulo.core.tabletserver.thrift.ScanState;
 +import org.apache.accumulo.core.tabletserver.thrift.ScanType;
 +import org.apache.accumulo.core.util.MapCounter;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.tserver.scan.ScanRunState;
 +import org.apache.accumulo.tserver.scan.ScanTask;
 +import org.apache.accumulo.tserver.tablet.ScanBatch;
++import org.apache.log4j.Logger;
 +
 +public class SessionManager {
++  private static final Logger log = Logger.getLogger(SessionManager.class);
 +
 +  private final SecureRandom random = new SecureRandom();
 +  private final Map<Long,Session> sessions = new HashMap<Long,Session>();
 +  private final long maxIdle;
 +  private final AccumuloConfiguration aconf;
 +
 +  public SessionManager(AccumuloConfiguration conf) {
 +    aconf = conf;
 +    maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 +
 +    Runnable r = new Runnable() {
 +      @Override
 +      public void run() {
 +        sweep(maxIdle);
 +      }
 +    };
 +
 +    SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
 +  }
 +
 +  public synchronized long createSession(Session session, boolean reserve) {
 +    long sid = random.nextLong();
 +
 +    while (sessions.containsKey(sid)) {
 +      sid = random.nextLong();
 +    }
 +
 +    sessions.put(sid, session);
 +
 +    session.reserved = reserve;
 +
 +    session.startTime = session.lastAccessTime = System.currentTimeMillis();
 +
 +    return sid;
 +  }
 +
 +  public long getMaxIdleTime() {
 +    return maxIdle;
 +  }
 +
 +  /**
 +   * while a session is reserved, it cannot be canceled or removed
 +   */
 +
 +  public synchronized Session reserveSession(long sessionId) {
 +    Session session = sessions.get(sessionId);
 +    if (session != null) {
 +      if (session.reserved)
 +        throw new IllegalStateException();
 +      session.reserved = true;
 +    }
 +
 +    return session;
 +
 +  }
 +
 +  public synchronized Session reserveSession(long sessionId, boolean wait) {
 +    Session session = sessions.get(sessionId);
 +    if (session != null) {
 +      while (wait && session.reserved) {
 +        try {
 +          wait(1000);
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException();
 +        }
 +      }
 +
 +      if (session.reserved)
 +        throw new IllegalStateException();
 +      session.reserved = true;
 +    }
 +
 +    return session;
 +
 +  }
 +
 +  public synchronized void unreserveSession(Session session) {
 +    if (!session.reserved)
 +      throw new IllegalStateException();
 +    notifyAll();
 +    session.reserved = false;
 +    session.lastAccessTime = System.currentTimeMillis();
 +  }
 +
 +  public synchronized void unreserveSession(long sessionId) {
 +    Session session = getSession(sessionId);
 +    if (session != null)
 +      unreserveSession(session);
 +  }
 +
 +  public synchronized Session getSession(long sessionId) {
 +    Session session = sessions.get(sessionId);
 +    if (session != null)
 +      session.lastAccessTime = System.currentTimeMillis();
 +    return session;
 +  }
 +
 +  public Session removeSession(long sessionId) {
 +    return removeSession(sessionId, false);
 +  }
 +
 +  public Session removeSession(long sessionId, boolean unreserve) {
 +    Session session = null;
 +    synchronized (this) {
 +      session = sessions.remove(sessionId);
 +      if (unreserve && session != null)
 +        unreserveSession(session);
 +    }
 +
 +    // do clean up out side of lock..
 +    if (session != null)
 +      session.cleanup();
 +
 +    return session;
 +  }
 +
 +  private void sweep(long maxIdle) {
 +    List<Session> sessionsToCleanup = new ArrayList<Session>();
 +    synchronized (this) {
 +      Iterator<Session> iter = sessions.values().iterator();
 +      while (iter.hasNext()) {
 +        Session session = iter.next();
 +        long idleTime = System.currentTimeMillis() - session.lastAccessTime;
 +        if (idleTime > maxIdle && !session.reserved) {
++          log.info("Closing idle session from user=" + session.getUser() + ", 
client=" + session.client + ", idle=" + idleTime + "ms");
 +          iter.remove();
 +          sessionsToCleanup.add(session);
 +        }
 +      }
 +    }
 +
 +    // do clean up outside of lock
 +    for (Session session : sessionsToCleanup) {
 +      session.cleanup();
 +    }
 +  }
 +
-   public synchronized void removeIfNotAccessed(final long sessionId, long 
delay) {
++  public synchronized void removeIfNotAccessed(final long sessionId, final 
long delay) {
 +    Session session = sessions.get(sessionId);
 +    if (session != null) {
 +      final long removeTime = session.lastAccessTime;
 +      TimerTask r = new TimerTask() {
 +        @Override
 +        public void run() {
 +          Session sessionToCleanup = null;
 +          synchronized (SessionManager.this) {
 +            Session session2 = sessions.get(sessionId);
 +            if (session2 != null && session2.lastAccessTime == removeTime && 
!session2.reserved) {
++              log.info("Closing not accessed session from user=" + 
session2.getUser() + ", client=" + session2.client + ", duration=" + delay + 
"ms");
 +              sessions.remove(sessionId);
 +              sessionToCleanup = session2;
 +            }
 +          }
 +
 +          // call clean up outside of lock
 +          if (sessionToCleanup != null)
 +            sessionToCleanup.cleanup();
 +        }
 +      };
 +
 +      SimpleTimer.getInstance(aconf).schedule(r, delay);
 +    }
 +  }
 +
 +  public synchronized Map<String,MapCounter<ScanRunState>> 
getActiveScansPerTable() {
 +    Map<String,MapCounter<ScanRunState>> counts = new 
HashMap<String,MapCounter<ScanRunState>>();
 +    for (Entry<Long,Session> entry : sessions.entrySet()) {
 +
 +      Session session = entry.getValue();
 +      @SuppressWarnings("rawtypes")
 +      ScanTask nbt = null;
 +      String tableID = null;
 +
 +      if (session instanceof ScanSession) {
 +        ScanSession ss = (ScanSession) session;
 +        nbt = ss.nextBatchTask;
 +        tableID = ss.extent.getTableId().toString();
 +      } else if (session instanceof MultiScanSession) {
 +        MultiScanSession mss = (MultiScanSession) session;
 +        nbt = mss.lookupTask;
 +        tableID = mss.threadPoolExtent.getTableId().toString();
 +      }
 +
 +      if (nbt == null)
 +        continue;
 +
 +      ScanRunState srs = nbt.getScanRunState();
 +
 +      if (srs == ScanRunState.FINISHED)
 +        continue;
 +
 +      MapCounter<ScanRunState> stateCounts = counts.get(tableID);
 +      if (stateCounts == null) {
 +        stateCounts = new MapCounter<ScanRunState>();
 +        counts.put(tableID, stateCounts);
 +      }
 +
 +      stateCounts.increment(srs, 1);
 +    }
 +
 +    return counts;
 +  }
 +
 +  public synchronized List<ActiveScan> getActiveScans() {
 +
 +    List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
 +
 +    long ct = System.currentTimeMillis();
 +
 +    for (Entry<Long,Session> entry : sessions.entrySet()) {
 +      Session session = entry.getValue();
 +      if (session instanceof ScanSession) {
 +        ScanSession ss = (ScanSession) session;
 +
 +        ScanState state = ScanState.RUNNING;
 +
 +        ScanTask<ScanBatch> nbt = ss.nextBatchTask;
 +        if (nbt == null) {
 +          state = ScanState.IDLE;
 +        } else {
 +          switch (nbt.getScanRunState()) {
 +            case QUEUED:
 +              state = ScanState.QUEUED;
 +              break;
 +            case FINISHED:
 +              state = ScanState.IDLE;
 +              break;
 +            case RUNNING:
 +            default:
 +              /* do nothing */
 +              break;
 +          }
 +        }
 +
 +        activeScans.add(new ActiveScan(ss.client, ss.getUser(), 
ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, 
ScanType.SINGLE,
 +            state, ss.extent.toThrift(), Translator.translate(ss.columnSet, 
Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
 +
 +      } else if (session instanceof MultiScanSession) {
 +        MultiScanSession mss = (MultiScanSession) session;
 +
 +        ScanState state = ScanState.RUNNING;
 +
 +        ScanTask<MultiScanResult> nbt = mss.lookupTask;
 +        if (nbt == null) {
 +          state = ScanState.IDLE;
 +        } else {
 +          switch (nbt.getScanRunState()) {
 +            case QUEUED:
 +              state = ScanState.QUEUED;
 +              break;
 +            case FINISHED:
 +              state = ScanState.IDLE;
 +              break;
 +            case RUNNING:
 +            default:
 +              /* do nothing */
 +              break;
 +          }
 +        }
 +
 +        activeScans.add(new ActiveScan(mss.client, mss.getUser(), 
mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - 
mss.lastAccessTime,
 +            ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), 
Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, 
mss.auths
 +                .getAuthorizationsBB()));
 +      }
 +    }
 +
 +    return activeScans;
 +  }
- }
++}

Reply via email to