Ignite-24 wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6f18eb54 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6f18eb54 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6f18eb54 Branch: refs/heads/ignite-99-2 Commit: 6f18eb54ca195006ad936c8cfc9fc45f1c71f1a7 Parents: 8795b0f Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Fri Jan 23 15:27:08 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Fri Jan 23 15:27:08 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 564 ++++++------------- .../managers/communication/GridIoMessage.java | 53 +- .../processors/cache/GridCacheIoManager.java | 22 +- .../preloader/GridDhtPartitionDemandPool.java | 26 +- .../preloader/GridDhtPartitionSupplyPool.java | 10 +- .../query/GridCacheDistributedQueryManager.java | 23 - .../continuous/GridContinuousProcessor.java | 1 - .../kernal/processors/job/GridJobProcessor.java | 9 - .../kernal/processors/job/GridJobWorker.java | 6 - .../processors/task/GridTaskProcessor.java | 15 +- .../communication/GridIoManagerSelfTest.java | 2 +- .../GridCacheSyncReplicatedPreloadSelfTest.java | 1 - 12 files changed, 221 insertions(+), 511 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoManager.java index 8aab72f..2d9189d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoManager.java @@ -59,15 +59,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** Max closed topics to store. */ public static final int MAX_CLOSED_TOPICS = 10240; - /** Ordered messages comparator. */ - private static final Comparator<IgniteBiTuple<GridIoMessage, Long>> MSG_CMP = - new Comparator<IgniteBiTuple<GridIoMessage, Long>>() { - @Override public int compare(IgniteBiTuple<GridIoMessage, Long> t1, IgniteBiTuple<GridIoMessage, Long> t2) { - return t1.get1().messageId() < t2.get1().messageId() ? -1 : - t1.get1().messageId() == t2.get1().messageId() ? 0 : 1; - } - }; - /** Listeners by topic. */ private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>(); @@ -99,10 +90,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private final ConcurrentMap<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> msgSetMap = new ConcurrentHashMap8<>(); - /** Messages ID generator (per topic). */ - private final ConcurrentMap<Object, ConcurrentMap<UUID, AtomicLong>> msgIdMap = - new ConcurrentHashMap8<>(); - /** Local node ID. */ private final UUID locNodeId; @@ -215,26 +202,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa switch (evt.type()) { case EVT_NODE_JOINED: - ConcurrentLinkedDeque8<DelayedMessage> delayedMsgs = null; - - lock.writeLock().lock(); - - try { - if (started) - delayedMsgs = waitMap.remove(nodeId); - } - finally { - lock.writeLock().unlock(); - } - - if (log.isDebugEnabled()) - log.debug("Processing messages from discovery startup delay list " + - "(sender node joined topology): " + delayedMsgs); - - // After write lock released. - if (delayedMsgs != null) - for (DelayedMessage msg : delayedMsgs) - commLsnr.onMessage(msg.nodeId(), msg.message(), msg.callback()); + assert waitMap.get(nodeId) == null; // We can't receive messages from undiscovered nodes. break; @@ -474,7 +442,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa switch (plc) { case P2P_POOL: { - processP2PMessage(node, msg, msgC); + processP2PMessage(nodeId, msg, msgC); break; } @@ -485,9 +453,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa case AFFINITY_POOL: case UTILITY_CACHE_POOL: { if (msg.isOrdered()) - processOrderedMessage(node, msg, plc, msgC); + processOrderedMessage(nodeId, msg, plc, msgC); else - processRegularMessage(node, msg, plc, msgC); + processRegularMessage(nodeId, msg, plc, msgC); break; } @@ -534,33 +502,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** - * @param msg Message bytes. - * @return Policy. - */ - private GridIoPolicy policy(byte[] msg) { - GridIoPolicy plc = GridIoPolicy.fromOrdinal(msg[0]); - - if (plc == null) - throw new IllegalStateException("Failed to parse message policy: " + Arrays.toString(msg)); - - return plc; - } - - /** - * @param msg Message bytes. - * @return {@code True} if ordered. - */ - private boolean ordered(byte[] msg) { - return msg[1] == 1; - } - - /** - * @param node Node. + * @param nodeId Node ID. * @param msg Message. * @param msgC Closure to call when message processing finished. */ - @SuppressWarnings("deprecation") - private void processP2PMessage(final ClusterNode node, final GridIoMessage msg, final IgniteRunnable msgC) { + private void processP2PMessage( + final UUID nodeId, + final GridIoMessage msg, + final IgniteRunnable msgC + ) { workersCnt.increment(); Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) { @@ -577,7 +527,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa assert obj != null; - lsnr.onMessage(node.id(), obj); + lsnr.onMessage(nodeId, obj); } finally { threadProcessingMessage(false); @@ -602,13 +552,17 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** - * @param node Node. + * @param nodeId Node ID. * @param msg Message. * @param plc Execution policy. * @param msgC Closure to call when message processing finished. */ - private void processRegularMessage(final ClusterNode node, final GridIoMessage msg, GridIoPolicy plc, - final IgniteRunnable msgC) { + private void processRegularMessage( + final UUID nodeId, + final GridIoMessage msg, + GridIoPolicy plc, + final IgniteRunnable msgC + ) { workersCnt.increment(); Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) { @@ -616,7 +570,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa try { threadProcessingMessage(true); - processRegularMessage0(msg, node.id()); + processRegularMessage0(msg, nodeId); } finally { threadProcessingMessage(false); @@ -659,55 +613,20 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** - * @param node Node. + * @param nodeId Node ID. * @param msg Ordered message. * @param plc Execution policy. - * @param msgC Closure to call when message processing finished. + * @param msgC Closure to call when message processing finished ({@code null} for sync processing). */ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - private void processOrderedMessage(final ClusterNode node, final GridIoMessage msg, final GridIoPolicy plc, - final IgniteRunnable msgC) { + private void processOrderedMessage( + final UUID nodeId, + final GridIoMessage msg, + final GridIoPolicy plc, + @Nullable final IgniteRunnable msgC + ) { assert msg != null; - workersCnt.increment(); - - Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) { - @Override protected void body() { - try { - threadProcessingMessage(true); - - processOrderedMessage0(msg, plc, node.id()); - } - finally { - threadProcessingMessage(false); - - workersCnt.decrement(); - - msgC.run(); - } - } - }; - - try { - pool(plc).execute(c); - } - catch (RejectedExecutionException e) { - U.error(log, "Failed to process ordered message due to execution rejection. " + - "Increase the upper bound on executor service provided by corresponding " + - "configuration property. Will attempt to process message in the listener " + - "thread instead [msgPlc=" + plc + ']', e); - - c.run(); - } - } - - /** - * @param msg Message. - * @param plc Policy. - * @param nodeId Node ID. - */ - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - private void processOrderedMessage0(GridIoMessage msg, GridIoPolicy plc, UUID nodeId) { long timeout = msg.timeout(); boolean skipOnTimeout = msg.skipOnTimeout(); @@ -775,7 +694,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } } - if (ctx.discovery().node(nodeId) == null) { + if (isNew && ctx.discovery().node(nodeId) == null) { if (log.isDebugEnabled()) log.debug("Message is ignored as sender has left the grid: " + msg); @@ -798,52 +717,89 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (isNew && set.endTime() != Long.MAX_VALUE) ctx.timeout().addTimeoutObject(set); - GridMessageListener lsnr = lsnrMap.get(msg.topic()); + if (set.reserved()) { + // Set is reserved which means that it is currently processed by worker thread. + msgC.run(); - if (lsnr != null) - unwindMessageSet(set, lsnr, false); - else if (closedTopics.contains(msg.topic())) { - if (log.isDebugEnabled()) - log.debug("Message is ignored as it came for the closed topic: " + msg); + return; + } - assert map != null; + final GridMessageListener lsnr = lsnrMap.get(msg.topic()); + + if (lsnr == null) { + if (closedTopics.contains(msg.topic())) { + if (log.isDebugEnabled()) + log.debug("Message is ignored as it came for the closed topic: " + msg); + + assert map != null; + + msgSetMap.remove(msg.topic(), map); + } + else if (log.isDebugEnabled()) { + // Note that we simply keep messages if listener is not + // registered yet, until one will be registered. + log.debug("Received message for unknown listener (messages will be kept until a " + + "listener is registered): " + msg); + } + + return; + } + + if (msgC == null) { + // Message from local node can be processed in sync manner. + assert locNodeId.equals(nodeId); + + unwindMessageSet(set, lsnr); + + return; + } + + // Set is not reserved and new worker should be submitted. + workersCnt.increment(); + + final GridCommunicationMessageSet msgSet0 = set; - msgSetMap.remove(msg.topic(), map); + Runnable c = new GridWorker(ctx.gridName(), "msg-worker", log) { + @Override protected void body() { + try { + threadProcessingMessage(true); + + unwindMessageSet(msgSet0, lsnr); + } + finally { + threadProcessingMessage(false); + + workersCnt.decrement(); + + msgC.run(); + } + } + }; + + try { + pool(plc).execute(c); } - else if (log.isDebugEnabled()) { - // Note that we simply keep messages if listener is not - // registered yet, until one will be registered. - log.debug("Received message for unknown listener (messages will be kept until a " + - "listener is registered): " + msg); + catch (RejectedExecutionException e) { + U.error(log, "Failed to process ordered message due to execution rejection. " + + "Increase the upper bound on executor service provided by corresponding " + + "configuration property. Will attempt to process message in the listener " + + "thread instead [msgPlc=" + plc + ']', e); + + c.run(); } } /** * @param msgSet Message set to unwind. * @param lsnr Listener to notify. - * @param force Whether to force unwind and drop missing - * ordered messages that are not received yet. */ - @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "deprecation"}) - private void unwindMessageSet(GridCommunicationMessageSet msgSet, GridMessageListener lsnr, boolean force) { + private void unwindMessageSet(GridCommunicationMessageSet msgSet, GridMessageListener lsnr) { // Loop until message set is empty or // another thread owns the reservation. while (true) { if (msgSet.reserve()) { try { - Collection<GridIoMessage> orderedMsgs = msgSet.unwind(force); - - if (!orderedMsgs.isEmpty()) { - for (GridIoMessage msg : orderedMsgs) { - Object obj = msg.message(); - - assert obj != null; - - lsnr.onMessage(msgSet.nodeId(), obj); - } - } - else if (log.isDebugEnabled()) - log.debug("No messages were unwound: " + msgSet); + msgSet.unwind(lsnr); } finally { msgSet.release(); @@ -872,19 +828,27 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topicOrd GridTopic enumeration ordinal. * @param msg Message to send. * @param plc Type of processing. - * @param msgId Message ID. + * @param ordered Ordered flag. * @param timeout Timeout. * @param skipOnTimeout Whether message can be skipped on timeout. * @throws IgniteCheckedException Thrown in case of any errors. */ - private void send(ClusterNode node, Object topic, int topicOrd, GridTcpCommunicationMessageAdapter msg, - GridIoPolicy plc, long msgId, long timeout, boolean skipOnTimeout) throws IgniteCheckedException { + private void send( + ClusterNode node, + Object topic, + int topicOrd, + GridTcpCommunicationMessageAdapter msg, + GridIoPolicy plc, + boolean ordered, + long timeout, + boolean skipOnTimeout + ) throws IgniteCheckedException { assert node != null; assert topic != null; assert msg != null; assert plc != null; - GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, msgId, timeout, skipOnTimeout); + GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); if (locNodeId.equals(node.id())) { assert plc != P2P_POOL; @@ -894,8 +858,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (commLsnr == null) throw new IgniteCheckedException("Trying to send message when grid is not fully started."); - if (msgId > 0) - processOrderedMessage0(ioMsg, plc, locNodeId); + if (ordered) + processOrderedMessage(locNodeId, ioMsg, plc, null); else processRegularMessage0(ioMsg, locNodeId); } @@ -947,7 +911,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, topic.ordinal(), msg, plc, -1, 0, false); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false); } /** @@ -959,7 +923,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, -1, 0, false); + send(node, topic, -1, msg, plc, false, 0, false); } /** @@ -971,107 +935,52 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, GridTopic topic, GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, -1, 0, false); - } - - /** - * @param topic Message topic. - * @param nodeId Node ID. - * @return Next ordered message ID. - */ - public long nextMessageId(Object topic, UUID nodeId) { - ConcurrentMap<UUID, AtomicLong> map = msgIdMap.get(topic); - - if (map == null) { - ConcurrentMap<UUID, AtomicLong> lastMap = msgIdMap.putIfAbsent(topic, - map = new ConcurrentHashMap8<>()); - - if (lastMap != null) - map = lastMap; - } - - AtomicLong msgId = map.get(nodeId); - - if (msgId == null) { - AtomicLong lastMsgId = map.putIfAbsent(nodeId, msgId = new AtomicLong(0)); - - if (lastMsgId != null) - msgId = lastMsgId; - } - - long id = msgId.incrementAndGet(); - - if (log.isDebugEnabled()) - log.debug("Got next message ID [topic=" + topic + ", nodeId=" + nodeId + ", id=" + id + ']'); - - return id; - } - - /** - * @param topic Message topic. - */ - public void removeMessageId(Object topic) { - if (log.isDebugEnabled()) - log.debug("Remove message ID for topic: " + topic); - - msgIdMap.remove(topic); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false); } /** * @param node Destination node. * @param topic Topic to send the message to. - * @param msgId Ordered message ID. * @param msg Message to send. * @param plc Type of processing. * @param timeout Timeout to keep a message on receiving queue. * @param skipOnTimeout Whether message can be skipped on timeout. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void sendOrderedMessage(ClusterNode node, Object topic, long msgId, GridTcpCommunicationMessageAdapter msg, - GridIoPolicy plc, long timeout, boolean skipOnTimeout) throws IgniteCheckedException { + public void sendOrderedMessage( + ClusterNode node, + Object topic, + GridTcpCommunicationMessageAdapter msg, + GridIoPolicy plc, + long timeout, + boolean skipOnTimeout + ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, msgId, timeout, skipOnTimeout); - } - - /** - * @param nodeId Destination node. - * @param topic Topic to send the message to. - * @param msgId Ordered message ID. - * @param msg Message to send. - * @param plc Type of processing. - * @param timeout Timeout to keep a message on receiving queue. - * @param skipOnTimeout Whether message can be skipped on timeout. - * @throws IgniteCheckedException Thrown in case of any errors. - */ - public void sendOrderedMessage(UUID nodeId, Object topic, long msgId, GridTcpCommunicationMessageAdapter msg, - GridIoPolicy plc, long timeout, boolean skipOnTimeout) throws IgniteCheckedException { - assert timeout > 0 || skipOnTimeout; - - ClusterNode node = ctx.discovery().node(nodeId); - - if (node == null) - throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - - send(node, topic, (byte)-1, msg, plc, msgId, timeout, skipOnTimeout); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout); } /** * @param nodes Destination nodes. * @param topic Topic to send the message to. - * @param msgId Ordered message ID. * @param msg Message to send. * @param plc Type of processing. * @param timeout Timeout to keep a message on receiving queue. * @param skipOnTimeout Whether message can be skipped on timeout. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void sendOrderedMessage(Collection<? extends ClusterNode> nodes, Object topic, long msgId, - GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc, long timeout, boolean skipOnTimeout) + public void sendOrderedMessage( + Collection<? extends ClusterNode> nodes, + Object topic, + GridTcpCommunicationMessageAdapter msg, + GridIoPolicy plc, + long timeout, + boolean skipOnTimeout + ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(nodes, topic, -1, msg, plc, msgId, timeout, skipOnTimeout); + send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout); } /** @@ -1081,9 +990,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param plc Type of processing. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void send(Collection<? extends ClusterNode> nodes, Object topic, GridTcpCommunicationMessageAdapter msg, - GridIoPolicy plc) throws IgniteCheckedException { - send(nodes, topic, -1, msg, plc, -1, 0, false); + public void send( + Collection<? extends ClusterNode> nodes, + Object topic, + GridTcpCommunicationMessageAdapter msg, + GridIoPolicy plc + ) throws IgniteCheckedException { + send(nodes, topic, -1, msg, plc, false, 0, false); } /** @@ -1093,9 +1006,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param plc Type of processing. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void send(Collection<? extends ClusterNode> nodes, GridTopic topic, GridTcpCommunicationMessageAdapter msg, - GridIoPolicy plc) throws IgniteCheckedException { - send(nodes, topic, topic.ordinal(), msg, plc, -1, 0, false); + public void send( + Collection<? extends ClusterNode> nodes, + GridTopic topic, + GridTcpCommunicationMessageAdapter msg, + GridIoPolicy plc + ) throws IgniteCheckedException { + send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false); } /** @@ -1163,11 +1080,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa dep != null ? dep.userVersion() : null, dep != null ? dep.participants() : null); - if (ordered) { - long msgId = nextMessageId(TOPIC_COMM_USER, locNodeId); - - sendOrderedMessage(nodes, TOPIC_COMM_USER, msgId, ioMsg, PUBLIC_POOL, timeout, true); - } + if (ordered) + sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true); else if (loc) send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL); else { @@ -1219,20 +1133,28 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param topicOrd Topic ordinal value. * @param msg Message to send. * @param plc Type of processing. - * @param msgId Message ID (for ordered messages) or -1 (for unordered messages). + * @param ordered Ordered flag. * @param timeout Message timeout. * @param skipOnTimeout Whether message can be skipped in timeout. * @throws IgniteCheckedException Thrown in case of any errors. */ - private void send(Collection<? extends ClusterNode> nodes, Object topic, int topicOrd, - GridTcpCommunicationMessageAdapter msg, GridIoPolicy plc, long msgId, long timeout, boolean skipOnTimeout) + private void send( + Collection<? extends ClusterNode> nodes, + Object topic, + int topicOrd, + GridTcpCommunicationMessageAdapter msg, + GridIoPolicy plc, + boolean ordered, + long timeout, + boolean skipOnTimeout + ) throws IgniteCheckedException { assert nodes != null; assert topic != null; assert msg != null; assert plc != null; - if (msgId < 0) + if (!ordered) assert F.find(nodes, null, F.localNode(locNodeId)) == null : "Internal GridGain code should never call the method with local node in a node list."; @@ -1247,7 +1169,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa first = false; - send(node, topic, topicOrd, msg0, plc, msgId, timeout, skipOnTimeout); + send(node, topic, topicOrd, msg0, plc, ordered, timeout, skipOnTimeout); } } else if (log.isDebugEnabled()) @@ -1339,7 +1261,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa pool(msgSet.policy()).execute(new GridWorker(ctx.gridName(), "msg-worker", log) { @Override protected void body() { try { - unwindMessageSet(msgSet, lsnrs0, false); + unwindMessageSet(msgSet, lsnrs0); } finally { workersCnt.decrement(); @@ -1356,7 +1278,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa "process message in the listener thread instead.", e); for (GridCommunicationMessageSet msgSet : msgSets) - unwindMessageSet(msgSet, lsnr, false); + unwindMessageSet(msgSet, lsnr); } finally { // Decrement for last runnable submission of which failed. @@ -1535,7 +1457,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa X.println(">>> IO manager memory stats [grid=" + ctx.gridName() + ']'); X.println(">>> lsnrMapSize: " + lsnrMap.size()); X.println(">>> msgSetMapSize: " + msgSetMap.size()); - X.println(">>> msgIdMapSize: " + msgIdMap.size()); X.println(">>> closedTopicsSize: " + closedTopics.sizex()); X.println(">>> discoWaitMapSize: " + waitMap.size()); } @@ -1801,10 +1722,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** */ @GridToStringInclude - private final List<IgniteBiTuple<GridIoMessage, Long>> msgs = new ArrayList<>(); - - /** */ - private long nextMsgId = 1; + private final Queue<IgniteBiTuple<GridIoMessage, Long>> msgs = new ConcurrentLinkedDeque<>(); /** */ private final AtomicBoolean reserved = new AtomicBoolean(); @@ -1818,9 +1736,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** */ private long lastTs; - /** */ - private volatile boolean changed; - /** * @param plc Communication policy. * @param topic Communication topic. @@ -1829,8 +1744,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param skipOnTimeout Whether message can be skipped on timeout. * @param msg Message to add immediately. */ - GridCommunicationMessageSet(GridIoPolicy plc, Object topic, UUID nodeId, long timeout, boolean skipOnTimeout, - GridIoMessage msg) { + GridCommunicationMessageSet( + GridIoPolicy plc, + Object topic, + UUID nodeId, + long timeout, + boolean skipOnTimeout, + GridIoMessage msg + ) { assert nodeId != null; assert topic != null; assert plc != null; @@ -1885,7 +1806,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } if (unwind) - unwindMessageSet(this, lsnr, true); + unwindMessageSet(this, lsnr); else break; } @@ -1945,6 +1866,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** + * @return {@code True} if set is reserved. + */ + boolean reserved() { + return reserved.get(); + } + + /** * Releases reservation. */ void release() { @@ -1954,100 +1882,27 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** - * @param force Whether to force unwind and drop missing - * ordered messages that are not received yet. - * @return Session request. + * @param lsnr Listener to notify. */ - synchronized Collection<GridIoMessage> unwind(boolean force) { + void unwind(GridMessageListener lsnr) { assert reserved.get(); - changed = false; - - if (msgs.isEmpty()) - return Collections.emptyList(); - - if (msgs.size() == 1) { - IgniteBiTuple<GridIoMessage, Long> t = msgs.get(0); - - GridIoMessage msg = t.get1(); - - if (force || msg.messageId() == nextMsgId) { - if (msg.messageId() != nextMsgId) { - for (long skipped = nextMsgId; skipped < msg.messageId(); skipped++) { - U.warn(log, "Skipped ordered message due to timeout, consider increasing " + - "networkTimeout configuration property [topic=" + topic + ", msgId=" + - skipped + ", timeout=" + timeout + ']'); - } - } - - nextMsgId = msg.messageId() + 1; - - lastTs = t.get2(); - - msgs.clear(); - - return Collections.singleton(msg); - } - - return Collections.emptyList(); - } - - // Sort before unwinding. - Collections.sort(msgs, MSG_CMP); - - Collection<GridIoMessage> orderedMsgs = new LinkedList<>(); - - for (Iterator<IgniteBiTuple<GridIoMessage, Long>> iter = msgs.iterator(); iter.hasNext();) { - IgniteBiTuple<GridIoMessage, Long> t = iter.next(); - - GridIoMessage msg = t.get1(); - - if (force || msg.messageId() == nextMsgId) { - if (msg.messageId() != nextMsgId) { - for (long skipped = nextMsgId; skipped < msg.messageId(); skipped++) { - U.warn(log, "Skipped ordered message due to timeout, consider increasing " + - "networkTimeout configuration property [topic=" + topic + ", msgId=" + - skipped + ", timeout=" + timeout + ']'); - } - } - - force = false; - - orderedMsgs.add(msg); - - nextMsgId = msg.messageId() + 1; - - lastTs = t.get2(); - - iter.remove(); - } - else - break; - } - - return orderedMsgs; + for (IgniteBiTuple<GridIoMessage, Long> t = msgs.poll(); t != null; t = msgs.poll()) + lsnr.onMessage(nodeId, t.get1().message()); } /** * @param msg Message to add. */ - synchronized void add(GridIoMessage msg) { - if (msg.messageId() >= nextMsgId) { - msgs.add(F.t(msg, U.currentTimeMillis())); - - changed = true; - } - else { - U.warn(log, "Received previously skipped ordered message (will be dropped) [topic=" + topic + - ", msgId=" + msg.messageId() + ", timeout=" + timeout + ']'); - } + void add(GridIoMessage msg) { + msgs.add(F.t(msg, U.currentTimeMillis())); } /** * @return {@code True} if set has messages to unwind. */ boolean changed() { - return changed; + return !msgs.isEmpty(); } /** @@ -2107,45 +1962,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** * */ - private static class ConcurrentHashSet0<E> extends GridConcurrentHashSet<E> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private int hash; - - /** - * - */ - private ConcurrentHashSet0() { - super(1, 1, 1); - } - - /** - * @param o Object to be compared for equality with this map. - * @return {@code True} only for {@code this}. - */ - @Override public boolean equals(Object o) { - return o == this; - } - - /** - * @return Identity hash code. - */ - @Override public int hashCode() { - if (hash == 0) { - int hash0 = System.identityHashCode(this); - - hash = hash0 != 0 ? hash0 : -1; - } - - return hash; - } - } - - /** - * - */ private static class DelayedMessage { /** */ private final UUID nodeId; @@ -2156,9 +1972,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** */ private final IgniteRunnable msgC; - /** */ - private final long rcvTime = U.currentTimeMillis(); - /** * @param nodeId Node ID. * @param msg Message. @@ -2171,13 +1984,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** - * @return Receive time. - */ - public long receiveTime() { - return rcvTime; - } - - /** * @return Message char. */ public IgniteRunnable callback() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoMessage.java index fcc1980..541400e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/communication/GridIoMessage.java @@ -46,8 +46,8 @@ public class GridIoMessage extends GridTcpCommunicationMessageAdapter { /** Topic ordinal. */ private int topicOrd = -1; - /** Message order. */ - private long msgId = -1; + /** Message ordered flag. */ + private boolean ordered; /** Message timeout. */ private long timeout; @@ -71,12 +71,19 @@ public class GridIoMessage extends GridTcpCommunicationMessageAdapter { * @param topic Communication topic. * @param topicOrd Topic ordinal value. * @param msg Message. - * @param msgId Message ID. + * @param ordered Message ordered flag. * @param timeout Timeout. * @param skipOnTimeout Whether message can be skipped on timeout. */ - public GridIoMessage(GridIoPolicy plc, Object topic, int topicOrd, GridTcpCommunicationMessageAdapter msg, - long msgId, long timeout, boolean skipOnTimeout) { + public GridIoMessage( + GridIoPolicy plc, + Object topic, + int topicOrd, + GridTcpCommunicationMessageAdapter msg, + boolean ordered, + long timeout, + boolean skipOnTimeout + ) { assert plc != null; assert topic != null; assert topicOrd <= Byte.MAX_VALUE; @@ -86,7 +93,7 @@ public class GridIoMessage extends GridTcpCommunicationMessageAdapter { this.msg = msg; this.topic = topic; this.topicOrd = topicOrd; - this.msgId = msgId; + this.ordered = ordered; this.timeout = timeout; this.skipOnTimeout = skipOnTimeout; } @@ -141,13 +148,6 @@ public class GridIoMessage extends GridTcpCommunicationMessageAdapter { } /** - * @return Message ID. - */ - long messageId() { - return msgId; - } - - /** * @return Message timeout. */ public long timeout() { @@ -165,30 +165,17 @@ public class GridIoMessage extends GridTcpCommunicationMessageAdapter { * @return {@code True} if message is ordered, {@code false} otherwise. */ boolean isOrdered() { - return msgId > 0; + return ordered; } /** {@inheritDoc} */ @Override public boolean equals(Object obj) { - if (obj == this) - return true; - - if (!(obj instanceof GridIoMessage)) - return false; - - GridIoMessage other = (GridIoMessage)obj; - - return topic.equals(other.topic) && msgId == other.msgId; + throw new AssertionError(); } /** {@inheritDoc} */ @Override public int hashCode() { - int res = topic.hashCode(); - - res = 31 * res + (int)(msgId ^ (msgId >>> 32)); - res = 31 * res + topic.hashCode(); - - return res; + throw new AssertionError(); } /** {@inheritDoc} */ @@ -210,7 +197,7 @@ public class GridIoMessage extends GridTcpCommunicationMessageAdapter { _clone.topic = topic; _clone.topicBytes = topicBytes; _clone.topicOrd = topicOrd; - _clone.msgId = msgId; + _clone.ordered = ordered; _clone.timeout = timeout; _clone.skipOnTimeout = skipOnTimeout; _clone.msg = msg != null ? (GridTcpCommunicationMessageAdapter)msg.clone() : null; @@ -236,7 +223,7 @@ public class GridIoMessage extends GridTcpCommunicationMessageAdapter { commState.idx++; case 1: - if (!commState.putLong(msgId)) + if (!commState.putBoolean(ordered)) return false; commState.idx++; @@ -293,10 +280,10 @@ public class GridIoMessage extends GridTcpCommunicationMessageAdapter { commState.idx++; case 1: - if (buf.remaining() < 8) + if (buf.remaining() < 1) return false; - msgId = commState.getLong(); + ordered = commState.getBoolean(); commState.idx++; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java index 94b2c49..f6b2e68 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java @@ -557,12 +557,11 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V /** * @param node Destination node. * @param topic Topic to send the message to. - * @param msgId Ordered message ID. * @param msg Message to send. * @param timeout Timeout to keep a message on receiving queue. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void sendOrderedMessage(ClusterNode node, Object topic, long msgId, GridCacheMessage<K, V> msg, + public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage<K, V> msg, long timeout) throws IgniteCheckedException { onSend(msg, node.id()); @@ -572,7 +571,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V try { cnt++; - cctx.gridIO().sendOrderedMessage(node, topic, msgId, msg, SYSTEM_POOL, timeout, false); + cctx.gridIO().sendOrderedMessage(node, topic, msg, SYSTEM_POOL, timeout, false); if (log.isDebugEnabled()) log.debug("Sent ordered cache message [topic=" + topic + ", msg=" + msg + @@ -595,15 +594,6 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V } /** - * @param topic Message topic. - * @param nodeId Node ID. - * @return Next ordered message ID. - */ - public long messageId(Object topic, UUID nodeId) { - return cctx.gridIO().nextMessageId(topic, nodeId); - } - - /** * @return ID that auto-grows based on local counter and counters received * from other nodes. */ @@ -731,7 +721,6 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V */ public void removeOrderedHandler(Object topic) { if (orderedHandlers.remove(topic) != null) { - cctx.gridIO().removeMessageId(topic); cctx.gridIO().removeMessageListener(topic); if (log != null && log.isDebugEnabled()) @@ -743,13 +732,6 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V } /** - * @param topic Message topic. - */ - public void removeMessageId(Object topic) { - cctx.gridIO().removeMessageId(topic); - } - - /** * @param nodeId Sender node ID. * @param cacheMsg Message. * @throws IgniteCheckedException If failed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java index 7bbf102..7d6c8c0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java @@ -66,9 +66,6 @@ public class GridDhtPartitionDemandPool<K, V> { private final ReadWriteLock busyLock; /** */ - private GridDhtPartitionTopology<K, V> top; - - /** */ @GridToStringInclude private final Collection<DemandWorker> dmdWorkers; @@ -110,8 +107,6 @@ public class GridDhtPartitionDemandPool<K, V> { log = cctx.logger(getClass()); - top = cctx.dht().topology(); - poolSize = cctx.preloadEnabled() ? cctx.config().getPreloadThreadPoolSize() : 0; if (poolSize > 0) { @@ -160,7 +155,6 @@ public class GridDhtPartitionDemandPool<K, V> { if (log.isDebugEnabled()) log.debug("After joining on demand workers: " + dmdWorkers); - top = null; lastExchangeFut = null; lastTimeoutObj.set(null); @@ -264,13 +258,6 @@ public class GridDhtPartitionDemandPool<K, V> { } /** - * @return Dummy node-left message. - */ - private SupplyMessage<K, V> dummyTopology() { - return DUMMY_TOP; - } - - /** * @param msg Message to check. * @return {@code True} if dummy message. */ @@ -332,7 +319,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @return Nodes owning this partition. */ private Collection<ClusterNode> remoteOwners(int p, long topVer) { - return F.view(top.owners(p, topVer), F.remoteNodes(cctx.nodeId())); + return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId())); } /** @@ -495,7 +482,7 @@ public class GridDhtPartitionDemandPool<K, V> { * @throws GridInterruptedException If interrupted. */ private boolean preloadEntry(ClusterNode pick, int p, GridCacheEntryInfo<K, V> entry, long topVer) - throws IgniteCheckedException, GridInterruptedException { + throws IgniteCheckedException { try { GridCacheEntryEx<K, V> cached = null; @@ -705,7 +692,7 @@ public class GridDhtPartitionDemandPool<K, V> { int p = e.getKey(); if (cctx.affinity().localNode(p, topVer)) { - GridDhtLocalPartition<K, V> part = top.localPartition(p, topVer, true); + GridDhtLocalPartition<K, V> part = cctx.dht().topology().localPartition(p, topVer, true); assert part != null; @@ -749,7 +736,7 @@ public class GridDhtPartitionDemandPool<K, V> { if (last) { remaining.remove(p); - top.own(part); + cctx.dht().topology().own(part); if (log.isDebugEnabled()) log.debug("Finished preloading partition: " + part); @@ -981,6 +968,8 @@ public class GridDhtPartitionDemandPool<K, V> { */ GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture<K, V> exchFut) { // No assignments for disabled preloader. + GridDhtPartitionTopology<K, V> top = cctx.dht().topology(); + if (!cctx.preloadEnabled()) return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); @@ -988,7 +977,8 @@ public class GridDhtPartitionDemandPool<K, V> { assert exchFut.forcePreload() || exchFut.dummyReassign() || exchFut.exchangeId().topologyVersion() == top.topologyVersion() : - "Topology version mismatch [exchId=" + exchFut.exchangeId() + ", topVer=" + top.topologyVersion() + ']'; + "Topology version mismatch [exchId=" + exchFut.exchangeId() + + ", topVer=" + top.topologyVersion() + ']'; GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java index beb469a..d8f742b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java @@ -495,14 +495,6 @@ class GridDhtPartitionSupplyPool<K, V> { } catch (IgniteCheckedException e) { U.error(log, "Failed to send partition supply message to node: " + node.id(), e); - - // Removing current topic because of request must fail with timeout and - // demander will generate new topic. - cctx.io().removeMessageId(d.topic()); - } - finally { - if (!ack || nodeLeft) - cctx.io().removeMessageId(d.topic()); } } @@ -519,7 +511,7 @@ class GridDhtPartitionSupplyPool<K, V> { if (log.isDebugEnabled()) log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); - cctx.io().sendOrderedMessage(n, d.topic(), cctx.io().messageId(d.topic(), n.id()), s, d.timeout()); + cctx.io().sendOrderedMessage(n, d.topic(), s, d.timeout()); return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheDistributedQueryManager.java index 6546c74..6969002 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -272,7 +272,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage cctx.io().sendOrderedMessage( node, topic, - cctx.io().messageId(topic, nodeId), res, timeout > 0 ? timeout : Long.MAX_VALUE); @@ -322,28 +321,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage return false; } - /** {@inheritDoc} */ - @Override protected void removeQueryResult(@Nullable UUID sndId, long reqId) { - super.removeQueryResult(sndId, reqId); - - if (sndId != null) { - Object topic = topic(sndId, reqId); - - cctx.io().removeMessageId(topic); - } - } - - /** {@inheritDoc} */ - @Override protected void removeFieldsQueryResult(@Nullable UUID sndId, long reqId) { - super.removeFieldsQueryResult(sndId, reqId); - - if (sndId != null) { - Object topic = topic(sndId, reqId); - - cctx.io().removeMessageId(topic); - } - } - /** * Processes cache query response. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java index 6ec86a5..9080b6a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousProcessor.java @@ -1144,7 +1144,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.io().sendOrderedMessage( node, orderedTopic, - ctx.io().nextMessageId(orderedTopic, node.id()), msg, SYSTEM_POOL, 0, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobProcessor.java index efcefbe..c407491 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobProcessor.java @@ -367,7 +367,6 @@ public class GridJobProcessor extends GridProcessorAdapter { ctx.io().sendOrderedMessage( taskNode, topic, // Job topic. - ctx.io().nextMessageId(topic, taskNode.id()), req, SYSTEM_POOL, timeout, @@ -1309,14 +1308,9 @@ public class GridJobProcessor extends GridProcessorAdapter { // Send response to designated job topic. // Always go through communication to preserve order. - long msgId = ctx.io().nextMessageId(topic, sndNode.id()); - - ctx.io().removeMessageId(topic); - ctx.io().sendOrderedMessage( sndNode, topic, - msgId, jobRes, req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL, timeout, @@ -1571,9 +1565,6 @@ public class GridJobProcessor extends GridProcessorAdapter { if (worker.getSession().isFullSupport()) { // Unregister session request listener for this jobs. ctx.io().removeMessageListener(worker.getJobTopic()); - - // Unregister message IDs used for sending. - ctx.io().removeMessageId(worker.getTaskTopic()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java index cb855c8..e1a4e5b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java @@ -660,9 +660,6 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { // We should save message ID here since listener callback will reset sequence. ClusterNode sndNode = ctx.discovery().node(taskNode.id()); - long msgId = sndNode != null && ses.isFullSupport() ? - ctx.io().nextMessageId(taskTopic, sndNode.id()) : -1; - finishTime = U.currentTimeMillis(); Collection<IgniteBiTuple<Integer, String>> evts = null; @@ -726,12 +723,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { // Send response to designated job topic. // Always go through communication to preserve order, // if attributes are enabled. - assert msgId > 0; - ctx.io().sendOrderedMessage( sndNode, taskTopic, - msgId, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL, timeout, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskProcessor.java index 6622a09..8e0dab6 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskProcessor.java @@ -773,7 +773,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { return; } - Map<UUID, Long> msgIds = new HashMap<>(siblings.size(), 1.0f); + Set<UUID> rcvrs = new HashSet<>(); UUID locNodeId = ctx.localNodeId(); @@ -794,8 +794,8 @@ public class GridTaskProcessor extends GridProcessorAdapter { UUID nodeId = sib.nodeId(); - if (!nodeId.equals(locNodeId) && !sib.isJobDone() && !msgIds.containsKey(nodeId)) - msgIds.put(nodeId, commMgr.nextMessageId(sib.jobTopic(), nodeId)); + if (!nodeId.equals(locNodeId) && !sib.isJobDone() && !rcvrs.contains(nodeId)) + rcvrs.add(nodeId); } } @@ -821,12 +821,8 @@ public class GridTaskProcessor extends GridProcessorAdapter { UUID nodeId = sib.nodeId(); - Long msgId = msgIds.remove(nodeId); - // Pair can be null if job is finished. - if (msgId != null) { - assert msgId > 0; - + if (rcvrs.remove(nodeId)) { ClusterNode node = ctx.discovery().node(nodeId); // Check that node didn't change (it could happen in case of failover). @@ -845,7 +841,6 @@ public class GridTaskProcessor extends GridProcessorAdapter { commMgr.sendOrderedMessage( node, sib.jobTopic(), - msgId, req, SYSTEM_POOL, timeout, @@ -1051,7 +1046,6 @@ public class GridTaskProcessor extends GridProcessorAdapter { // Remove message ID registration and old listener. if (worker.getSession().isFullSupport()) { - ioMgr.removeMessageId(sib.jobTopic()); ioMgr.removeMessageListener(sib.taskTopic(), msgLsnr); synchronized (worker.getSession()) { @@ -1113,7 +1107,6 @@ public class GridTaskProcessor extends GridProcessorAdapter { for (ComputeJobSibling sibling : worker.getSession().getJobSiblings()) { GridJobSiblingImpl s = (GridJobSiblingImpl)sibling; - ctx.io().removeMessageId(s.jobTopic()); ctx.io().removeMessageListener(s.taskTopic(), msgLsnr); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/test/java/org/gridgain/grid/kernal/managers/communication/GridIoManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/managers/communication/GridIoManagerSelfTest.java index b943060..fa6af66 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/managers/communication/GridIoManagerSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/managers/communication/GridIoManagerSelfTest.java @@ -165,7 +165,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { verify(ioMgr).sendOrderedMessage( argThat(new IsEqualCollection(F.asList(locNode, rmtNode))), - eq(GridTopic.TOPIC_COMM_USER), anyLong(), + eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL), eq(123L), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6f18eb54/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java index a56a6de..33c0db2 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java @@ -45,7 +45,6 @@ public class GridCacheSyncReplicatedPreloadSelfTest extends GridCommonAbstractTe /** */ private static final boolean DISCO_DEBUG_MODE = false; - /** * Constructs test. */