Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-gg-10561 dbe2798cb -> cb51025fe


# gg-10561: reimplement session indexing


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cb51025f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cb51025f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cb51025f

Branch: refs/heads/ignite-gg-10561
Commit: cb51025fe590be5df85205c6160d399b2411f0a0
Parents: dbe2798
Author: ashutak <ashu...@gridgain.com>
Authored: Fri Jul 24 16:46:09 2015 +0300
Committer: ashutak <ashu...@gridgain.com>
Committed: Fri Jul 24 16:46:09 2015 +0300

----------------------------------------------------------------------
 .../processors/rest/GridRestProcessor.java      | 154 +++++++++++--------
 1 file changed, 86 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cb51025f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
index 99e3a8a..d4e84db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java
@@ -83,10 +83,10 @@ public class GridRestProcessor extends GridProcessorAdapter {
     private final LongAdder8 workersCnt = new LongAdder8();
 
     /** SecurityContext map. */
-    private ConcurrentMap<UUID, Session> clientId2Ses = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, UUID> clientId2SesId = new 
ConcurrentHashMap<>();
 
     /** SecurityContext map. */
-    private ConcurrentMap<UUID, Session> sesTokId2Ses = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<UUID, Session> sesId2Ses = new 
ConcurrentHashMap<>();
 
     /** Protocol handler. */
     private final GridRestProtocolHandler protoHnd = new 
GridRestProtocolHandler() {
@@ -100,7 +100,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
     };
 
     /** Sesion timeout size */
-    private final long sesTimeout;
+    private final long sesTtl;
 
     /**
      * @param req Request.
@@ -274,78 +274,82 @@ public class GridRestProcessor extends GridProcessorAdapter {
         final UUID clientId = req.clientId();
         final byte[] sesTok = req.sessionToken();
 
-        if (F.isEmpty(sesTok) && clientId == null) {
-            Session ses = new Session();
-            ses.clientId = UUID.randomUUID();
-            ses.sesTokId = UUID.randomUUID();
+        for (int i = 0; i < 10_000; i++) {
+            if (F.isEmpty(sesTok) && clientId == null) {
+                Session ses = Session.random();
 
-            clientId2Ses.put(ses.clientId, ses);
-            sesTokId2Ses.put(ses.sesTokId, ses);
+                if (clientId2SesId.putIfAbsent(clientId, ses.sesId)!=null)
+                    continue; /** New random clientId equals to existing 
clientId */
 
-            return ses;
-        }
+                sesId2Ses.put(ses.sesId, ses);
 
-        if (F.isEmpty(sesTok) && clientId != null) {
-            Session ses = clientId2Ses.get(clientId);
+                return ses;
+            }
 
