IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2421fee9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2421fee9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2421fee9 Branch: refs/heads/ignite-104 Commit: 2421fee936c1cfefa072c00da8ebce6897c2156f Parents: d26184a Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Tue Aug 4 10:28:24 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Tue Aug 4 10:28:24 2015 -0700 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 75 +++++++++++--------- 1 file changed, 43 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2421fee9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index a129cbe..b38106e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -980,10 +980,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param msgC Closure to call when message processing finished. */ private void processSequentialMessage( - final UUID nodeId, - final GridIoMessage msg, + UUID nodeId, + GridIoMessage msg, byte plc, - final IgniteRunnable msgC + IgniteRunnable msgC ) throws IgniteCheckedException { final GridMessageListener lsnr = lsnrMap.get(msg.topic()); @@ -1018,39 +1018,41 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa msgSet.add(nodeId, msg, msgC); - if (msgC == null) { - assert locNodeId.equals(nodeId); + if (!msgSet.reserved()) { + if (msgC == null) { + assert locNodeId.equals(nodeId); - msgSet.unwind(lsnr); - } - else { - assert !locNodeId.equals(nodeId); + msgSet.unwind(lsnr); + } + else { + assert !locNodeId.equals(nodeId); - final SequentialMessageSet msgSet0 = msgSet; + final SequentialMessageSet msgSet0 = msgSet; - Runnable c = new Runnable() { - @Override public void run() { - try { - threadProcessingMessage(true); + Runnable c = new Runnable() { + @Override public void run() { + try { + threadProcessingMessage(true); - msgSet0.unwind(lsnr); - } - finally { - threadProcessingMessage(false); + msgSet0.unwind(lsnr); + } + finally { + threadProcessingMessage(false); + } } - } - }; + }; - try { - pool(plc).execute(c); - } - catch (RejectedExecutionException e) { - U.error(log, "Failed to process sequential message due to execution rejection. " + - "Increase the upper bound on executor service provided by corresponding " + - "configuration property. Will attempt to process message in the listener " + - "thread instead [msgPlc=" + plc + ']', e); + try { + pool(plc).execute(c); + } + catch (RejectedExecutionException e) { + U.error(log, "Failed to process sequential message due to execution rejection. " + + "Increase the upper bound on executor service provided by corresponding " + + "configuration property. Will attempt to process message in the listener " + + "thread instead [msgPlc=" + plc + ']', e); - c.run(); + c.run(); + } } } } @@ -1108,10 +1110,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ioMsg.topicBytes(marsh.marshal(topic)); try { - if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi) - ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackClosure); + CommunicationSpi spi = getSpi(); + + if (spi instanceof TcpCommunicationSpi) + ((TcpCommunicationSpi)spi).sendMessage(node, ioMsg, ackClosure); else - getSpi().sendMessage(node, ioMsg); + spi.sendMessage(node, ioMsg); } catch (IgniteSpiException e) { throw new IgniteCheckedException("Failed to send message (node may have left the grid or " + @@ -2535,6 +2539,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private final AtomicBoolean reserve = new AtomicBoolean(); /** + * @return {@code True} if currently reserved. + */ + boolean reserved() { + return reserve.get(); + } + + /** * @param nodeId Node ID. * @param msg Message. * @param msgC Closure to call when message processing finished.