# gg-9791 - Communication fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5d27722d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5d27722d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5d27722d Branch: refs/heads/ignite-312 Commit: 5d27722d3118201f6dfc7aa7c11f9c32948950a2 Parents: 42dee73 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Wed Feb 18 17:33:21 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Wed Feb 18 17:33:21 2015 -0800 ---------------------------------------------------------------------- .../ignite/internal/direct/DirectByteBufferStream.java | 9 ++++++++- .../apache/ignite/internal/direct/DirectMessageReader.java | 2 +- .../apache/ignite/internal/direct/DirectMessageWriter.java | 2 +- .../internal/managers/communication/GridIoManager.java | 2 +- .../apache/ignite/internal/util/nio/GridDirectParser.java | 2 +- .../plugin/extensions/communication/MessageFormatter.java | 3 ++- .../ignite/spi/communication/tcp/TcpCommunicationSpi.java | 4 ++-- .../org/apache/ignite/testframework/GridSpiTestContext.java | 4 ++-- 8 files changed, 18 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d27722d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java index 901958f..d496807 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java @@ -210,6 +210,9 @@ public class DirectByteBufferStream { private final MessageFactory msgFactory; /** */ + private final MessageReader msgReader; + + /** */ private ByteBuffer buf; /** */ @@ -265,9 +268,11 @@ public class DirectByteBufferStream { /** * @param msgFactory Message factory. + * @param msgReader Message reader. */ - public DirectByteBufferStream(MessageFactory msgFactory) { + public DirectByteBufferStream(MessageFactory msgFactory, MessageReader msgReader) { this.msgFactory = msgFactory; + this.msgReader = msgReader; } /** @@ -912,6 +917,8 @@ public class DirectByteBufferStream { msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type); + msg.setReader(msgReader); + msgTypeDone = true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d27722d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index b616ca8..3e336af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -38,7 +38,7 @@ public class DirectMessageReader implements MessageReader { * @param msgFactory Message factory. */ public DirectMessageReader(MessageFactory msgFactory) { - this.stream = new DirectByteBufferStream(msgFactory); + this.stream = new DirectByteBufferStream(msgFactory, this); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d27722d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index 341039f..1dac3d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -29,7 +29,7 @@ import java.util.*; */ public class DirectMessageWriter implements MessageWriter { /** Stream. */ - private final DirectByteBufferStream stream = new DirectByteBufferStream(null); + private final DirectByteBufferStream stream = new DirectByteBufferStream(null, null); /** State. */ private final DirectMessageWriterState state = new DirectMessageWriterState(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d27722d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 7f2fdb8..3be34f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -223,7 +223,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return new DirectMessageWriter(); } - @Override public MessageReader reader() { + @Override public MessageReader reader(MessageFactory factory) { return new DirectMessageReader(msgFactory); } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d27722d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java index 5e21dd3..24146c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java @@ -55,7 +55,7 @@ public class GridDirectParser implements GridNioParser { MessageAdapter msg = ses.removeMeta(MSG_META_KEY); if (msg == null && buf.hasRemaining()) { - MessageReader reader = formatter.reader(); + MessageReader reader = formatter.reader(msgFactory); reader.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d27722d/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java index 796b317..6176561 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java @@ -40,7 +40,8 @@ public interface MessageFormatter extends Extension { /** * Creates new message reader instance. * + * @param factory Message factory. * @return Message reader. */ - public MessageReader reader(); + public MessageReader reader(MessageFactory factory); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d27722d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 959d094..230ef6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1494,13 +1494,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return impl.writer(); } - @Override public MessageReader reader() { + @Override public MessageReader reader(MessageFactory factory) { if (impl == null) impl = getSpiContext().messageFormatter(); assert impl != null; - return impl.reader(); + return impl.reader(factory); } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d27722d/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index d8321b3..e1aaa71 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -519,8 +519,8 @@ public class GridSpiTestContext implements IgniteSpiContext { return new DirectMessageWriter(); } - @Override public MessageReader reader() { - return new DirectMessageReader(messageFactory()); + @Override public MessageReader reader(MessageFactory factory) { + return new DirectMessageReader(factory); } }; }