Merge branches 'ignite-104' and 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104
Conflicts:
modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cdd2440
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cdd2440
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cdd2440
Branch: refs/heads/ignite-426
Commit: 5cdd2440a6b9eb3c5fe0a7620202caf5cb2db441
Parents: 6c1655f 1c10ade
Author: Valentin Kulichenko <[email protected]>
Authored: Fri Jul 31 13:38:39 2015 -0700
Committer: Valentin Kulichenko <[email protected]>
Committed: Fri Jul 31 13:38:39 2015 -0700
----------------------------------------------------------------------
.../JettyRestProcessorAbstractSelfTest.java | 14 +-
.../managers/communication/GridIoManager.java | 110 ++++-
.../GridDhtPartitionsExchangeFuture.java | 20 +-
.../handlers/query/QueryCommandHandler.java | 6 +-
.../util/nio/GridCommunicationClient.java | 5 +-
.../util/nio/GridNioFinishedFuture.java | 12 +
.../ignite/internal/util/nio/GridNioFuture.java | 14 +
.../internal/util/nio/GridNioFutureImpl.java | 15 +
.../util/nio/GridNioRecoveryDescriptor.java | 13 +-
.../ignite/internal/util/nio/GridNioServer.java | 5 +
.../util/nio/GridNioSessionMetaKey.java | 5 +-
.../util/nio/GridShmemCommunicationClient.java | 7 +-
.../util/nio/GridTcpNioCommunicationClient.java | 14 +-
.../communication/tcp/TcpCommunicationSpi.java | 84 +++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 45 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
...CommunicationRecoveryAckClosureSelfTest.java | 464 +++++++++++++++++++
.../tcp/TcpDiscoveryMultiThreadedTest.java | 8 +-
.../IgniteSpiCommunicationSelfTestSuite.java | 1 +
.../http/jetty/GridJettyRestHandler.java | 12 +-
20 files changed, 779 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cdd2440/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 765ba65,7e17efc..479d116
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@@ -1059,9 -982,9 +1061,10 @@@ public class GridIoManager extends Grid
Message msg,
byte plc,
boolean ordered,
+ boolean seq,
long timeout,
- boolean skipOnTimeout
+ boolean skipOnTimeout,
+ IgniteInClosure<IgniteException> ackClosure
) throws IgniteCheckedException {
assert node != null;
assert topic != null;
@@@ -1079,10 -1002,11 +1082,13 @@@
if (ordered)
processOrderedMessage(locNodeId, ioMsg, plc, null);
+ else if (seq)
+ processSequentialMessage(locNodeId, ioMsg, plc, null);
else
processRegularMessage0(ioMsg, locNodeId);
+
+ if (ackClosure != null)
+ ackClosure.apply(null);
}
else {
if (topicOrd < 0)
@@@ -1132,7 -1059,7 +1141,7 @@@
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false);
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
++ send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false, null);
}
/**
@@@ -1144,7 -1071,7 +1153,7 @@@
*/
public void send(ClusterNode node, Object topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, false, 0, false);
- send(node, topic, -1, msg, plc, false, 0, false, null);
++ send(node, topic, -1, msg, plc, false, false, 0, false, null);
}
/**
@@@ -1156,7 -1083,7 +1165,7 @@@
*/
public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false);
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
++ send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false, null);
}
/**
@@@ -1178,7 -1105,7 +1187,7 @@@
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout);
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
++ send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, null);
}
/**
@@@ -1205,7 -1132,7 +1214,7 @@@
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout);
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
++ send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, null);
}
/**
@@@ -1264,47 -1217,30 +1299,71 @@@
}
/**
+ * @param node Destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param timeout Timeout to keep a message on receiving queue.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @param ackClosure Ack closure.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void sendOrderedMessage(
+ ClusterNode node,
+ Object topic,
+ Message msg,
+ byte plc,
+ long timeout,
+ boolean skipOnTimeout,
+ IgniteInClosure<IgniteException> ackClosure
+ ) throws IgniteCheckedException {
+ assert timeout > 0 || skipOnTimeout;
+
+ send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+ }
+
+ /**
+ * Sends sequential message.
+ *
+ * @param nodeId Destination node ID.
+ * @param topic Topic.
+ * @param msg Message.
+ * @param plc Policy.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void sendSequentialMessage(
+ UUID nodeId,
+ Object topic,
+ Message msg,
+ byte plc
+ ) throws IgniteCheckedException {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+
+ sendSequentialMessage(node, topic, msg, plc);
+ }
+
+ /**
+ * Sends sequential message.
+ *
+ * @param node Destination node.
+ * @param topic Topic.
+ * @param msg Message.
+ * @param plc Policy.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void sendSequentialMessage(
+ ClusterNode node,
+ Object topic,
+ Message msg,
+ byte plc
+ ) throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, true, 0, false);
++ send(node, topic, -1, msg, plc, false, true, 0, false, null);
+ }
+
+ /**
* Sends a peer deployable user message.
*
* @param nodes Destination nodes.
@@@ -1459,7 -1422,7 +1547,7 @@@
// messages to one node vs. many.
if (!nodes.isEmpty()) {
for (ClusterNode node : nodes)
- send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout);
- send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
++ send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout, null);
}
else if (log.isDebugEnabled())
log.debug("Failed to send message to empty nodes collection
[topic=" + topic + ", msg=" +