Ignite-24 wip

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6f18eb54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6f18eb54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6f18eb54

Branch: refs/heads/ignite-99-2
Commit: 6f18eb54ca195006ad936c8cfc9fc45f1c71f1a7
Parents: 8795b0f
Author: Yakov Zhdanov <yzhda...@gridgain.com>
Authored: Fri Jan 23 15:27:08 2015 +0300
Committer: Yakov Zhdanov <yzhda...@gridgain.com>
Committed: Fri Jan 23 15:27:08 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 564 ++++++-------------
 .../managers/communication/GridIoMessage.java   |  53 +-
 .../processors/cache/GridCacheIoManager.java    |  22 +-
 .../preloader/GridDhtPartitionDemandPool.java   |  26 +-
 .../preloader/GridDhtPartitionSupplyPool.java   |  10 +-
 .../query/GridCacheDistributedQueryManager.java |  23 -
 .../continuous/GridContinuousProcessor.java     |   1 -
 .../kernal/processors/job/GridJobProcessor.java |   9 -
 .../kernal/processors/job/GridJobWorker.java    |   6 -
 .../processors/task/GridTaskProcessor.java      |  15 +-
 .../communication/GridIoManagerSelfTest.java    |   2 +-
 .../GridCacheSyncReplicatedPreloadSelfTest.java |   1 -
 12 files changed, 221 insertions(+), 511 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoManager.java
index 8aab72f..2d9189d 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoManager.java
@@ -59,15 +59,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     /** Max closed topics to store. */
     public static final int MAX_CLOSED_TOPICS = 10240;
 
-    /** Ordered messages comparator. */
-    private static final Comparator<IgniteBiTuple<GridIoMessage, Long>> 
MSG_CMP =
-        new Comparator<IgniteBiTuple<GridIoMessage, Long>>() {
-            @Override public int compare(IgniteBiTuple<GridIoMessage, Long> 
t1, IgniteBiTuple<GridIoMessage, Long> t2) {
-                return t1.get1().messageId() < t2.get1().messageId() ? -1 :
-                    t1.get1().messageId() == t2.get1().messageId() ? 0 : 1;
-            }
-        };
-
     /** Listeners by topic. */
     private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new 
ConcurrentHashMap8<>();
 
@@ -99,10 +90,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     private final ConcurrentMap<Object, ConcurrentMap<UUID, 
GridCommunicationMessageSet>> msgSetMap =
         new ConcurrentHashMap8<>();
 
-    /** Messages ID generator (per topic). */
-    private final ConcurrentMap<Object, ConcurrentMap<UUID, AtomicLong>> 
msgIdMap =
-        new ConcurrentHashMap8<>();
-
     /** Local node ID. */
     private final UUID locNodeId;
 
