Repository: incubator-ignite Updated Branches: refs/heads/ignite-gg-10561 482cfbb8c -> 3bfbb9bf6
# gg-10561: session expiration Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3bfbb9bf Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3bfbb9bf Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3bfbb9bf Branch: refs/heads/ignite-gg-10561 Commit: 3bfbb9bf6bbe22b5b37b7ad42adcb32a4c8a35a1 Parents: 482cfbb Author: ashutak <ashu...@gridgain.com> Authored: Wed Jul 22 22:10:54 2015 +0300 Committer: ashutak <ashu...@gridgain.com> Committed: Wed Jul 22 22:10:54 2015 +0300 ---------------------------------------------------------------------- .../processors/rest/GridRestProcessor.java | 146 ++++++++++++++++--- 1 file changed, 128 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3bfbb9bf/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java index 60c7ef4..754bdb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java @@ -45,6 +45,7 @@ import org.jsr166.*; import java.lang.reflect.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import static org.apache.ignite.internal.processors.rest.GridRestResponse.*; import static org.apache.ignite.plugin.security.SecuritySubjectType.*; @@ -60,6 +61,12 @@ public class GridRestProcessor extends GridProcessorAdapter { /** */ public static final byte[] ZERO_BYTES = new byte[0]; + /** Delay between sessions expire checks*/ + private static final int SES_EXPIRE_CHECK_DELAY = 1_000; + + /** TODO */ + private static final int SES_EXPERATION_TIME = 30_000; + /** Protocols. */ private final Collection<GridRestProtocol> protos = new ArrayList<>(); @@ -76,10 +83,10 @@ public class GridRestProcessor extends GridProcessorAdapter { private final LongAdder8 workersCnt = new LongAdder8(); /** SecurityContext map. */ - private ConcurrentMap<UUID, Session> clientId2Session = new ConcurrentHashMap<>(); + private ConcurrentMap<UUID, Session> clientId2Ses = new ConcurrentHashMap<>(); /** SecurityContext map. */ - private ConcurrentMap<UUID, Session> sesTokId2Session = new ConcurrentHashMap<>(); + private ConcurrentMap<UUID, Session> sesTokId2Ses = new ConcurrentHashMap<>(); /** Protocol handler. */ private final GridRestProtocolHandler protoHnd = new GridRestProtocolHandler() { @@ -266,7 +273,7 @@ public class GridRestProcessor extends GridProcessorAdapter { * @return Not null session. * @throws IgniteCheckedException If failed. */ - private Session session(GridRestRequest req) throws IgniteCheckedException { + private Session session(final GridRestRequest req) throws IgniteCheckedException { final UUID clientId = req.clientId(); final byte[] sesTok = req.sessionToken(); @@ -275,51 +282,69 @@ public class GridRestProcessor extends GridProcessorAdapter { ses.clientId = UUID.randomUUID(); ses.sesTokId = UUID.randomUUID(); - clientId2Session.put(ses.clientId, ses); - sesTokId2Session.put(ses.sesTokId, ses); + clientId2Ses.put(ses.clientId, ses); + sesTokId2Ses.put(ses.sesTokId, ses); return ses; } if (F.isEmpty(sesTok) && clientId != null) { - Session ses = clientId2Session.get(clientId); + Session ses = clientId2Ses.get(clientId); if (ses == null) { /** First request with this clientId */ ses = new Session(); ses.clientId = clientId; ses.sesTokId = UUID.randomUUID(); - Session oldSes = clientId2Session.putIfAbsent(ses.clientId, ses); + Session curSes = clientId2Ses.putIfAbsent(ses.clientId, ses); - if (oldSes == null) - sesTokId2Session.put(ses.sesTokId, ses); + if (curSes == null) + sesTokId2Ses.put(ses.sesTokId, ses); + else { + boolean expired = curSes.checkExpirationAndTryUpdateLastTouchTime(); - return oldSes == null ? ses : oldSes; + // curSes != null means that there was at least 2 parallel request + // by the same clientId (and we was not first). + // So it was approximately at the same time and oldSession can't be expired. + assert !expired; + } + + return curSes == null ? ses : curSes; } + else { + if (ses.checkExpirationAndTryUpdateLastTouchTime()) + return session(req); - return ses; + return ses; + } } if (!F.isEmpty(sesTok) && clientId == null) { UUID sesTokId = U.bytesToUuid(sesTok, 0); - Session ses = sesTokId2Session.get(sesTokId); + Session ses = sesTokId2Ses.get(sesTokId); if (ses == null) - throw new IgniteCheckedException("Faile to handle request. Unknown session token " + + throw new IgniteCheckedException("Failed to handle request. Unknown session token " + "(maybe expired session). [sessionToken=" + U.byteArray2HexString(sesTok) + "]"); + if (ses.checkExpirationAndTryUpdateLastTouchTime()) + return session(req); + return ses; } if (!F.isEmpty(sesTok) && clientId != null) { - Session ses1 = sesTokId2Session.get(U.bytesToUuid(sesTok, 0)); - Session ses2 = clientId2Session.get(clientId); + Session ses1 = sesTokId2Ses.get(U.bytesToUuid(sesTok, 0)); + Session ses2 = clientId2Ses.get(clientId); if (ses1 == null || ses2 == null || !ses1.equals(ses2)) throw new IgniteCheckedException("Failed to handle request. " + "Unsupported case (use one: clientId or session token)"); + if (ses1.checkExpirationAndTryUpdateLastTouchTime()) + return session(req); + return ses1; } @@ -370,6 +395,39 @@ public class GridRestProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void onKernalStart() throws IgniteCheckedException { + Thread sesExpirationThread = new Thread(new Runnable() { + @Override public void run() { + try { + while(!Thread.currentThread().isInterrupted()) { + Thread.sleep(SES_EXPIRE_CHECK_DELAY); + + for (Iterator<Map.Entry<UUID, Session>> iter = clientId2Ses.entrySet().iterator(); + iter.hasNext();) { + Map.Entry<UUID, Session> e = iter.next(); + + if (e.getValue().checkExpiration()) + iter.remove(); + } + + for (Iterator<Map.Entry<UUID, Session>> iter = sesTokId2Ses.entrySet().iterator(); + iter.hasNext();) { + Map.Entry<UUID, Session> e = iter.next(); + + if (e.getValue().isExpired()) + iter.remove(); + } + } + } + catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + }, "check-session-expired"); + + sesExpirationThread.setDaemon(true); + + sesExpirationThread.start(); + if (isRestEnabled()) { for (GridRestProtocol proto : protos) proto.onKernalStart(); @@ -749,15 +807,67 @@ public class GridRestProcessor extends GridProcessorAdapter { X.println(">>> handlersSize: " + handlers.size()); } + /** + * Session. + */ private static class Session { - UUID clientId; - UUID sesTokId; - SecurityContext secCtx; + private static final Long EXPIRED_FLAG = -0l; + + /** Client id. */ + volatile UUID clientId; + + /** Session token id. */ + volatile UUID sesTokId; + /** Security context. */ + volatile SecurityContext secCtx; + + final AtomicLong lastTouchTime = new AtomicLong(U.currentTimeMillis()); + + /** + * @return Session token as bytes. + */ byte[] sesTok(){ return U.uuidToBytes(sesTokId); } + /** + * // TODO javadoc + */ + boolean checkExpiration() { + long time0 = lastTouchTime.get(); + + if (U.currentTimeMillis() - time0 > SES_EXPERATION_TIME) + lastTouchTime.compareAndSet(time0, EXPIRED_FLAG); + + return lastTouchTime.get() == EXPIRED_FLAG; + } + + /** + * // TODO javadoc + * @return <code>True</code> if expired. + */ + boolean checkExpirationAndTryUpdateLastTouchTime() { + while(true) { + long time0 = lastTouchTime.get(); + + if (time0 == EXPIRED_FLAG) + return true; + + boolean success = lastTouchTime.compareAndSet(time0, U.currentTimeMillis()); + + if (success) + return false; + } + } + + /** + * @return <code>True</code> if session in expired state. + */ + boolean isExpired() { + return lastTouchTime.get() == EXPIRED_FLAG; + } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { if (this == o)