Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 86d963f98 -> 15ef9b9e3


# ignite-901


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/15ef9b9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/15ef9b9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/15ef9b9e

Branch: refs/heads/ignite-901
Commit: 15ef9b9e397522ddf03f5afe3aa5d5d555474bed
Parents: 86d963f
Author: sboikov <semen.boi...@inria.fr>
Authored: Thu Jul 9 08:15:43 2015 +0300
Committer: sboikov <semen.boi...@inria.fr>
Committed: Thu Jul 9 08:15:43 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   4 +-
 .../discovery/GridDiscoveryManager.java         | 143 ++++++++++---------
 .../query/h2/twostep/GridMergeIndex.java        |  16 +++
 .../h2/twostep/GridReduceQueryExecutor.java     |  13 +-
 4 files changed, 103 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15ef9b9e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 0a9d093..ba8dc15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2855,7 +2855,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
     /**
      * @param clusterRestarted {@code True} if all cluster nodes restarted 
while client was disconnected.
      */
-    public void reconnected(final boolean clusterRestarted) {
+    public void onReconnected(final boolean clusterRestarted) {
         Throwable err = null;
 
         try {
@@ -2863,8 +2863,6 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
 
             for (GridComponent comp : ctx.components())
                 comp.onReconnected(clusterRestarted);
-
-            ctx.gateway().onReconnected();
         }
         catch (IgniteCheckedException e) {
             err = e;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15ef9b9e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 096f0e8..c0d9f13 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -320,7 +320,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
         ctx.addNodeAttribute(IgniteNodeAttributes.ATTR_PHY_RAM, totSysMemory);
 
-        DiscoverySpi spi = getSpi();
+        final DiscoverySpi spi = getSpi();
 
         discoOrdered = discoOrdered();
 
@@ -371,6 +371,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         }
 
         spi.setListener(new DiscoverySpiListener() {
+            private long gridStartTime;
+
             @Override public void onDiscovery(
                 int type,
                 long topVer,
@@ -460,6 +462,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
                 // If this is a local join event, just save it and do not 
notify listeners.
                 if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) 
{
+                    gridStartTime = spi.getGridStartTime();
+
                     DiscoveryEvent discoEvt = new DiscoveryEvent();
 
                     discoEvt.node(ctx.discovery().localNode());
@@ -488,13 +492,28 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
                     ((IgniteKernal)ctx.grid()).onDisconnected();
 
-                    DiscoveryEvent evt = new DiscoveryEvent();
+                    recordEvent(type, topVer, node, topSnapshot);
+
+                    return;
+                }
+                else if (type == EVT_CLIENT_NODE_RECONNECTED) {
+                    assert locNode.isClient() : locNode;
+                    assert node.isClient() : node;
+
+                    boolean clusterRestarted = gridStartTime != 
spi.getGridStartTime();
 
-                    evt.node(ctx.discovery().localNode());
-                    evt.eventNode(node);
-                    evt.type(type);
+                    gridStartTime = spi.getGridStartTime();
 
-                    ctx.event().record(evt);
+                    ((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted);
+
+                    recordEvent(type, topVer, node, topSnapshot);
+
+                    ctx.gateway().onReconnected();
+
+                    if (log.isInfoEnabled())
+                        log.info("Client node reconnected to cluster: " + 
node);
+
+                    ackTopology(topVer, true);
 
                     return;
                 }
@@ -1646,6 +1665,55 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         ).start();
     }
 
+    /**
+     * Method is called when any discovery event occurs.
+     *
+     * @param type Discovery event type. See {@link DiscoveryEvent} for more 
details.
+     * @param topVer Topology version.
+     * @param node Remote node this event is connected with.
+     * @param topSnapshot Topology snapshot.
+     */
+    @SuppressWarnings("RedundantTypeArguments")
+    private void recordEvent(int type, long topVer, ClusterNode node, 
Collection<ClusterNode> topSnapshot) {
+        assert node != null;
+
+        if (ctx.event().isRecordable(type)) {
+            DiscoveryEvent evt = new DiscoveryEvent();
+
+            evt.node(ctx.discovery().localNode());
+            evt.eventNode(node);
+            evt.type(type);
+
+            evt.topologySnapshot(topVer, U.<ClusterNode, 
ClusterNode>arrayList(topSnapshot, daemonFilter));
+
+            if (type == EVT_NODE_METRICS_UPDATED)
+                evt.message("Metrics were updated: " + node);
+
+            else if (type == EVT_NODE_JOINED)
+                evt.message("Node joined: " + node);
+
+            else if (type == EVT_NODE_LEFT)
+                evt.message("Node left: " + node);
+
+            else if (type == EVT_NODE_FAILED)
+                evt.message("Node failed: " + node);
+
+            else if (type == EVT_NODE_SEGMENTED)
+                evt.message("Node segmented: " + node);
+
+            else if (type == EVT_CLIENT_NODE_DISCONNECTED)
+                evt.message("Client node disconnected: " + node);
+
+            else if (type == EVT_CLIENT_NODE_RECONNECTED)
+                evt.message("Client node reconnected: " + node);
+
+            else
+                assert false;
+
+            ctx.event().record(evt);
+        }
+    }
+
     /** Worker for network segment checks. */
     private class SegmentCheckWorker extends GridWorker {
         /** */
@@ -1736,55 +1804,6 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         }
 
         /**
-         * Method is called when any discovery event occurs.
-         *
-         * @param type Discovery event type. See {@link DiscoveryEvent} for 
more details.
-         * @param topVer Topology version.
-         * @param node Remote node this event is connected with.
-         * @param topSnapshot Topology snapshot.
-         */
-        @SuppressWarnings("RedundantTypeArguments")
-        private void recordEvent(int type, long topVer, ClusterNode node, 
Collection<ClusterNode> topSnapshot) {
-            assert node != null;
-
-            if (ctx.event().isRecordable(type)) {
-                DiscoveryEvent evt = new DiscoveryEvent();
-
-                evt.node(ctx.discovery().localNode());
-                evt.eventNode(node);
-                evt.type(type);
-
-                evt.topologySnapshot(topVer, U.<ClusterNode, 
ClusterNode>arrayList(topSnapshot, daemonFilter));
-
-                if (type == EVT_NODE_METRICS_UPDATED)
-                    evt.message("Metrics were updated: " + node);
-
-                else if (type == EVT_NODE_JOINED)
-                    evt.message("Node joined: " + node);
-
-                else if (type == EVT_NODE_LEFT)
-                    evt.message("Node left: " + node);
-
-                else if (type == EVT_NODE_FAILED)
-                    evt.message("Node failed: " + node);
-
-                else if (type == EVT_NODE_SEGMENTED)
-                    evt.message("Node segmented: " + node);
-
-                else if (type == EVT_CLIENT_NODE_DISCONNECTED)
-                    evt.message("Client node disconnected: " + node);
-
-                else if (type == EVT_CLIENT_NODE_RECONNECTED)
-                    evt.message("Client node reconnected: " + node);
-
-                else
-                    assert false;
-
-                ctx.event().record(evt);
-            }
-        }
-
-        /**
          * @param type Event type.
          * @param topVer Topology version.
          * @param node Node.
@@ -1946,20 +1965,6 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     break;
                 }
 
-                case EVT_CLIENT_NODE_RECONNECTED: {
-                    assert localNode().isClient() : evt;
-
-                    // TODO IGNITE-901.
-                    ((IgniteKernal)ctx.grid()).reconnected(false);
-
-                    if (log.isInfoEnabled())
-                        log.info("Client node reconnected to cluster: " + 
node);
-
-                    ackTopology(topVer.topologyVersion(), true);
-
-                    break;
-                }
-
                 case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
                     if 
(ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) {
                         DiscoveryCustomEvent customEvt = new 
DiscoveryCustomEvent();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15ef9b9e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 244ae46..05677a4 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -26,6 +26,7 @@ import org.h2.table.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
+import javax.cache.CacheException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -92,6 +93,21 @@ public abstract class GridMergeIndex extends BaseIndex {
             throw new IllegalStateException();
     }
 
+    public void fail(final CacheException e) {
+        for (UUID nodeId0 : remainingRows.keySet()) {
+            addPage0(new GridResultPage(null, nodeId0, null) {
+                @Override public boolean isFail() {
+                    return true;
+                }
+
+                @Override
+                public void fetchNextPage() {
+                    throw e;
+                }
+            });
+        }
+    }
+
     /**
      * @param nodeId Node ID.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15ef9b9e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index b531c35..8f03681 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -1122,7 +1122,7 @@ public class GridReduceQueryExecutor {
             new IgniteClientDisconnectedException(reconnectFut, "Client node 
disconnected."));
 
         for (Map.Entry<Long, QueryRun> e : runs.entrySet())
-            e.getValue().state(err, null);
+            e.getValue().disconnected(err);
     }
 
     /**
@@ -1161,6 +1161,17 @@ public class GridReduceQueryExecutor {
             for (GridMergeTable tbl : tbls) // Fail all merge indexes.
                 tbl.getScanIndex(null).fail(nodeId);
         }
+
+        void disconnected(CacheException e) {
+            if (!state.compareAndSet(null, e))
+                return;
+
+            while (latch.getCount() != 0) // We don't need to wait for all 
nodes to reply.
+                latch.countDown();
+
+            for (GridMergeTable tbl : tbls) // Fail all merge indexes.
+                tbl.getScanIndex(null).fail(e);
+        }
     }
 
     /**

Reply via email to