IGNITE-1068: Fixing ordered messages back pressure control.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c7e74874 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c7e74874 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c7e74874 Branch: refs/heads/ignite-950 Commit: c7e7487458b72ba8b666fba7b9e24004d425719f Parents: bccba30 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Wed Jul 1 11:50:08 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Wed Jul 1 11:50:08 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 49 +++++++++++--------- .../util/nio/GridNioMessageTracker.java | 23 +++++++-- 2 files changed, 48 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7e74874/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 4382731..d8dcc2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -718,7 +719,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa map = msgSetMap.get(msg.topic()); if (map == null) { - set = new GridCommunicationMessageSet(plc, msg.topic(), nodeId, timeout, skipOnTimeout, msg); + set = new GridCommunicationMessageSet(plc, msg.topic(), nodeId, timeout, skipOnTimeout, msg, msgC); map = new ConcurrentHashMap0<>(); @@ -748,7 +749,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (set == null) { GridCommunicationMessageSet old = map.putIfAbsent(nodeId, set = new GridCommunicationMessageSet(plc, msg.topic(), - nodeId, timeout, skipOnTimeout, msg)); + nodeId, timeout, skipOnTimeout, msg, msgC)); assert old == null; @@ -766,7 +767,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa assert set != null; assert !isNew; - set.add(msg); + set.add(msg, msgC); break; } @@ -795,14 +796,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (isNew && set.endTime() != Long.MAX_VALUE) ctx.timeout().addTimeoutObject(set); - if (set.reserved()) { - // Set is reserved which means that it is currently processed by worker thread. - if (msgC != null) - msgC.run(); - - return; - } - final GridMessageListener lsnr = lsnrMap.get(msg.topic()); if (lsnr == null) { @@ -821,7 +814,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa "listener is registered): " + msg); } - // Mark the message as processed. + // Mark the message as processed, otherwise reading from the connection + // may stop. if (msgC != null) msgC.run(); @@ -848,8 +842,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } finally { threadProcessingMessage(false); - - msgC.run(); } } }; @@ -1852,7 +1844,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** */ @GridToStringInclude - private final Queue<IgniteBiTuple<GridIoMessage, Long>> msgs = new ConcurrentLinkedDeque<>(); + private final Queue<GridTuple3<GridIoMessage, Long, IgniteRunnable>> msgs = new ConcurrentLinkedDeque<>(); /** */ private final AtomicBoolean reserved = new AtomicBoolean(); @@ -1873,6 +1865,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param timeout Timeout. * @param skipOnTimeout Whether message can be skipped on timeout. * @param msg Message to add immediately. + * @param msgC Message closure (may be {@code null}). */ GridCommunicationMessageSet( GridIoPolicy plc, @@ -1880,7 +1873,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa UUID nodeId, long timeout, boolean skipOnTimeout, - GridIoMessage msg + GridIoMessage msg, + @Nullable IgniteRunnable msgC ) { assert nodeId != null; assert topic != null; @@ -1899,7 +1893,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa lastTs = U.currentTimeMillis(); - msgs.add(F.t(msg, lastTs)); + msgs.add(F.t(msg, lastTs, msgC)); } /** {@inheritDoc} */ @@ -2017,15 +2011,28 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa void unwind(GridMessageListener lsnr) { assert reserved.get(); - for (IgniteBiTuple<GridIoMessage, Long> t = msgs.poll(); t != null; t = msgs.poll()) - lsnr.onMessage(nodeId, t.get1().message()); + for (GridTuple3<GridIoMessage, Long, IgniteRunnable> t = msgs.poll(); t != null; t = msgs.poll()) { + try { + lsnr.onMessage( + nodeId, + t.get1().message()); + } + finally { + if (t.get3() != null) + t.get3().run(); + } + } } /** * @param msg Message to add. + * @param msgC Message closure (may be {@code null}). */ - void add(GridIoMessage msg) { - msgs.add(F.t(msg, U.currentTimeMillis())); + void add( + GridIoMessage msg, + @Nullable IgniteRunnable msgC + ) { + msgs.add(F.t(msg, U.currentTimeMillis(), msgC)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7e74874/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java index 52b7fed..c9ed1a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java @@ -56,9 +56,26 @@ public class GridNioMessageTracker implements IgniteRunnable { /** {@inheritDoc} */ @Override public void run() { - int cnt = msgCnt.decrementAndGet(); + // In case of ordered messages this may be called twice for 1 message. + // Example: message arrives, but listener has not been installed yet. + // Message set is created, but message does not get actually processed. + // If this is not called, connection may be paused which causes hang. + // It seems acceptable to have the following logic accounting the aforementioned. + int cnt = 0; - assert cnt >= 0 : "Invalid count: " + cnt; + for (;;) { + int cur = msgCnt.get(); + + if (cur == 0) + break; + + cnt = cur - 1; + + if (msgCnt.compareAndSet(cur, cnt)) + break; + } + + assert cnt >= 0 : "Invalid count [cnt=" + cnt + ", this=" + this + ']'; if (cnt < msgQueueLimit && paused && lock.tryLock()) { try { @@ -116,6 +133,6 @@ public class GridNioMessageTracker implements IgniteRunnable { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridNioMessageTracker.class, this, super.toString()); + return S.toString(GridNioMessageTracker.class, this, "hash", System.identityHashCode(this)); } }