Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 9699e1f8c -> 64602bba6


# ignite-901 WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/64602bba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/64602bba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/64602bba

Branch: refs/heads/ignite-901
Commit: 64602bba63723d99bb88783c2c7e3a29d033a1dc
Parents: 9699e1f
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Jul 6 10:20:18 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Jul 6 10:20:18 2015 +0300

----------------------------------------------------------------------
 .../CacheDataStructuresManager.java             | 26 ++++++++++++++++++++
 .../datastructures/DataStructuresProcessor.java |  3 +++
 .../datastructures/GridCacheQueueAdapter.java   | 11 +++++++++
 .../datastructures/GridCacheSetImpl.java        | 13 ++++++++++
 .../datastructures/GridCacheSetProxy.java       |  7 ++++++
 .../processors/task/GridTaskProcessor.java      | 15 +++++++++--
 .../IgniteClientReconnectCacheTest.java         |  6 ++---
 .../IgniteClientReconnectCollectionsTest.java   | 18 +++++++-------
 8 files changed, 84 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index b5c5161..3691ee6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -106,6 +106,32 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * Client reconnect callback.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onReconnected() throws IgniteCheckedException {
+        for (Map.Entry<IgniteUuid, GridCacheSetProxy> e : setsMap.entrySet()) {
+            GridCacheSetProxy set = e.getValue();
+
+            if (!set.delegate().checkHeader()) {
+                set.blockOnRemove();
+
+                setsMap.remove(e.getKey(), e.getValue());
+            }
+        }
+
+        for (Map.Entry<IgniteUuid, GridCacheQueueProxy> e : queuesMap.entrySet()) {
+            GridCacheQueueProxy queue = e.getValue();
+
+            if (!queue.delegate().checkHeader()) {
+                queue.delegate().onRemoved(false);
+
+                queuesMap.remove(e.getKey(), e.getValue());
+            }
+        }
+    }
+
+    /**
      * @throws IgniteCheckedException If thread is interrupted or manager
      *     was not successfully initialized.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 4637bd0..b2c3c04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -198,6 +198,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 e.getValue().onRemoved();
             }
         }
+
+        for (GridCacheContext cctx : ctx.cache().context().cacheContexts())
+            cctx.dataStructures().onReconnected();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index 9fd7356..f62c4eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -106,6 +106,17 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
         writeSem = bounded() ? new Semaphore(hdr.capacity() - hdr.size(), true) : null;
     }
 
+    /**
+     * @return {@code True} if queue header found in cache.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public boolean checkHeader() throws IgniteCheckedException {
+        GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey);
+
+        return !queueRemoved(hdr, id);
+    }
+
     /** {@inheritDoc} */
     @Override public String name() {
         return queueName;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index f74fe95..936662a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -101,6 +101,19 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
         return rmvd;
     }
 
+    /**
+     * @return {@code True} if set header found in cache.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public boolean checkHeader() throws IgniteCheckedException {
+        IgniteInternalCache<GridCacheSetHeaderKey, GridCacheSetHeader> cache0 = ctx.cache();
+
+        GridCacheSetHeader hdr = cache0.get(new GridCacheSetHeaderKey(name));
+
+        return hdr != null && hdr.id().equals(id);
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public int size() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
index ba43da7..38c124a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
@@ -78,6 +78,13 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
     }
 
     /**
+     * @return Set delegate.
+     */
+    public GridCacheSetImpl delegate() {
+        return delegate;
+    }
+
+    /**
      * Remove callback.
      */
     public void blockOnRemove() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 39f6bd5..22add73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -76,8 +76,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
     private final LongAdder8 execTasks = new LongAdder8();
 
     /** */
-    private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx =
-        new ThreadLocal<>();
+    private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx = new ThreadLocal<>();
 
     /** */
     private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
@@ -119,6 +118,18 @@ public class GridTaskProcessor extends GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+        IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
+            "Client node disconnected.");
+
+        for (GridTaskWorker<?, ?> worker : tasks.values()) {
+            worker.cancel();
+
+            worker.finishTask(null, err);
+        }
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("TooBroadScope")
     @Override public void onKernalStop(boolean cancel) {
         lock.writeLock();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 258eef9..f9e2a9a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -252,8 +252,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
         final CountDownLatch reconnectLatch = new CountDownLatch(1);
 
         client.events().localListen(new IgnitePredicate<Event>() {
-            @Override
-            public boolean apply(Event evt) {
+            @Override public boolean apply(Event evt) {
                 if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
                     info("Reconnected: " + evt);
 
@@ -521,8 +520,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
         });
 
         GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override
-            public Object call() throws Exception {
+            @Override public Object call() throws Exception {
                 return clientCache.get(1);
             }
         }, IllegalStateException.class, null);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
index 54e1329..98be9f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -83,20 +83,20 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
     /**
      * @throws Exception If failed.
      */
-    public void testQueueReconnectInProg() throws Exception {
+    public void testQueueReconnectInProgress() throws Exception {
         CollectionConfiguration colCfg = new CollectionConfiguration();
 
         colCfg.setCacheMode(PARTITIONED);
         colCfg.setAtomicityMode(TRANSACTIONAL);
 
-        queueReconnectInProg(colCfg);
+        queueReconnectInProgress(colCfg);
 
         colCfg = new CollectionConfiguration();
 
         colCfg.setCacheMode(PARTITIONED);
         colCfg.setAtomicityMode(ATOMIC);
 
-        queueReconnectInProg(colCfg);
+        queueReconnectInProgress(colCfg);
     }
 
     /**
@@ -121,7 +121,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
     /**
      * @throws Exception If failed.
      */
-    public void testSetReconnectRemove() throws Exception {
+    public void testSetReconnectRemoved() throws Exception {
         CollectionConfiguration colCfg = new CollectionConfiguration();
 
         colCfg.setCacheMode(PARTITIONED);
@@ -140,20 +140,20 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
     /**
      * @throws Exception If failed.
      */
-    public void testSetReconnectInProg() throws Exception {
+    public void testSetReconnectInProgress() throws Exception {
         CollectionConfiguration colCfg = new CollectionConfiguration();
 
         colCfg.setCacheMode(PARTITIONED);
         colCfg.setAtomicityMode(ATOMIC);
 
-        setReconnectInProg(colCfg);
+        setReconnectInProgress(colCfg);
 
         colCfg = new CollectionConfiguration();
 
         colCfg.setCacheMode(PARTITIONED);
         colCfg.setAtomicityMode(TRANSACTIONAL);
 
-        setReconnectInProg(colCfg);
+        setReconnectInProgress(colCfg);
     }
 
     /**
@@ -230,7 +230,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
      * @param colCfg Collection configuration.
      * @throws Exception If failed.
      */
-    private void setReconnectInProg(final CollectionConfiguration colCfg) throws Exception {
+    private void setReconnectInProgress(final CollectionConfiguration colCfg) throws Exception {
         Ignite client = grid(serverCount());
 
         assertTrue(client.cluster().localNode().isClient());
@@ -358,7 +358,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
      * @param colCfg Collection configuration.
      * @throws Exception If failed.
      */
-    private void queueReconnectInProg(final CollectionConfiguration colCfg) throws Exception {
+    private void queueReconnectInProgress(final CollectionConfiguration colCfg) throws Exception {
         Ignite client = grid(serverCount());
 
         assertTrue(client.cluster().localNode().isClient());

Reply via email to