Repository: incubator-ignite Updated Branches: refs/heads/ignite-gg-10561 dbe2798cb -> cb51025fe
# gg-10561: reimplement session indexing Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cb51025f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cb51025f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cb51025f Branch: refs/heads/ignite-gg-10561 Commit: cb51025fe590be5df85205c6160d399b2411f0a0 Parents: dbe2798 Author: ashutak <ashu...@gridgain.com> Authored: Fri Jul 24 16:46:09 2015 +0300 Committer: ashutak <ashu...@gridgain.com> Committed: Fri Jul 24 16:46:09 2015 +0300 ---------------------------------------------------------------------- .../processors/rest/GridRestProcessor.java | 154 +++++++++++-------- 1 file changed, 86 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cb51025f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java index 99e3a8a..d4e84db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java @@ -83,10 +83,10 @@ public class GridRestProcessor extends GridProcessorAdapter { private final LongAdder8 workersCnt = new LongAdder8(); /** SecurityContext map. */ - private ConcurrentMap<UUID, Session> clientId2Ses = new ConcurrentHashMap<>(); + private final ConcurrentMap<UUID, UUID> clientId2SesId = new ConcurrentHashMap<>(); /** SecurityContext map. */ - private ConcurrentMap<UUID, Session> sesTokId2Ses = new ConcurrentHashMap<>(); + private final ConcurrentMap<UUID, Session> sesId2Ses = new ConcurrentHashMap<>(); /** Protocol handler. */ private final GridRestProtocolHandler protoHnd = new GridRestProtocolHandler() { @@ -100,7 +100,7 @@ public class GridRestProcessor extends GridProcessorAdapter { }; /** Sesion timeout size */ - private final long sesTimeout; + private final long sesTtl; /** * @param req Request. @@ -274,78 +274,82 @@ public class GridRestProcessor extends GridProcessorAdapter { final UUID clientId = req.clientId(); final byte[] sesTok = req.sessionToken(); - if (F.isEmpty(sesTok) && clientId == null) { - Session ses = new Session(); - ses.clientId = UUID.randomUUID(); - ses.sesTokId = UUID.randomUUID(); + for (int i = 0; i < 10_000; i++) { + if (F.isEmpty(sesTok) && clientId == null) { + Session ses = Session.random(); - clientId2Ses.put(ses.clientId, ses); - sesTokId2Ses.put(ses.sesTokId, ses); + if (clientId2SesId.putIfAbsent(clientId, ses.sesId)!=null) + continue; /** New random clientId equals to existing clientId */ - return ses; - } + sesId2Ses.put(ses.sesId, ses); - if (F.isEmpty(sesTok) && clientId != null) { - Session ses = clientId2Ses.get(clientId); + return ses; + } - if (ses == null) { /** First request with this clientId */ - ses = new Session(); - ses.clientId = clientId; - ses.sesTokId = UUID.randomUUID(); + if (F.isEmpty(sesTok) && clientId != null) { + UUID sesId = clientId2SesId.get(clientId); - Session curSes = clientId2Ses.putIfAbsent(ses.clientId, ses); + if (sesId == null) { + Session ses = Session.fromClientId(clientId); - if (curSes == null) - sesTokId2Ses.put(ses.sesTokId, ses); - else { - boolean expired = curSes.checkTimeoutAndTryUpdateLastTouchTime(); + if (clientId2SesId.putIfAbsent(ses.clientId, ses.sesId)!=null) + continue; /** Another thread already register session with the clientId. */ + + sesId2Ses.put(ses.sesId, ses); - // curSes != null means that there was at least 2 parallel request - // by the same clientId (and we was not first). - // So it was approximately at the same time and oldSession can't be expired. - assert !expired; + return ses; } + else { + Session ses = sesId2Ses.get(sesId); + + if (ses == null) + continue; /** Need to wait while timeout thread complete removing of timed out sessions. */ - return curSes == null ? ses : curSes; + if (ses.checkTimeoutAndTryUpdateLastTouchTime()) + continue; /** Need to wait while timeout thread complete removing of timed out sessions. */ + + return ses; + } } - else { + + if (!F.isEmpty(sesTok) && clientId == null) { + UUID sesId = U.bytesToUuid(sesTok, 0); + + Session ses = sesId2Ses.get(sesId); + + if (ses == null) + throw new IgniteCheckedException("Failed to handle request. Unknown session token " + + "(maybe expired session). [sessionToken=" + U.byteArray2HexString(sesTok) + "]"); + if (ses.checkTimeoutAndTryUpdateLastTouchTime()) - return session(req); + continue; /** Need to wait while timeout thread complete removing of timed out sessions. */ return ses; } - } - if (!F.isEmpty(sesTok) && clientId == null) { - UUID sesTokId = U.bytesToUuid(sesTok, 0); + if (!F.isEmpty(sesTok) && clientId != null) { + UUID sesId = clientId2SesId.get(clientId); - Session ses = sesTokId2Ses.get(sesTokId); + if (!sesId.equals(U.bytesToUuid(sesTok, 0))) + throw new IgniteCheckedException("Failed to handle request. " + + "Unsupported case (misamatched clientId and session token)"); - if (ses == null) - throw new IgniteCheckedException("Failed to handle request. Unknown session token " + - "(maybe expired session). [sessionToken=" + U.byteArray2HexString(sesTok) + "]"); + Session ses = sesId2Ses.get(sesId); - if (ses.checkTimeoutAndTryUpdateLastTouchTime()) - return session(req); + if (ses == null) + throw new IgniteCheckedException("Failed to handle request. Unknown session token " + + "(maybe expired session). [sessionToken=" + U.byteArray2HexString(sesTok) + "]"); - return ses; - } - - if (!F.isEmpty(sesTok) && clientId != null) { - Session ses1 = sesTokId2Ses.get(U.bytesToUuid(sesTok, 0)); - Session ses2 = clientId2Ses.get(clientId); - - if (ses1 == null || ses2 == null || !ses1.equals(ses2)) - throw new IgniteCheckedException("Failed to handle request. " + - "Unsupported case (use one: clientId or session token)"); + if (ses.checkTimeoutAndTryUpdateLastTouchTime()) + continue; /** Lets try to reslove session again. */ - if (ses1.checkTimeoutAndTryUpdateLastTouchTime()) - return session(req); + return ses; + } - return ses1; + throw new IgniteCheckedException("Failed to handle request (Unreachable state)."); } - throw new IgniteCheckedException("Failed to handle request (Unreachable state)."); + throw new IgniteCheckedException("Failed to handle request (Could not resolve session)."); } /** @@ -368,7 +372,7 @@ public class GridRestProcessor extends GridProcessorAdapter { sesExpTime0 = DEFAULT_SES_TIMEOUT; } - sesTimeout = sesExpTime0; + sesTtl = sesExpTime0; } /** {@inheritDoc} */ @@ -414,20 +418,17 @@ public class GridRestProcessor extends GridProcessorAdapter { while(!Thread.currentThread().isInterrupted()) { Thread.sleep(SES_TIMEOUT_CHECK_DELAY); - for (Iterator<Map.Entry<UUID, Session>> iter = clientId2Ses.entrySet().iterator(); + for (Iterator<Map.Entry<UUID, Session>> iter = sesId2Ses.entrySet().iterator(); iter.hasNext();) { Map.Entry<UUID, Session> e = iter.next(); - if (e.getValue().checkTimeout(sesTimeout)) - iter.remove(); - } + Session ses = e.getValue(); - for (Iterator<Map.Entry<UUID, Session>> iter = sesTokId2Ses.entrySet().iterator(); - iter.hasNext();) { - Map.Entry<UUID, Session> e = iter.next(); - - if (e.getValue().isTimedOut()) + if (ses.checkTimeout(sesTtl)) { iter.remove(); + + clientId2SesId.remove(ses.clientId, ses.sesId); + } } } } @@ -816,10 +817,10 @@ public class GridRestProcessor extends GridProcessorAdapter { private static final Long TIMEDOUT_FLAG = 0L; /** Client id. */ - private volatile UUID clientId; + private final UUID clientId; /** Session token id. */ - private volatile UUID sesTokId; + private final UUID sesId; /** Security context. */ private volatile SecurityContext secCtx; @@ -830,11 +831,28 @@ public class GridRestProcessor extends GridProcessorAdapter { */ private final AtomicLong lastTouchTime = new AtomicLong(U.currentTimeMillis()); + private Session(UUID clientId, UUID sesId) { + this.clientId = clientId; + this.sesId = sesId; + } + + static Session random() { + return new Session(UUID.randomUUID(), UUID.randomUUID()); + } + + static Session fromClientId(UUID clientId) { + return new Session(clientId, UUID.randomUUID()); + } + + static Session fromSessionToken(UUID sesTokId) { + return new Session(UUID.randomUUID(), sesTokId); + } + /** * @return Session token as bytes. */ byte[] sesTok(){ - return U.uuidToBytes(sesTokId); + return U.uuidToBytes(sesId); } /** @@ -855,7 +873,7 @@ public class GridRestProcessor extends GridProcessorAdapter { /** * Checks whether session at expired state (EPIRATION_FLAG) or not, if not then tries to update last touch time. * - * @return <code>True</code> if expired. + * @return <code>True</code> if timed out. */ boolean checkTimeoutAndTryUpdateLastTouchTime() { while (true) { @@ -889,7 +907,7 @@ public class GridRestProcessor extends GridProcessorAdapter { if (clientId != null ? !clientId.equals(ses.clientId) : ses.clientId != null) return false; - if (sesTokId != null ? !sesTokId.equals(ses.sesTokId) : ses.sesTokId != null) + if (sesId != null ? !sesId.equals(ses.sesId) : ses.sesId != null) return false; return true; @@ -898,7 +916,7 @@ public class GridRestProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public int hashCode() { int res = clientId != null ? clientId.hashCode() : 0; - res = 31 * res + (sesTokId != null ? sesTokId.hashCode() : 0); + res = 31 * res + (sesId != null ? sesId.hashCode() : 0); return res; } }