@@ -215,26 +202,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
                 switch (evt.type()) {
                     case EVT_NODE_JOINED:
-                        ConcurrentLinkedDeque8<DelayedMessage> delayedMsgs = 
null;
-
-                        lock.writeLock().lock();
-
-                        try {
-                            if (started)
-                                delayedMsgs = waitMap.remove(nodeId);
-                        }
-                        finally {
-                            lock.writeLock().unlock();
-                        }
-
-                        if (log.isDebugEnabled())
-                            log.debug("Processing messages from discovery 
startup delay list " +
-                                "(sender node joined topology): " + 
delayedMsgs);
-
-                        // After write lock released.
-                        if (delayedMsgs != null)
-                            for (DelayedMessage msg : delayedMsgs)
-                                commLsnr.onMessage(msg.nodeId(), 
msg.message(), msg.callback());
+                        assert waitMap.get(nodeId) == null; // We can't 
receive messages from undiscovered nodes.
 
                         break;
 
@@ -474,7 +442,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
             switch (plc) {
                 case P2P_POOL: {
-                    processP2PMessage(node, msg, msgC);
+                    processP2PMessage(nodeId, msg, msgC);
 
                     break;
                 }
@@ -485,9 +453,9 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                 case AFFINITY_POOL:
                 case UTILITY_CACHE_POOL: {
                     if (msg.isOrdered())
-                        processOrderedMessage(node, msg, plc, msgC);
+                        processOrderedMessage(nodeId, msg, plc, msgC);
                     else
-                        processRegularMessage(node, msg, plc, msgC);
+                        processRegularMessage(nodeId, msg, plc, msgC);
 
                     break;
                 }
@@ -534,33 +502,15 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param msg Message bytes.
-     * @return Policy.
-     */
-    private GridIoPolicy policy(byte[] msg) {
-        GridIoPolicy plc = GridIoPolicy.fromOrdinal(msg[0]);
-
-        if (plc == null)
-            throw new IllegalStateException("Failed to parse message policy: " 
+ Arrays.toString(msg));
-
-        return plc;
-    }
-
-    /**
-     * @param msg Message bytes.
-     * @return {@code True} if ordered.
-     */
-    private boolean ordered(byte[] msg) {
-        return msg[1] == 1;
-    }
-
-    /**
-     * @param node Node.
+     * @param nodeId Node ID.
      * @param msg Message.
      * @param msgC Closure to call when message processing finished.
      */
-    @SuppressWarnings("deprecation")
-    private void processP2PMessage(final ClusterNode node, final GridIoMessage 
msg, final IgniteRunnable msgC) {
+    private void processP2PMessage(
+        final UUID nodeId,
+        final GridIoMessage msg,
+        final IgniteRunnable msgC
+    ) {
         workersCnt.increment();
 
         Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
@@ -577,7 +527,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
                     assert obj != null;
 
-                    lsnr.onMessage(node.id(), obj);
+                    lsnr.onMessage(nodeId, obj);
                 }
                 finally {
                     threadProcessingMessage(false);
@@ -602,13 +552,17 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param node Node.
+     * @param nodeId Node ID.
      * @param msg Message.
      * @param plc Execution policy.
      * @param msgC Closure to call when message processing finished.
      */
-    private void processRegularMessage(final ClusterNode node, final 
GridIoMessage msg, GridIoPolicy plc,
-        final IgniteRunnable msgC) {
+    private void processRegularMessage(
+        final UUID nodeId,
+        final GridIoMessage msg,
+        GridIoPolicy plc,
+        final IgniteRunnable msgC
+    ) {
         workersCnt.increment();
 
         Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
@@ -616,7 +570,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                 try {
                     threadProcessingMessage(true);
 
-                    processRegularMessage0(msg, node.id());
+                    processRegularMessage0(msg, nodeId);
                 }
                 finally {
                     threadProcessingMessage(false);
@@ -659,55 +613,20 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param node Node.
+     * @param nodeId Node ID.
      * @param msg Ordered message.
      * @param plc Execution policy.
-     * @param msgC Closure to call when message processing finished.
+     * @param msgC Closure to call when message processing finished ({@code 
null} for sync processing).
      */
     @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-    private void processOrderedMessage(final ClusterNode node, final 
GridIoMessage msg, final GridIoPolicy plc,
-        final IgniteRunnable msgC) {
+    private void processOrderedMessage(
+        final UUID nodeId,
+        final GridIoMessage msg,
+        final GridIoPolicy plc,
+        @Nullable final IgniteRunnable msgC
+    ) {
         assert msg != null;
 
-        workersCnt.increment();
-
-        Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
-            @Override protected void body() {
-                try {
-                    threadProcessingMessage(true);
-
-                    processOrderedMessage0(msg, plc, node.id());
-                }
-                finally {
-                    threadProcessingMessage(false);
-
-                    workersCnt.decrement();
-
-                    msgC.run();
-                }
-            }
-        };
-
-        try {
-            pool(plc).execute(c);
-        }
-        catch (RejectedExecutionException e) {
-            U.error(log, "Failed to process ordered message due to execution 
rejection. " +
-                "Increase the upper bound on executor service provided by 
corresponding " +
-                "configuration property. Will attempt to process message in 
the listener " +
-                "thread instead [msgPlc=" + plc + ']', e);
-
-            c.run();
-        }
-    }
-
-    /**
-     * @param msg Message.
-     * @param plc Policy.
-     * @param nodeId Node ID.
-     */
-    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-    private void processOrderedMessage0(GridIoMessage msg, GridIoPolicy plc, 
UUID nodeId) {
         long timeout = msg.timeout();
         boolean skipOnTimeout = msg.skipOnTimeout();
 
@@ -775,7 +694,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             }
         }
 
-        if (ctx.discovery().node(nodeId) == null) {
+        if (isNew && ctx.discovery().node(nodeId) == null) {
             if (log.isDebugEnabled())
                 log.debug("Message is ignored as sender has left the grid: " + 
msg);
 
@@ -798,52 +717,89 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         if (isNew && set.endTime() != Long.MAX_VALUE)
             ctx.timeout().addTimeoutObject(set);
 
-        GridMessageListener lsnr = lsnrMap.get(msg.topic());
+        if (set.reserved()) {
+            // Set is reserved which means that it is currently processed by 
worker thread.
+            msgC.run();
 
-        if (lsnr != null)
-            unwindMessageSet(set, lsnr, false);
-        else if (closedTopics.contains(msg.topic())) {
-            if (log.isDebugEnabled())
-                log.debug("Message is ignored as it came for the closed topic: 
" + msg);
+            return;
+        }
 
-            assert map != null;
+        final GridMessageListener lsnr = lsnrMap.get(msg.topic());
+
+        if (lsnr == null) {
+            if (closedTopics.contains(msg.topic())) {
+                if (log.isDebugEnabled())
+                    log.debug("Message is ignored as it came for the closed 
topic: " + msg);
+
+                assert map != null;
+
+                msgSetMap.remove(msg.topic(), map);
+            }
+            else if (log.isDebugEnabled()) {
+                // Note that we simply keep messages if listener is not
+                // registered yet, until one will be registered.
+                log.debug("Received message for unknown listener (messages 
will be kept until a " +
+                    "listener is registered): " + msg);
+            }
+
+            return;
+        }
+
+        if (msgC == null) {
+            // Message from local node can be processed in sync manner.
+            assert locNodeId.equals(nodeId);
+
+            unwindMessageSet(set, lsnr);
+
+            return;
+        }
+
+        // Set is not reserved and new worker should be submitted.
+        workersCnt.increment();
+
+        final GridCommunicationMessageSet msgSet0 = set;
 
-            msgSetMap.remove(msg.topic(), map);
+        Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) {
+            @Override protected void body() {
+                try {
+                    threadProcessingMessage(true);
+
+                    unwindMessageSet(msgSet0, lsnr);
+                }
+                finally {
+                    threadProcessingMessage(false);
+
+                    workersCnt.decrement();
+
+                    msgC.run();
+                }
+            }
+        };
+
+        try {
+            pool(plc).execute(c);
         }
-        else if (log.isDebugEnabled()) {
-            // Note that we simply keep messages if listener is not
-            // registered yet, until one will be registered.
-            log.debug("Received message for unknown listener (messages will be 
kept until a " +
-                "listener is registered): " + msg);
+        catch (RejectedExecutionException e) {
+            U.error(log, "Failed to process ordered message due to execution 
rejection. " +
+                "Increase the upper bound on executor service provided by 
corresponding " +
+                "configuration property. Will attempt to process message in 
the listener " +
+                "thread instead [msgPlc=" + plc + ']', e);
+
+            c.run();
         }
     }
 
     /**
      * @param msgSet Message set to unwind.
      * @param lsnr Listener to notify.
-     * @param force Whether to force unwind and drop missing
-     *      ordered messages that are not received yet.
      */
-    @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", 
"deprecation"})
-    private void unwindMessageSet(GridCommunicationMessageSet msgSet, 
GridMessageListener lsnr, boolean force) {
+    private void unwindMessageSet(GridCommunicationMessageSet msgSet, 
GridMessageListener lsnr) {
         // Loop until message set is empty or
         // another thread owns the reservation.
         while (true) {
             if (msgSet.reserve()) {
                 try {
-                    Collection<GridIoMessage> orderedMsgs = 
msgSet.unwind(force);
-
-                    if (!orderedMsgs.isEmpty()) {
-                        for (GridIoMessage msg : orderedMsgs) {
-                            Object obj = msg.message();
-
-                            assert obj != null;
-
-                            lsnr.onMessage(msgSet.nodeId(), obj);
-                        }
-                    }
-                    else if (log.isDebugEnabled())
-                        log.debug("No messages were unwound: " + msgSet);
+                    msgSet.unwind(lsnr);
                 }
                 finally {
                     msgSet.release();
@@ -872,19 +828,27 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param topicOrd GridTopic enumeration ordinal.
      * @param msg Message to send.
      * @param plc Type of processing.
-     * @param msgId Message ID.
+     * @param ordered Ordered flag.
      * @param timeout Timeout.
      * @param skipOnTimeout Whether message can be skipped on timeout.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    private void send(ClusterNode node, Object topic, int topicOrd, 
GridTcpCommunicationMessageAdapter msg,
-        GridIoPolicy plc, long msgId, long timeout, boolean skipOnTimeout) 
throws IgniteCheckedException {
+    private void send(
+        ClusterNode node,
+        Object topic,
+        int topicOrd,
+        GridTcpCommunicationMessageAdapter msg,
+        GridIoPolicy plc,
+        boolean ordered,
+        long timeout,
+        boolean skipOnTimeout
+    ) throws IgniteCheckedException {
         assert node != null;
         assert topic != null;
         assert msg != null;
         assert plc != null;
 
-        GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, 
msgId, timeout, skipOnTimeout);
+        GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, 
ordered, timeout, skipOnTimeout);
 
         if (locNodeId.equals(node.id())) {
             assert plc != P2P_POOL;
@@ -894,8 +858,8 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             if (commLsnr == null)
                 throw new IgniteCheckedException("Trying to send message when 
grid is not fully started.");
 
-            if (msgId > 0)
-                processOrderedMessage0(ioMsg, plc, locNodeId);
+            if (ordered)
+                processOrderedMessage(locNodeId, ioMsg, plc, null);
             else
                 processRegularMessage0(ioMsg, locNodeId);
         }
@@ -947,7 +911,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node 
(has node left grid?): " + nodeId);
 
-        send(node, topic, topic.ordinal(), msg, plc, -1, 0, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
     }
 
     /**
@@ -959,7 +923,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, 
GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, -1, 0, false);
+        send(node, topic, -1, msg, plc, false, 0, false);
     }
 
     /**
@@ -971,107 +935,52 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, GridTopic topic, 
GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc)
         throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, -1, 0, false);
-    }
-
-    /**
-     * @param topic Message topic.
-     * @param nodeId Node ID.
-     * @return Next ordered message ID.
-     */
-    public long nextMessageId(Object topic, UUID nodeId) {
-        ConcurrentMap<UUID, AtomicLong> map = msgIdMap.get(topic);
-
-        if (map == null) {
-            ConcurrentMap<UUID, AtomicLong> lastMap = 
msgIdMap.putIfAbsent(topic,
-                map = new ConcurrentHashMap8<>());
-
-            if (lastMap != null)
-                map = lastMap;
-        }
-
-        AtomicLong msgId = map.get(nodeId);
-
-        if (msgId == null) {
-            AtomicLong lastMsgId = map.putIfAbsent(nodeId, msgId = new 
AtomicLong(0));
-
-            if (lastMsgId != null)
-                msgId = lastMsgId;
-        }
-
-        long id = msgId.incrementAndGet();
-
-        if (log.isDebugEnabled())
-            log.debug("Got next message ID [topic=" + topic + ", nodeId=" + 
nodeId + ", id=" + id + ']');
-
-        return id;
-    }
-
-    /**
-     * @param topic Message topic.
-     */
-    public void removeMessageId(Object topic) {
-        if (log.isDebugEnabled())
-            log.debug("Remove message ID for topic: " + topic);
-
-        msgIdMap.remove(topic);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
     }
 
     /**
      * @param node Destination node.
      * @param topic Topic to send the message to.
-     * @param msgId Ordered message ID.
      * @param msg Message to send.
      * @param plc Type of processing.
      * @param timeout Timeout to keep a message on receiving queue.
      * @param skipOnTimeout Whether message can be skipped on timeout.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void sendOrderedMessage(ClusterNode node, Object topic, long msgId, 
GridTcpCommunicationMessageAdapter msg,
-        GridIoPolicy plc, long timeout, boolean skipOnTimeout) throws 
IgniteCheckedException {
+    public void sendOrderedMessage(
+        ClusterNode node,
+        Object topic,
+        GridTcpCommunicationMessageAdapter msg,
+        GridIoPolicy plc,
+        long timeout,
+        boolean skipOnTimeout
+    ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, msgId, timeout, skipOnTimeout);
-    }
-
-    /**
-     * @param nodeId Destination node.
-     * @param topic Topic to send the message to.
-     * @param msgId Ordered message ID.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param timeout Timeout to keep a message on receiving queue.
-     * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    public void sendOrderedMessage(UUID nodeId, Object topic, long msgId, 
GridTcpCommunicationMessageAdapter msg,
-        GridIoPolicy plc, long timeout, boolean skipOnTimeout) throws 
IgniteCheckedException {
-        assert timeout > 0 || skipOnTimeout;
-
-        ClusterNode node = ctx.discovery().node(nodeId);
-
-        if (node == null)
-            throw new IgniteCheckedException("Failed to send message to node 
(has node left grid?): " + nodeId);
-
-        send(node, topic, (byte)-1, msg, plc, msgId, timeout, skipOnTimeout);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
     }
 
     /**
      * @param nodes Destination nodes.
      * @param topic Topic to send the message to.
-     * @param msgId Ordered message ID.
      * @param msg Message to send.
      * @param plc Type of processing.
      * @param timeout Timeout to keep a message on receiving queue.
      * @param skipOnTimeout Whether message can be skipped on timeout.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void sendOrderedMessage(Collection<? extends ClusterNode> nodes, 
Object topic, long msgId,
-        GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc, long 
timeout, boolean skipOnTimeout)
+    public void sendOrderedMessage(
+        Collection<? extends ClusterNode> nodes,
+        Object topic,
+        GridTcpCommunicationMessageAdapter msg,
+        GridIoPolicy plc,
+        long timeout,
+        boolean skipOnTimeout
+    )
         throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(nodes, topic, -1, msg, plc, msgId, timeout, skipOnTimeout);
+        send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
     }
 
     /**
@@ -1081,9 +990,13 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(Collection<? extends ClusterNode> nodes, Object topic, 
GridTcpCommunicationMessageAdapter msg,
-        GridIoPolicy plc) throws IgniteCheckedException {
-        send(nodes, topic, -1, msg, plc, -1, 0, false);
+    public void send(
+        Collection<? extends ClusterNode> nodes,
+        Object topic,
+        GridTcpCommunicationMessageAdapter msg,
+        GridIoPolicy plc
+    ) throws IgniteCheckedException {
+        send(nodes, topic, -1, msg, plc, false, 0, false);
     }
 
     /**
@@ -1093,9 +1006,13 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(Collection<? extends ClusterNode> nodes, GridTopic topic, 
GridTcpCommunicationMessageAdapter msg,
-        GridIoPolicy plc) throws IgniteCheckedException {
-        send(nodes, topic, topic.ordinal(), msg, plc, -1, 0, false);
+    public void send(
+        Collection<? extends ClusterNode> nodes,
+        GridTopic topic,
+        GridTcpCommunicationMessageAdapter msg,
+        GridIoPolicy plc
+    ) throws IgniteCheckedException {
+        send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false);
     }
 
     /**
@@ -1163,11 +1080,8 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             dep != null ? dep.userVersion() : null,
             dep != null ? dep.participants() : null);
 
-        if (ordered) {
-            long msgId = nextMessageId(TOPIC_COMM_USER, locNodeId);
-
-            sendOrderedMessage(nodes, TOPIC_COMM_USER, msgId, ioMsg, 
PUBLIC_POOL, timeout, true);
-        }
+        if (ordered)
+            sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, 
timeout, true);
         else if (loc)
             send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
         else {
@@ -1219,20 +1133,28 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param topicOrd Topic ordinal value.
      * @param msg Message to send.
      * @param plc Type of processing.
-     * @param msgId Message ID (for ordered messages) or -1 (for unordered 
messages).
+     * @param ordered Ordered flag.
      * @param timeout Message timeout.
      * @param skipOnTimeout Whether message can be skipped in timeout.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    private void send(Collection<? extends ClusterNode> nodes, Object topic, 
int topicOrd,
-        GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc, long msgId, 
long timeout, boolean skipOnTimeout)
+    private void send(
+        Collection<? extends ClusterNode> nodes,
+        Object topic,
+        int topicOrd,
+        GridTcpCommunicationMessageAdapter msg,
+        GridIoPolicy plc,
+        boolean ordered,
+        long timeout,
+        boolean skipOnTimeout
+    )
         throws IgniteCheckedException {
         assert nodes != null;
         assert topic != null;
         assert msg != null;
         assert plc != null;
 
-        if (msgId < 0)
+        if (!ordered)
             assert F.find(nodes, null, F.localNode(locNodeId)) == null :
                 "Internal GridGain code should never call the method with 
local node in a node list.";
 
@@ -1247,7 +1169,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
                     first = false;
 
-                    send(node, topic, topicOrd, msg0, plc, msgId, timeout, 
skipOnTimeout);
+                    send(node, topic, topicOrd, msg0, plc, ordered, timeout, 
skipOnTimeout);
                 }
             }
             else if (log.isDebugEnabled())
@@ -1339,7 +1261,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                     pool(msgSet.policy()).execute(new 
GridWorker(ctx.gridName(), "msg-worker", log) {
                         @Override protected void body() {
                             try {
-                                unwindMessageSet(msgSet, lsnrs0, false);
+                                unwindMessageSet(msgSet, lsnrs0);
                             }
                             finally {
                                 workersCnt.decrement();
@@ -1356,7 +1278,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                     "process message in the listener thread instead.", e);
 
                 for (GridCommunicationMessageSet msgSet : msgSets)
-                    unwindMessageSet(msgSet, lsnr, false);
+                    unwindMessageSet(msgSet, lsnr);
             }
             finally {
                 // Decrement for last runnable submission of which failed.
@@ -1535,7 +1457,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         X.println(">>> IO manager memory stats [grid=" + ctx.gridName() + ']');
         X.println(">>>  lsnrMapSize: " + lsnrMap.size());
         X.println(">>>  msgSetMapSize: " + msgSetMap.size());
-        X.println(">>>  msgIdMapSize: " + msgIdMap.size());
         X.println(">>>  closedTopicsSize: " + closedTopics.sizex());
         X.println(">>>  discoWaitMapSize: " + waitMap.size());
     }
@@ -1801,10 +1722,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
         /** */
         @GridToStringInclude
-        private final List<IgniteBiTuple<GridIoMessage, Long>> msgs = new 
ArrayList<>();
-
-        /** */
-        private long nextMsgId = 1;
+        private final Queue<IgniteBiTuple<GridIoMessage, Long>> msgs = new 
ConcurrentLinkedDeque<>();
 
         /** */
         private final AtomicBoolean reserved = new AtomicBoolean();
@@ -1818,9 +1736,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         /** */
         private long lastTs;
 
-        /** */
-        private volatile boolean changed;
-
         /**
          * @param plc Communication policy.
          * @param topic Communication topic.
@@ -1829,8 +1744,14 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
          * @param skipOnTimeout Whether message can be skipped on timeout.
          * @param msg Message to add immediately.
          */
-        GridCommunicationMessageSet(GridIoPolicy plc, Object topic, UUID 
nodeId, long timeout, boolean skipOnTimeout,
-            GridIoMessage msg) {
+        GridCommunicationMessageSet(
+            GridIoPolicy plc,
+            Object topic,
+            UUID nodeId,
+            long timeout,
+            boolean skipOnTimeout,
+            GridIoMessage msg
+        ) {
             assert nodeId != null;
             assert topic != null;
             assert plc != null;
@@ -1885,7 +1806,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                         }
 
                         if (unwind)
-                            unwindMessageSet(this, lsnr, true);
+                            unwindMessageSet(this, lsnr);
                         else
                             break;
                     }
@@ -1945,6 +1866,13 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         }
 
         /**
+         * @return {@code True} if set is reserved.
+         */
+        boolean reserved() {
+            return reserved.get();
+        }
+
+        /**
          * Releases reservation.
          */
         void release() {
@@ -1954,100 +1882,27 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         }
 
         /**
-         * @param force Whether to force unwind and drop missing
-         *      ordered messages that are not received yet.
-         * @return Session request.
+         * @param lsnr Listener to notify.
          */
-        synchronized Collection<GridIoMessage> unwind(boolean force) {
+        void unwind(GridMessageListener lsnr) {
             assert reserved.get();
 
-            changed = false;
-
-            if (msgs.isEmpty())
-                return Collections.emptyList();
-
-            if (msgs.size() == 1) {
-                IgniteBiTuple<GridIoMessage, Long> t = msgs.get(0);
-
-                GridIoMessage msg = t.get1();
-
-                if (force || msg.messageId() == nextMsgId) {
-                    if (msg.messageId() != nextMsgId) {
-                        for (long skipped = nextMsgId; skipped < 
msg.messageId(); skipped++) {
-                            U.warn(log, "Skipped ordered message due to 
timeout, consider increasing " +
-                                "networkTimeout configuration property 
[topic=" + topic + ", msgId=" +
-                                skipped + ", timeout=" + timeout + ']');
-                        }
-                    }
-
-                    nextMsgId = msg.messageId() + 1;
-
-                    lastTs = t.get2();
-
-                    msgs.clear();
-
-                    return Collections.singleton(msg);
-                }
-
-                return Collections.emptyList();
-            }
-
-            // Sort before unwinding.
-            Collections.sort(msgs, MSG_CMP);
-
-            Collection<GridIoMessage> orderedMsgs = new LinkedList<>();
-
-            for (Iterator<IgniteBiTuple<GridIoMessage, Long>> iter = 
msgs.iterator(); iter.hasNext();) {
-                IgniteBiTuple<GridIoMessage, Long> t = iter.next();
-
-                GridIoMessage msg = t.get1();
-
-                if (force || msg.messageId() == nextMsgId) {
-                    if (msg.messageId() != nextMsgId) {
-                        for (long skipped = nextMsgId; skipped < 
msg.messageId(); skipped++) {
-                            U.warn(log, "Skipped ordered message due to 
timeout, consider increasing " +
-                                "networkTimeout configuration property 
[topic=" + topic + ", msgId=" +
-                                skipped + ", timeout=" + timeout + ']');
-                        }
-                    }
-
-                    force = false;
-
-                    orderedMsgs.add(msg);
-
-                    nextMsgId = msg.messageId() + 1;
-
-                    lastTs = t.get2();
-
-                    iter.remove();
-                }
-                else
-                    break;
-            }
-
-            return orderedMsgs;
+            for (IgniteBiTuple<GridIoMessage, Long> t = msgs.poll(); t != 
null; t = msgs.poll())
+                lsnr.onMessage(nodeId, t.get1().message());
         }
 
         /**
          * @param msg Message to add.
          */
-        synchronized void add(GridIoMessage msg) {
-            if (msg.messageId() >= nextMsgId) {
-                msgs.add(F.t(msg, U.currentTimeMillis()));
-
-                changed = true;
-            }
-            else {
-                U.warn(log, "Received previously skipped ordered message (will 
be dropped) [topic=" + topic +
-                    ", msgId=" + msg.messageId() + ", timeout=" + timeout + 
']');
-            }
+        void add(GridIoMessage msg) {
+            msgs.add(F.t(msg, U.currentTimeMillis()));
         }
 
         /**
          * @return {@code True} if set has messages to unwind.
          */
         boolean changed() {
-            return changed;
+            return !msgs.isEmpty();
         }
 
         /**
@@ -2107,45 +1962,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     /**
      *
      */
-    private static class ConcurrentHashSet0<E> extends 
GridConcurrentHashSet<E> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private int hash;
-
-        /**
-         *
-         */
-        private ConcurrentHashSet0() {
-            super(1, 1, 1);
-        }
-
-        /**
-         * @param o Object to be compared for equality with this map.
-         * @return {@code True} only for {@code this}.
-         */
-        @Override public boolean equals(Object o) {
-            return o == this;
-        }
-
-        /**
-         * @return Identity hash code.
-         */
-        @Override public int hashCode() {
-            if (hash == 0) {
-                int hash0 = System.identityHashCode(this);
-
-                hash = hash0 != 0 ? hash0 : -1;
-            }
-
-            return hash;
-        }
-    }
-
-    /**
-     *
-     */
     private static class DelayedMessage {
         /** */
         private final UUID nodeId;
@@ -2156,9 +1972,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         /** */
         private final IgniteRunnable msgC;
 
-        /** */
-        private final long rcvTime = U.currentTimeMillis();
-
         /**
          * @param nodeId Node ID.
          * @param msg Message.
@@ -2171,13 +1984,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         }
 
         /**
-         * @return Receive time.
-         */
-        public long receiveTime() {
-            return rcvTime;
-        }
-
-        /**
          * @return Message char.
          */
         public IgniteRunnable callback() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoMessage.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoMessage.java
index fcc1980..541400e 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoMessage.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoMessage.java
@@ -46,8 +46,8 @@ public class GridIoMessage extends 
GridTcpCommunicationMessageAdapter {
     /** Topic ordinal. */
     private int topicOrd = -1;
 
-    /** Message order. */
-    private long msgId = -1;
+    /** Message ordered flag. */
+    private boolean ordered;
 
     /** Message timeout. */
     private long timeout;
@@ -71,12 +71,19 @@ public class GridIoMessage extends 
GridTcpCommunicationMessageAdapter {
      * @param topic Communication topic.
      * @param topicOrd Topic ordinal value.
      * @param msg Message.
-     * @param msgId Message ID.
+     * @param ordered Message ordered flag.
      * @param timeout Timeout.
      * @param skipOnTimeout Whether message can be skipped on timeout.
      */
-    public GridIoMessage(GridIoPolicy plc, Object topic, int topicOrd, 
GridTcpCommunicationMessageAdapter msg,
-        long msgId, long timeout, boolean skipOnTimeout) {
+    public GridIoMessage(
+        GridIoPolicy plc,
+        Object topic,
+        int topicOrd,
+        GridTcpCommunicationMessageAdapter msg,
+        boolean ordered,
+        long timeout,
+        boolean skipOnTimeout
+    ) {
         assert plc != null;
         assert topic != null;
         assert topicOrd <= Byte.MAX_VALUE;
@@ -86,7 +93,7 @@ public class GridIoMessage extends 
GridTcpCommunicationMessageAdapter {
         this.msg = msg;
         this.topic = topic;
         this.topicOrd = topicOrd;
-        this.msgId = msgId;
+        this.ordered = ordered;
         this.timeout = timeout;
         this.skipOnTimeout = skipOnTimeout;
     }
@@ -141,13 +148,6 @@ public class GridIoMessage extends 
GridTcpCommunicationMessageAdapter {
     }
 
     /**
-     * @return Message ID.
-     */
-    long messageId() {
-        return msgId;
-    }
-
-    /**
      * @return Message timeout.
      */
     public long timeout() {
@@ -165,30 +165,17 @@ public class GridIoMessage extends 
GridTcpCommunicationMessageAdapter {
      * @return {@code True} if message is ordered, {@code false} otherwise.
      */
     boolean isOrdered() {
-        return msgId > 0;
+        return ordered;
     }
 
     /** {@inheritDoc} */
     @Override public boolean equals(Object obj) {
-        if (obj == this)
-            return true;
-
-        if (!(obj instanceof GridIoMessage))
-            return false;
-
-        GridIoMessage other = (GridIoMessage)obj;
-
-        return topic.equals(other.topic) && msgId == other.msgId;
+        throw new AssertionError();
     }
 
     /** {@inheritDoc} */
     @Override public int hashCode() {
-        int res = topic.hashCode();
-
-        res = 31 * res + (int)(msgId ^ (msgId >>> 32));
-        res = 31 * res + topic.hashCode();
-
-        return res;
+        throw new AssertionError();
     }
 
     /** {@inheritDoc} */
@@ -210,7 +197,7 @@ public class GridIoMessage extends 
GridTcpCommunicationMessageAdapter {
         _clone.topic = topic;
         _clone.topicBytes = topicBytes;
         _clone.topicOrd = topicOrd;
-        _clone.msgId = msgId;
+        _clone.ordered = ordered;
         _clone.timeout = timeout;
         _clone.skipOnTimeout = skipOnTimeout;
         _clone.msg = msg != null ? 
(GridTcpCommunicationMessageAdapter)msg.clone() : null;
@@ -236,7 +223,7 @@ public class GridIoMessage extends 
GridTcpCommunicationMessageAdapter {
                 commState.idx++;
 
             case 1:
-                if (!commState.putLong(msgId))
+                if (!commState.putBoolean(ordered))
                     return false;
 
                 commState.idx++;
@@ -293,10 +280,10 @@ public class GridIoMessage extends 
GridTcpCommunicationMessageAdapter {
                 commState.idx++;
 
             case 1:
-                if (buf.remaining() < 8)
+                if (buf.remaining() < 1)
                     return false;
 
-                msgId = commState.getLong();
+                ordered = commState.getBoolean();
 
                 commState.idx++;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
index 94b2c49..f6b2e68 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
@@ -557,12 +557,11 @@ public class GridCacheIoManager<K, V> extends 
GridCacheSharedManagerAdapter<K, V
     /**
      * @param node Destination node.
      * @param topic Topic to send the message to.
-     * @param msgId Ordered message ID.
      * @param msg Message to send.
      * @param timeout Timeout to keep a message on receiving queue.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void sendOrderedMessage(ClusterNode node, Object topic, long msgId, 
GridCacheMessage<K, V> msg,
+    public void sendOrderedMessage(ClusterNode node, Object topic, 
GridCacheMessage<K, V> msg,
         long timeout) throws IgniteCheckedException {
         onSend(msg, node.id());
 
@@ -572,7 +571,7 @@ public class GridCacheIoManager<K, V> extends 
GridCacheSharedManagerAdapter<K, V
             try {
                 cnt++;
 
-                cctx.gridIO().sendOrderedMessage(node, topic, msgId, msg, 
SYSTEM_POOL, timeout, false);
+                cctx.gridIO().sendOrderedMessage(node, topic, msg, 
SYSTEM_POOL, timeout, false);
 
                 if (log.isDebugEnabled())
                     log.debug("Sent ordered cache message [topic=" + topic + 
", msg=" + msg +
@@ -595,15 +594,6 @@ public class GridCacheIoManager<K, V> extends 
GridCacheSharedManagerAdapter<K, V
     }
 
     /**
-     * @param topic Message topic.
-     * @param nodeId Node ID.
-     * @return Next ordered message ID.
-     */
-    public long messageId(Object topic, UUID nodeId) {
-        return cctx.gridIO().nextMessageId(topic, nodeId);
-    }
-
-    /**
      * @return ID that auto-grows based on local counter and counters received
      *      from other nodes.
      */
@@ -731,7 +721,6 @@ public class GridCacheIoManager<K, V> extends 
GridCacheSharedManagerAdapter<K, V
      */
     public void removeOrderedHandler(Object topic) {
         if (orderedHandlers.remove(topic) != null) {
-            cctx.gridIO().removeMessageId(topic);
             cctx.gridIO().removeMessageListener(topic);
 
             if (log != null && log.isDebugEnabled())
@@ -743,13 +732,6 @@ public class GridCacheIoManager<K, V> extends 
GridCacheSharedManagerAdapter<K, V
     }
 
     /**
-     * @param topic Message topic.
-     */
-    public void removeMessageId(Object topic) {
-        cctx.gridIO().removeMessageId(topic);
-    }
-
-    /**
      * @param nodeId Sender node ID.
      * @param cacheMsg Message.
      * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 7bbf102..7d6c8c0 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -66,9 +66,6 @@ public class GridDhtPartitionDemandPool<K, V> {
     private final ReadWriteLock busyLock;
 
     /** */
-    private GridDhtPartitionTopology<K, V> top;
-
-    /** */
     @GridToStringInclude
     private final Collection<DemandWorker> dmdWorkers;
 
@@ -110,8 +107,6 @@ public class GridDhtPartitionDemandPool<K, V> {
 
         log = cctx.logger(getClass());
 
-        top = cctx.dht().topology();
-
         poolSize = cctx.preloadEnabled() ? 
cctx.config().getPreloadThreadPoolSize() : 0;
 
         if (poolSize > 0) {
@@ -160,7 +155,6 @@ public class GridDhtPartitionDemandPool<K, V> {
         if (log.isDebugEnabled())
             log.debug("After joining on demand workers: " + dmdWorkers);
 
-        top = null;
         lastExchangeFut = null;
 
         lastTimeoutObj.set(null);
@@ -264,13 +258,6 @@ public class GridDhtPartitionDemandPool<K, V> {
     }
 
     /**
-     * @return Dummy node-left message.
-     */
-    private SupplyMessage<K, V> dummyTopology() {
-        return DUMMY_TOP;
-    }
-
-    /**
      * @param msg Message to check.
      * @return {@code True} if dummy message.
      */
@@ -332,7 +319,7 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @return Nodes owning this partition.
      */
     private Collection<ClusterNode> remoteOwners(int p, long topVer) {
-        return F.view(top.owners(p, topVer), F.remoteNodes(cctx.nodeId()));
+        return F.view(cctx.dht().topology().owners(p, topVer), 
F.remoteNodes(cctx.nodeId()));
     }
 
     /**
@@ -495,7 +482,7 @@ public class GridDhtPartitionDemandPool<K, V> {
          * @throws GridInterruptedException If interrupted.
          */
         private boolean preloadEntry(ClusterNode pick, int p, 
GridCacheEntryInfo<K, V> entry, long topVer)
-            throws IgniteCheckedException, GridInterruptedException {
+            throws IgniteCheckedException {
             try {
                 GridCacheEntryEx<K, V> cached = null;
 
@@ -705,7 +692,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                             int p = e.getKey();
 
                             if (cctx.affinity().localNode(p, topVer)) {
-                                GridDhtLocalPartition<K, V> part = 
top.localPartition(p, topVer, true);
+                                GridDhtLocalPartition<K, V> part = 
cctx.dht().topology().localPartition(p, topVer, true);
 
                                 assert part != null;
 
@@ -749,7 +736,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                                         if (last) {
                                             remaining.remove(p);
 
-                                            top.own(part);
+                                            cctx.dht().topology().own(part);
 
                                             if (log.isDebugEnabled())
                                                 log.debug("Finished preloading 
partition: " + part);
@@ -981,6 +968,8 @@ public class GridDhtPartitionDemandPool<K, V> {
      */
     GridDhtPreloaderAssignments<K, V> 
assign(GridDhtPartitionsExchangeFuture<K, V> exchFut) {
         // No assignments for disabled preloader.
+        GridDhtPartitionTopology<K, V> top = cctx.dht().topology();
+
         if (!cctx.preloadEnabled())
             return new GridDhtPreloaderAssignments<>(exchFut, 
top.topologyVersion());
 
@@ -988,7 +977,8 @@ public class GridDhtPartitionDemandPool<K, V> {
 
         assert exchFut.forcePreload() || exchFut.dummyReassign() ||
             exchFut.exchangeId().topologyVersion() == top.topologyVersion() :
-            "Topology version mismatch [exchId=" + exchFut.exchangeId() + ", 
topVer=" + top.topologyVersion() + ']';
+            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+                ", topVer=" + top.topologyVersion() + ']';
 
         GridDhtPreloaderAssignments<K, V> assigns = new 
GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index beb469a..d8f742b 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -495,14 +495,6 @@ class GridDhtPartitionSupplyPool<K, V> {
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to send partition supply message to node: 
" + node.id(), e);
-
-                // Removing current topic because of request must fail with 
timeout and
-                // demander will generate new topic.
-                cctx.io().removeMessageId(d.topic());
-            }
-            finally {
-                if (!ack || nodeLeft)
-                    cctx.io().removeMessageId(d.topic());
             }
         }
 
@@ -519,7 +511,7 @@ class GridDhtPartitionSupplyPool<K, V> {
                 if (log.isDebugEnabled())
                     log.debug("Replying to partition demand [node=" + n.id() + 
", demand=" + d + ", supply=" + s + ']');
 
-                cctx.io().sendOrderedMessage(n, d.topic(), 
cctx.io().messageId(d.topic(), n.id()), s, d.timeout());
+                cctx.io().sendOrderedMessage(n, d.topic(), s, d.timeout());
 
                 return true;
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheDistributedQueryManager.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheDistributedQueryManager.java
index 6546c74..6969002 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -272,7 +272,6 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
                 cctx.io().sendOrderedMessage(
                     node,
                     topic,
-                    cctx.io().messageId(topic, nodeId),
                     res,
                     timeout > 0 ? timeout : Long.MAX_VALUE);
 
@@ -322,28 +321,6 @@ public class GridCacheDistributedQueryManager<K, V> 
extends GridCacheQueryManage
         return false;
     }
 
-    /** {@inheritDoc} */
-    @Override protected void removeQueryResult(@Nullable UUID sndId, long 
reqId) {
-        super.removeQueryResult(sndId, reqId);
-
-        if (sndId != null) {
-            Object topic = topic(sndId, reqId);
-
-            cctx.io().removeMessageId(topic);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void removeFieldsQueryResult(@Nullable UUID sndId, 
long reqId) {
-        super.removeFieldsQueryResult(sndId, reqId);
-
-        if (sndId != null) {
-            Object topic = topic(sndId, reqId);
-
-            cctx.io().removeMessageId(topic);
-        }
-    }
-
     /**
      * Processes cache query response.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
index 6ec86a5..9080b6a 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java
@@ -1144,7 +1144,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                         ctx.io().sendOrderedMessage(
                             node,
                             orderedTopic,
-                            ctx.io().nextMessageId(orderedTopic, node.id()),
                             msg,
                             SYSTEM_POOL,
                             0,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobProcessor.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobProcessor.java
index efcefbe..c407491 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobProcessor.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobProcessor.java
@@ -367,7 +367,6 @@ public class GridJobProcessor extends GridProcessorAdapter {
         ctx.io().sendOrderedMessage(
             taskNode,
             topic, // Job topic.
-            ctx.io().nextMessageId(topic, taskNode.id()),
             req,
             SYSTEM_POOL,
             timeout,
@@ -1309,14 +1308,9 @@ public class GridJobProcessor extends 
GridProcessorAdapter {
 
                 // Send response to designated job topic.
                 // Always go through communication to preserve order.
-                long msgId = ctx.io().nextMessageId(topic, sndNode.id());
-
-                ctx.io().removeMessageId(topic);
-
                 ctx.io().sendOrderedMessage(
                     sndNode,
                     topic,
-                    msgId,
                     jobRes,
                     req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL,
                     timeout,
@@ -1571,9 +1565,6 @@ public class GridJobProcessor extends 
GridProcessorAdapter {
             if (worker.getSession().isFullSupport()) {
                 // Unregister session request listener for this jobs.
                 ctx.io().removeMessageListener(worker.getJobTopic());
-
-                // Unregister message IDs used for sending.
-                ctx.io().removeMessageId(worker.getTaskTopic());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java
index cb855c8..e1a4e5b 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java
@@ -660,9 +660,6 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
         // We should save message ID here since listener callback will reset 
sequence.
         ClusterNode sndNode = ctx.discovery().node(taskNode.id());
 
-        long msgId = sndNode != null && ses.isFullSupport() ?
-            ctx.io().nextMessageId(taskTopic, sndNode.id()) : -1;
-
         finishTime = U.currentTimeMillis();
 
         Collection<IgniteBiTuple<Integer, String>> evts = null;
@@ -726,12 +723,9 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
                                 // Send response to designated job topic.
                                 // Always go through communication to preserve 
order,
                                 // if attributes are enabled.
-                                assert msgId > 0;
-
                                 ctx.io().sendOrderedMessage(
                                     sndNode,
                                     taskTopic,
-                                    msgId,
                                     jobRes,
                                     internal ? MANAGEMENT_POOL : SYSTEM_POOL,
                                     timeout,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskProcessor.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskProcessor.java
index 6622a09..8e0dab6 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskProcessor.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskProcessor.java
@@ -773,7 +773,7 @@ public class GridTaskProcessor extends GridProcessorAdapter 
{
             return;
         }
 
-        Map<UUID, Long> msgIds = new HashMap<>(siblings.size(), 1.0f);
+        Set<UUID> rcvrs = new HashSet<>();
 
         UUID locNodeId = ctx.localNodeId();
 
@@ -794,8 +794,8 @@ public class GridTaskProcessor extends GridProcessorAdapter 
{
 
                 UUID nodeId = sib.nodeId();
 
-                if (!nodeId.equals(locNodeId) && !sib.isJobDone() && 
!msgIds.containsKey(nodeId))
-                    msgIds.put(nodeId, commMgr.nextMessageId(sib.jobTopic(), 
nodeId));
+                if (!nodeId.equals(locNodeId) && !sib.isJobDone() && 
!rcvrs.contains(nodeId))
+                    rcvrs.add(nodeId);
             }
         }
 
@@ -821,12 +821,8 @@ public class GridTaskProcessor extends 
GridProcessorAdapter {
 
             UUID nodeId = sib.nodeId();
 
-            Long msgId = msgIds.remove(nodeId);
-
             // Pair can be null if job is finished.
-            if (msgId != null) {
-                assert msgId > 0;
-
+            if (rcvrs.remove(nodeId)) {
                 ClusterNode node = ctx.discovery().node(nodeId);
 
                 // Check that node didn't change (it could happen in case of 
failover).
@@ -845,7 +841,6 @@ public class GridTaskProcessor extends GridProcessorAdapter 
{
                         commMgr.sendOrderedMessage(
                             node,
                             sib.jobTopic(),
-                            msgId,
                             req,
                             SYSTEM_POOL,
                             timeout,
@@ -1051,7 +1046,6 @@ public class GridTaskProcessor extends 
GridProcessorAdapter {
 
             // Remove message ID registration and old listener.
             if (worker.getSession().isFullSupport()) {
-                ioMgr.removeMessageId(sib.jobTopic());
                 ioMgr.removeMessageListener(sib.taskTopic(), msgLsnr);
 
                 synchronized (worker.getSession()) {
@@ -1113,7 +1107,6 @@ public class GridTaskProcessor extends 
GridProcessorAdapter {
                     for (ComputeJobSibling sibling : 
worker.getSession().getJobSiblings()) {
                         GridJobSiblingImpl s = (GridJobSiblingImpl)sibling;
 
-                        ctx.io().removeMessageId(s.jobTopic());
                         ctx.io().removeMessageListener(s.taskTopic(), msgLsnr);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/test/java/org/gridgain/grid/kernal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/managers/communication/GridIoManagerSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/managers/communication/GridIoManagerSelfTest.java
index b943060..fa6af66 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/managers/communication/GridIoManagerSelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/managers/communication/GridIoManagerSelfTest.java
@@ -165,7 +165,7 @@ public class GridIoManagerSelfTest extends 
GridCommonAbstractTest {
 
         verify(ioMgr).sendOrderedMessage(
             argThat(new IsEqualCollection(F.asList(locNode, rmtNode))),
-            eq(GridTopic.TOPIC_COMM_USER), anyLong(),
+            eq(GridTopic.TOPIC_COMM_USER),
             any(GridIoUserMessage.class),
             eq(GridIoPolicy.PUBLIC_POOL),
             eq(123L),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
index a56a6de..33c0db2 100644
--- 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
+++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
@@ -45,7 +45,6 @@ public class GridCacheSyncReplicatedPreloadSelfTest extends 
GridCommonAbstractTe
     /** */
     private static final boolean DISCO_DEBUG_MODE = false;
 
-
     /**
      * Constructs test.
      */

Reply via email to