Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 bfc93128a -> c4abdf4ca


# ignite-901 WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c4abdf4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c4abdf4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c4abdf4c

Branch: refs/heads/ignite-901
Commit: c4abdf4cac46ca9c055c02cacd6edbf68343493d
Parents: bfc9312
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Jul 3 14:30:48 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Jul 3 14:30:48 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  4 --
 .../GridCachePartitionExchangeManager.java      | 72 ++++++++++----------
 .../datastructures/DataStructuresProcessor.java | 12 ++++
 .../IgniteClientReconnectAtomicsTest.java       |  8 +--
 4 files changed, 51 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4abdf4c/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 27f60a8..f97a1c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2847,10 +2847,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         }
     }
 
-    public void rejoined() {
-
-    }
-
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4abdf4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e883b9c..bb69420 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -274,54 +274,52 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         // Allow discovery events to get processed.
         locExchFut.onDone();
 
-        if (cctx.kernalContext().gateway().getState() == GridKernalState.STARTED) {
-            if (log.isDebugEnabled())
-                log.debug("Beginning to wait on local exchange future: " + fut);
+        if (log.isDebugEnabled())
+            log.debug("Beginning to wait on local exchange future: " + fut);
 
-            try {
-                boolean first = true;
+        try {
+            boolean first = true;
 
-                while (true) {
-                    try {
-                        fut.get(cctx.preloadExchangeTimeout());
+            while (true) {
+                try {
+                    fut.get(cctx.preloadExchangeTimeout());
 
-                        break;
-                    }
-                    catch (IgniteClientDisconnectedCheckedException e) {
-                        log.info("Disconnected while waiting for initial partition map exchange: " + e);
+                    break;
+                }
+                catch (IgniteClientDisconnectedCheckedException e) {
+                    log.info("Disconnected while waiting for initial partition map exchange: " + e);
 
-                        break;
-                    }
-                    catch (IgniteFutureTimeoutCheckedException ignored) {
-                        if (first) {
-                            U.warn(log, "Failed to wait for initial partition map exchange. " +
-                                "Possible reasons are: " + U.nl() +
-                                "  ^-- Transactions in deadlock." + U.nl() +
-                                "  ^-- Long running transactions (ignore if this is the case)." + U.nl() +
-                                "  ^-- Unreleased explicit locks.");
-
-                            first = false;
-                        }
-                        else
-                            U.warn(log, "Still waiting for initial partition map exchange [fut=" + fut + ']');
+                    break;
+                }
+                catch (IgniteFutureTimeoutCheckedException ignored) {
+                    if (first) {
+                        U.warn(log, "Failed to wait for initial partition map exchange. " +
+                            "Possible reasons are: " + U.nl() +
+                            "  ^-- Transactions in deadlock." + U.nl() +
+                            "  ^-- Long running transactions (ignore if this is the case)." + U.nl() +
+                            "  ^-- Unreleased explicit locks.");
+
+                        first = false;
                     }
+                    else
+                        U.warn(log, "Still waiting for initial partition map exchange [fut=" + fut + ']');
                 }
-
-                for (GridCacheContext cacheCtx : cctx.cacheContexts())
-                    cacheCtx.preloader().onInitialExchangeComplete(null);
             }
-            catch (IgniteFutureTimeoutCheckedException e) {
-                IgniteCheckedException err = new IgniteCheckedException("Timed out waiting for exchange future: " + fut, e);
 
-                for (GridCacheContext cacheCtx : cctx.cacheContexts())
-                    cacheCtx.preloader().onInitialExchangeComplete(err);
+            for (GridCacheContext cacheCtx : cctx.cacheContexts())
+                cacheCtx.preloader().onInitialExchangeComplete(null);
+        }
+        catch (IgniteFutureTimeoutCheckedException e) {
+            IgniteCheckedException err = new IgniteCheckedException("Timed out waiting for exchange future: " + fut, e);
 
-                throw err;
-            }
+            for (GridCacheContext cacheCtx : cctx.cacheContexts())
+                cacheCtx.preloader().onInitialExchangeComplete(err);
 
-            if (log.isDebugEnabled())
-                log.debug("Finished waiting on local exchange: " + fut.exchangeId());
+            throw err;
         }
+
+        if (log.isDebugEnabled())
+            log.debug("Finished waiting on local exchange: " + fut.exchangeId());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4abdf4c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index d4f67fa..95c9563 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -185,6 +185,18 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
             dsCacheCtx.continuousQueries().cancelInternalQuery(qryId);
     }
 
+    /** {@inheritDoc} */
+    @Override public void onReconnected() throws IgniteCheckedException {
+        Set<GridCacheInternal> keys = dsMap.keySet();
+
+        Map<GridCacheInternal, GridCacheInternal> vals = dsView.getAll(keys);
+
+        for (Map.Entry<GridCacheInternal, GridCacheRemovable> e : dsMap.entrySet()) {
+            if (!vals.containsKey(e.getKey()))
+                e.getValue().onRemoved();
+        }
+    }
+
     /**
      * Gets a sequence from cache or creates one if it's not cached.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4abdf4c/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index d7f6170..c4078f8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -181,7 +181,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
                 return null;
             }
-        }, IgniteException.class, null);
+        }, IllegalStateException.class, null);
     }
 
     /**
@@ -419,7 +419,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
                 return null;
             }
-        }, IgniteException.class, null);
+        }, IllegalStateException.class, null);
     }
 
     /**
@@ -663,7 +663,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
                 return null;
             }
-        }, IgniteException.class, null);
+        }, IllegalStateException.class, null);
     }
 
     /**
@@ -892,7 +892,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
 
                 return null;
             }
-        }, IgniteException.class, null);
+        }, IllegalStateException.class, null);
     }
 
     /**

Reply via email to