IGNITE-61 - Direct marshalling
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f62ee2d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f62ee2d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f62ee2d3 Branch: refs/heads/ignite-82 Commit: f62ee2d36b23667b9f8cae289e991ce190b9d0c7 Parents: 3b721e3 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sun Feb 8 16:32:20 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sun Feb 8 16:32:20 2015 -0800 ---------------------------------------------------------------------- .../internal/managers/GridManagerAdapter.java | 4 ++++ .../managers/communication/GridIoManager.java | 14 ++++++++++-- .../ignite/internal/util/nio/GridNioServer.java | 23 ++++++++++++++++++++ .../communication/MessageAdapter.java | 8 +++---- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 5 +++++ .../org/apache/ignite/spi/IgniteSpiContext.java | 8 +++++++ .../communication/tcp/TcpCommunicationSpi.java | 1 + .../testframework/GridSpiTestContext.java | 8 +++++++ 8 files changed, 64 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 58b84b2..3e85199 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -547,6 +547,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan } } + @Override public MessageWriterFactory messageWriterFactory() { + return ctx.io().messageWriterFactory(); + } + @Override public GridTcpMessageFactory messageFactory() { return ctx.io().messageFactory(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index abb83bf..2d7b67f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -222,6 +222,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa }; } + /** + * @return Message factory. + */ public GridTcpMessageFactory messageFactory() { assert msgFactory != null; @@ -229,6 +232,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** + * @return Message writer factory. + */ + public MessageWriterFactory messageWriterFactory() { + assert writerFactory != null; + + return writerFactory; + } + + /** * Resets metrics for this manager. */ public void resetMetrics() { @@ -978,8 +990,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa else { GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg.clone(), ordered, timeout, skipOnTimeout); - ioMsg.setWriter(writerFactory.writer()); - if (topicOrd < 0) ioMsg.topicBytes(marsh.marshal(topic)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 9ec25f6..42a540b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -137,6 +137,9 @@ public class GridNioServer<T> { /** */ private GridNioSslFilter sslFilter; + /** */ + private MessageWriterFactory messageWriterFactory; + /** Static initializer ensures single-threaded execution of workaround. */ static { // This is a workaround for JDK bug (NPE in Selector.open()). @@ -183,6 +186,7 @@ public class GridNioServer<T> { boolean directMode, boolean daemon, GridNioMetricsListener metricsLsnr, + MessageWriterFactory messageWriterFactory, GridNioFilter... filters ) throws IgniteCheckedException { A.notNull(addr, "addr"); @@ -247,6 +251,7 @@ public class GridNioServer<T> { this.directMode = directMode; this.metricsLsnr = metricsLsnr; + this.messageWriterFactory = messageWriterFactory; } /** @@ -1013,6 +1018,8 @@ public class GridNioServer<T> { assert msg != null; + msg.setWriter(messageWriterFactory.writer()); + finished = msg.writeTo(buf); } @@ -1032,6 +1039,8 @@ public class GridNioServer<T> { assert msg != null; + msg.setWriter(messageWriterFactory.writer()); + finished = msg.writeTo(buf); } @@ -2058,6 +2067,9 @@ public class GridNioServer<T> { /** Daemon flag. */ private boolean daemon; + /** Message writer factory. */ + private MessageWriterFactory messageWriterFactory; + /** * Finishes building the instance. * @@ -2081,6 +2093,7 @@ public class GridNioServer<T> { directMode, daemon, metricsLsnr, + messageWriterFactory, filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS ); @@ -2273,5 +2286,15 @@ public class GridNioServer<T> { return this; } + + /** + * @param messageWriterFactory Message writer factory. + * @return This for chaining. + */ + public Builder<T> messageWriterFactory(MessageWriterFactory messageWriterFactory) { + this.messageWriterFactory = messageWriterFactory; + + return this; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java index c2b4d96..d393e16 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java @@ -17,8 +17,6 @@ package org.apache.ignite.plugin.extensions.communication; -import org.apache.ignite.internal.direct.*; - import java.io.*; import java.nio.*; @@ -27,7 +25,7 @@ import java.nio.*; */ public abstract class MessageAdapter implements Serializable, Cloneable { /** Writer. */ - protected final MessageWriter writer = new DirectMessageWriter(); + protected MessageWriter writer; /** Reader. */ protected MessageReader reader; @@ -42,8 +40,8 @@ public abstract class MessageAdapter implements Serializable, Cloneable { * @param writer Writer. */ public final void setWriter(MessageWriter writer) { -// if (this.writer == null) -// this.writer = writer; + if (this.writer == null) + this.writer = writer; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 21f75a8..c3b4700 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.direct.*; +import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.authentication.*; import org.apache.ignite.internal.managers.communication.*; @@ -724,6 +725,10 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement return null; } + @Override public MessageWriterFactory messageWriterFactory() { + return null; + } + /** {@inheritDoc} */ @Override public GridTcpMessageFactory messageFactory() { return null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java index 06004c5..1f68226 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java @@ -23,6 +23,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.direct.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.spi.swapspace.*; import org.jetbrains.annotations.*; @@ -398,6 +399,13 @@ public interface IgniteSpiContext { @Nullable ClassLoader ldr) throws IgniteException; /** + * Gets message writer factory. + * + * @return Message writer factory. + */ + public MessageWriterFactory messageWriterFactory(); + + /** * @return Message factory. */ public GridTcpMessageFactory messageFactory(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 4db4ef8..dacd0d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1489,6 +1489,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .writeTimeout(sockWriteTimeout) .filters(new GridNioCodecFilter(new GridDirectParser(this), log, true), new GridConnectionBytesVerifyFilter(log)) + .messageWriterFactory(getSpiContext().messageWriterFactory()) .build(); boundTcpPort = port; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 48d9e2d..76fd302 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -516,6 +516,14 @@ public class GridSpiTestContext implements IgniteSpiContext { return null; } + @Override public MessageWriterFactory messageWriterFactory() { + return new MessageWriterFactory() { + @Override public MessageWriter writer() { + return new DirectMessageWriter(); + } + }; + } + /** {@inheritDoc} */ @Override public GridTcpMessageFactory messageFactory() { return new GridTcpMessageFactory() {