Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 86d963f98 -> 15ef9b9e3
# ignite-901 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/15ef9b9e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/15ef9b9e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/15ef9b9e Branch: refs/heads/ignite-901 Commit: 15ef9b9e397522ddf03f5afe3aa5d5d555474bed Parents: 86d963f Author: sboikov <semen.boi...@inria.fr> Authored: Thu Jul 9 08:15:43 2015 +0300 Committer: sboikov <semen.boi...@inria.fr> Committed: Thu Jul 9 08:15:43 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 4 +- .../discovery/GridDiscoveryManager.java | 143 ++++++++++--------- .../query/h2/twostep/GridMergeIndex.java | 16 +++ .../h2/twostep/GridReduceQueryExecutor.java | 13 +- 4 files changed, 103 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15ef9b9e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 0a9d093..ba8dc15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2855,7 +2855,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected. */ - public void reconnected(final boolean clusterRestarted) { + public void onReconnected(final boolean clusterRestarted) { Throwable err = null; try { @@ -2863,8 +2863,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { for (GridComponent comp : ctx.components()) comp.onReconnected(clusterRestarted); - - ctx.gateway().onReconnected(); } catch (IgniteCheckedException e) { err = e; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15ef9b9e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 096f0e8..c0d9f13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -320,7 +320,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ctx.addNodeAttribute(IgniteNodeAttributes.ATTR_PHY_RAM, totSysMemory); - DiscoverySpi spi = getSpi(); + final DiscoverySpi spi = getSpi(); discoOrdered = discoOrdered(); @@ -371,6 +371,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } spi.setListener(new DiscoverySpiListener() { + private long gridStartTime; + @Override public void onDiscovery( int type, long topVer, @@ -460,6 +462,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { // If this is a local join event, just save it and do not notify listeners. if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) { + gridStartTime = spi.getGridStartTime(); + DiscoveryEvent discoEvt = new DiscoveryEvent(); discoEvt.node(ctx.discovery().localNode()); @@ -488,13 +492,28 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ((IgniteKernal)ctx.grid()).onDisconnected(); - DiscoveryEvent evt = new DiscoveryEvent(); + recordEvent(type, topVer, node, topSnapshot); + + return; + } + else if (type == EVT_CLIENT_NODE_RECONNECTED) { + assert locNode.isClient() : locNode; + assert node.isClient() : node; + + boolean clusterRestarted = gridStartTime != spi.getGridStartTime(); - evt.node(ctx.discovery().localNode()); - evt.eventNode(node); - evt.type(type); + gridStartTime = spi.getGridStartTime(); - ctx.event().record(evt); + ((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted); + + recordEvent(type, topVer, node, topSnapshot); + + ctx.gateway().onReconnected(); + + if (log.isInfoEnabled()) + log.info("Client node reconnected to cluster: " + node); + + ackTopology(topVer, true); return; } @@ -1646,6 +1665,55 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ).start(); } + /** + * Method is called when any discovery event occurs. + * + * @param type Discovery event type. See {@link DiscoveryEvent} for more details. + * @param topVer Topology version. + * @param node Remote node this event is connected with. + * @param topSnapshot Topology snapshot. + */ + @SuppressWarnings("RedundantTypeArguments") + private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) { + assert node != null; + + if (ctx.event().isRecordable(type)) { + DiscoveryEvent evt = new DiscoveryEvent(); + + evt.node(ctx.discovery().localNode()); + evt.eventNode(node); + evt.type(type); + + evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, daemonFilter)); + + if (type == EVT_NODE_METRICS_UPDATED) + evt.message("Metrics were updated: " + node); + + else if (type == EVT_NODE_JOINED) + evt.message("Node joined: " + node); + + else if (type == EVT_NODE_LEFT) + evt.message("Node left: " + node); + + else if (type == EVT_NODE_FAILED) + evt.message("Node failed: " + node); + + else if (type == EVT_NODE_SEGMENTED) + evt.message("Node segmented: " + node); + + else if (type == EVT_CLIENT_NODE_DISCONNECTED) + evt.message("Client node disconnected: " + node); + + else if (type == EVT_CLIENT_NODE_RECONNECTED) + evt.message("Client node reconnected: " + node); + + else + assert false; + + ctx.event().record(evt); + } + } + /** Worker for network segment checks. */ private class SegmentCheckWorker extends GridWorker { /** */ @@ -1736,55 +1804,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Method is called when any discovery event occurs. - * - * @param type Discovery event type. See {@link DiscoveryEvent} for more details. - * @param topVer Topology version. - * @param node Remote node this event is connected with. - * @param topSnapshot Topology snapshot. - */ - @SuppressWarnings("RedundantTypeArguments") - private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) { - assert node != null; - - if (ctx.event().isRecordable(type)) { - DiscoveryEvent evt = new DiscoveryEvent(); - - evt.node(ctx.discovery().localNode()); - evt.eventNode(node); - evt.type(type); - - evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, daemonFilter)); - - if (type == EVT_NODE_METRICS_UPDATED) - evt.message("Metrics were updated: " + node); - - else if (type == EVT_NODE_JOINED) - evt.message("Node joined: " + node); - - else if (type == EVT_NODE_LEFT) - evt.message("Node left: " + node); - - else if (type == EVT_NODE_FAILED) - evt.message("Node failed: " + node); - - else if (type == EVT_NODE_SEGMENTED) - evt.message("Node segmented: " + node); - - else if (type == EVT_CLIENT_NODE_DISCONNECTED) - evt.message("Client node disconnected: " + node); - - else if (type == EVT_CLIENT_NODE_RECONNECTED) - evt.message("Client node reconnected: " + node); - - else - assert false; - - ctx.event().record(evt); - } - } - - /** * @param type Event type. * @param topVer Topology version. * @param node Node. @@ -1946,20 +1965,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { break; } - case EVT_CLIENT_NODE_RECONNECTED: { - assert localNode().isClient() : evt; - - // TODO IGNITE-901. - ((IgniteKernal)ctx.grid()).reconnected(false); - - if (log.isInfoEnabled()) - log.info("Client node reconnected to cluster: " + node); - - ackTopology(topVer.topologyVersion(), true); - - break; - } - case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: { if (ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) { DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15ef9b9e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index 244ae46..05677a4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -26,6 +26,7 @@ import org.h2.table.*; import org.jetbrains.annotations.*; import org.jsr166.*; +import javax.cache.CacheException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -92,6 +93,21 @@ public abstract class GridMergeIndex extends BaseIndex { throw new IllegalStateException(); } + public void fail(final CacheException e) { + for (UUID nodeId0 : remainingRows.keySet()) { + addPage0(new GridResultPage(null, nodeId0, null) { + @Override public boolean isFail() { + return true; + } + + @Override + public void fetchNextPage() { + throw e; + } + }); + } + } + /** * @param nodeId Node ID. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15ef9b9e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index b531c35..8f03681 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -1122,7 +1122,7 @@ public class GridReduceQueryExecutor { new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.")); for (Map.Entry<Long, QueryRun> e : runs.entrySet()) - e.getValue().state(err, null); + e.getValue().disconnected(err); } /** @@ -1161,6 +1161,17 @@ public class GridReduceQueryExecutor { for (GridMergeTable tbl : tbls) // Fail all merge indexes. tbl.getScanIndex(null).fail(nodeId); } + + void disconnected(CacheException e) { + if (!state.compareAndSet(null, e)) + return; + + while (latch.getCount() != 0) // We don't need to wait for all nodes to reply. + latch.countDown(); + + for (GridMergeTable tbl : tbls) // Fail all merge indexes. + tbl.getScanIndex(null).fail(e); + } } /**