-            if (ses == null) { /** First request with this clientId */
-                ses = new Session();
-                ses.clientId = clientId;
-                ses.sesTokId = UUID.randomUUID();
+            if (F.isEmpty(sesTok) && clientId != null) {
+                UUID sesId = clientId2SesId.get(clientId);
 
-                Session curSes = clientId2Ses.putIfAbsent(ses.clientId, ses);
+                if (sesId == null) {
+                    Session ses = Session.fromClientId(clientId);
 
-                if (curSes == null)
-                    sesTokId2Ses.put(ses.sesTokId, ses);
-                else {
-                    boolean expired = 
curSes.checkTimeoutAndTryUpdateLastTouchTime();
+                    if (clientId2SesId.putIfAbsent(ses.clientId, 
ses.sesId)!=null)
+                        continue; /** Another thread already register session 
with the clientId. */
+
+                    sesId2Ses.put(ses.sesId, ses);
 
-                    // curSes != null means that there was at least 2 parallel 
request
-                    // by the same clientId (and we was not first).
-                    // So it was approximately at the same time and oldSession 
can't be expired.
-                    assert !expired;
+                    return ses;
                 }
+                else {
+                    Session ses = sesId2Ses.get(sesId);
+
+                    if (ses == null)
+                        continue; /** Need to wait while timeout thread 
complete removing of timed out sessions. */
 
-                return curSes == null ? ses : curSes;
+                    if (ses.checkTimeoutAndTryUpdateLastTouchTime())
+                        continue; /** Need to wait while timeout thread 
complete removing of timed out sessions. */
+
+                    return ses;
+                }
             }
-            else {
+
+            if (!F.isEmpty(sesTok) && clientId == null) {
+                UUID sesId = U.bytesToUuid(sesTok, 0);
+
+                Session ses = sesId2Ses.get(sesId);
+
+                if (ses == null)
+                    throw new IgniteCheckedException("Failed to handle 
request. Unknown session token " +
+                        "(maybe expired session). [sessionToken=" + 
U.byteArray2HexString(sesTok) + "]");
+
                 if (ses.checkTimeoutAndTryUpdateLastTouchTime())
-                    return session(req);
+                    continue; /** Need to wait while timeout thread complete 
removing of timed out sessions. */
 
                 return ses;
             }
-        }
 
-        if (!F.isEmpty(sesTok) && clientId == null) {
-            UUID sesTokId = U.bytesToUuid(sesTok, 0);
+            if (!F.isEmpty(sesTok) && clientId != null) {
+                UUID sesId = clientId2SesId.get(clientId);
 
-            Session ses = sesTokId2Ses.get(sesTokId);
+                if (!sesId.equals(U.bytesToUuid(sesTok, 0)))
+                    throw new IgniteCheckedException("Failed to handle 
request. " +
+                        "Unsupported case (misamatched clientId and session 
token)");
 
-            if (ses == null)
-                throw new IgniteCheckedException("Failed to handle request. 
Unknown session token " +
-                    "(maybe expired session). [sessionToken=" + 
U.byteArray2HexString(sesTok) + "]");
+                Session ses = sesId2Ses.get(sesId);
 
-            if (ses.checkTimeoutAndTryUpdateLastTouchTime())
-                return session(req);
+                if (ses == null)
+                    throw new IgniteCheckedException("Failed to handle 
request. Unknown session token " +
+                        "(maybe expired session). [sessionToken=" + 
U.byteArray2HexString(sesTok) + "]");
 
-            return ses;
-        }
-
-        if (!F.isEmpty(sesTok) && clientId != null) {
-            Session ses1 = sesTokId2Ses.get(U.bytesToUuid(sesTok, 0));
-            Session ses2 = clientId2Ses.get(clientId);
-
-            if (ses1 == null || ses2 == null || !ses1.equals(ses2))
-                throw new IgniteCheckedException("Failed to handle request. " +
-                    "Unsupported case (use one: clientId or session token)");
+                if (ses.checkTimeoutAndTryUpdateLastTouchTime())
+                    continue; /** Lets try to reslove session again. */
 
-            if (ses1.checkTimeoutAndTryUpdateLastTouchTime())
-                return session(req);
+                return ses;
+            }
 
-            return ses1;
+            throw new IgniteCheckedException("Failed to handle request 
(Unreachable state).");
         }
 
-        throw new IgniteCheckedException("Failed to handle request 
(Unreachable state).");
+        throw new IgniteCheckedException("Failed to handle request (Could not 
resolve session).");
     }
 
     /**
@@ -368,7 +372,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
             sesExpTime0 = DEFAULT_SES_TIMEOUT;
         }
 
-        sesTimeout = sesExpTime0;
+        sesTtl = sesExpTime0;
     }
 
     /** {@inheritDoc} */
@@ -414,20 +418,17 @@ public class GridRestProcessor extends GridProcessorAdapter {
                     while(!Thread.currentThread().isInterrupted()) {
                         Thread.sleep(SES_TIMEOUT_CHECK_DELAY);
 
-                        for (Iterator<Map.Entry<UUID, Session>> iter = 
clientId2Ses.entrySet().iterator();
+                        for (Iterator<Map.Entry<UUID, Session>> iter = 
sesId2Ses.entrySet().iterator();
                             iter.hasNext();) {
                             Map.Entry<UUID, Session> e = iter.next();
 
-                            if (e.getValue().checkTimeout(sesTimeout))
-                                iter.remove();
-                        }
+                            Session ses = e.getValue();
 
-                        for (Iterator<Map.Entry<UUID, Session>> iter = 
sesTokId2Ses.entrySet().iterator();
-                            iter.hasNext();) {
-                            Map.Entry<UUID, Session> e = iter.next();
-
-                            if (e.getValue().isTimedOut())
+                            if (ses.checkTimeout(sesTtl)) {
                                 iter.remove();
+
+                                clientId2SesId.remove(ses.clientId, ses.sesId);
+                            }
                         }
                     }
                 }
@@ -816,10 +817,10 @@ public class GridRestProcessor extends GridProcessorAdapter {
         private static final Long TIMEDOUT_FLAG = 0L;
 
         /** Client id. */
-        private volatile UUID clientId;
+        private final UUID clientId;
 
         /** Session token id. */
-        private volatile UUID sesTokId;
+        private final UUID sesId;
 
         /** Security context. */
         private volatile SecurityContext secCtx;
@@ -830,11 +831,28 @@ public class GridRestProcessor extends GridProcessorAdapter {
          */
         private final AtomicLong lastTouchTime = new 
AtomicLong(U.currentTimeMillis());
 
+        private Session(UUID clientId, UUID sesId) {
+            this.clientId = clientId;
+            this.sesId = sesId;
+        }
+
+        static Session random() {
+            return new Session(UUID.randomUUID(), UUID.randomUUID());
+        }
+
+        static Session fromClientId(UUID clientId) {
+            return new Session(clientId, UUID.randomUUID());
+        }
+
+        static Session fromSessionToken(UUID sesTokId) {
+            return new Session(UUID.randomUUID(), sesTokId);
+        }
+
         /**
          * @return Session token as bytes.
          */
         byte[] sesTok(){
-            return U.uuidToBytes(sesTokId);
+            return U.uuidToBytes(sesId);
         }
 
         /**
@@ -855,7 +873,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
         /**
          * Checks whether session at expired state (EPIRATION_FLAG) or not, if 
not then tries to update last touch time.
          *
-         * @return <code>True</code> if expired.
+         * @return <code>True</code> if timed out.
          */
         boolean checkTimeoutAndTryUpdateLastTouchTime() {
             while (true) {
@@ -889,7 +907,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
 
             if (clientId != null ? !clientId.equals(ses.clientId) : 
ses.clientId != null)
                 return false;
-            if (sesTokId != null ? !sesTokId.equals(ses.sesTokId) : 
ses.sesTokId != null)
+            if (sesId != null ? !sesId.equals(ses.sesId) : ses.sesId != null)
                 return false;
 
             return true;
@@ -898,7 +916,7 @@ public class GridRestProcessor extends GridProcessorAdapter {
         /** {@inheritDoc} */
         @Override public int hashCode() {
             int res = clientId != null ? clientId.hashCode() : 0;
-            res = 31 * res + (sesTokId != null ? sesTokId.hashCode() : 0);
+            res = 31 * res + (sesId != null ? sesId.hashCode() : 0);
             return res;
         }
     }

Reply via email to