IGNITE-61 - Direct marshalling
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/81ffe1c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/81ffe1c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/81ffe1c9 Branch: refs/heads/ignite-211 Commit: 81ffe1c9b701a828961a666553dc747a2f0cc344 Parents: 8c4d06f Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sun Feb 8 18:26:25 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sun Feb 8 18:26:25 2015 -0800 ---------------------------------------------------------------------- .../internal/direct/DirectByteBufferStream.java | 26 ++++++++++++++------ .../internal/direct/DirectMessageWriter.java | 6 ++--- .../ignite/internal/util/nio/GridNioServer.java | 26 ++++++++++++++------ .../communication/tcp/TcpCommunicationSpi.java | 2 +- 4 files changed, 41 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/81ffe1c9/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java index 1b8b5ef..7f802a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java @@ -555,7 +555,7 @@ public class DirectByteBufferStream { writeByte(Byte.MIN_VALUE); } - public <T> void writeObjectArray(T[] arr, Class<T> itemCls) { + public <T> void writeObjectArray(T[] arr, Class<T> itemCls, MessageWriter writer) { if (arr != null) { if (it == null) { writeInt(arr.length); @@ -572,8 +572,11 @@ public class DirectByteBufferStream { if (cur == NULL) { cur = it.next(); - if (cur != null && itemType == Type.MSG) + if (cur != null && itemType == Type.MSG) { cur = ((MessageAdapter)cur).clone(); + + ((MessageAdapter)cur).setWriter(writer); + } } write(itemType, cur); @@ -590,7 +593,7 @@ public class DirectByteBufferStream { writeInt(-1); } - public <T> void writeCollection(Collection<T> col, Class<T> itemCls) { + public <T> void writeCollection(Collection<T> col, Class<T> itemCls, MessageWriter writer) { if (col != null) { if (it == null) { writeInt(col.size()); @@ -607,8 +610,11 @@ public class DirectByteBufferStream { if (cur == NULL) { cur = it.next(); - if (cur != null && itemType == Type.MSG) + if (cur != null && itemType == Type.MSG) { cur = ((MessageAdapter)cur).clone(); + + ((MessageAdapter)cur).setWriter(writer); + } } write(itemType, cur); @@ -626,7 +632,7 @@ public class DirectByteBufferStream { } @SuppressWarnings("unchecked") - public <K, V> void writeMap(Map<K, V> map, Class<K> keyCls, Class<V> valCls) { + public <K, V> void writeMap(Map<K, V> map, Class<K> keyCls, Class<V> valCls, MessageWriter writer) { if (map != null) { if (it == null) { writeInt(map.size()); @@ -652,12 +658,18 @@ public class DirectByteBufferStream { K k = e.getKey(); V v = e.getValue(); - if (k != null && keyType == Type.MSG) + if (k != null && keyType == Type.MSG) { k = (K)((MessageAdapter)k).clone(); - if (v != null && valType == Type.MSG) + ((MessageAdapter)k).setWriter(writer); + } + + if (v != null && valType == Type.MSG) { v = (V)((MessageAdapter)v).clone(); + ((MessageAdapter)v).setWriter(writer); + } + cur = e = F.t(k, v); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/81ffe1c9/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index e5f005d..5b5369c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -195,21 +195,21 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public <T> boolean writeObjectArray(String name, T[] arr, Class<T> itemCls) { - stream.writeObjectArray(arr, itemCls); + stream.writeObjectArray(arr, itemCls, this); return stream.lastFinished(); } /** {@inheritDoc} */ @Override public <T> boolean writeCollection(String name, Collection<T> col, Class<T> itemCls) { - stream.writeCollection(col, itemCls); + stream.writeCollection(col, itemCls, this); return stream.lastFinished(); } /** {@inheritDoc} */ @Override public <K, V> boolean writeMap(String name, Map<K, V> map, Class<K> keyCls, Class<V> valCls) { - stream.writeMap(map, keyCls, valCls); + stream.writeMap(map, keyCls, valCls, this); return stream.lastFinished(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/81ffe1c9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 585bf6e..b0c34ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; import org.apache.ignite.thread.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -138,6 +139,9 @@ public class GridNioServer<T> { private GridNioSslFilter sslFilter; /** */ + private IgniteSpiAdapter spi; + + /** */ private MessageWriterFactory messageWriterFactory; /** Static initializer ensures single-threaded execution of workaround. */ @@ -186,7 +190,7 @@ public class GridNioServer<T> { boolean directMode, boolean daemon, GridNioMetricsListener metricsLsnr, - MessageWriterFactory messageWriterFactory, + IgniteSpiAdapter spi, GridNioFilter... filters ) throws IgniteCheckedException { A.notNull(addr, "addr"); @@ -251,7 +255,7 @@ public class GridNioServer<T> { this.directMode = directMode; this.metricsLsnr = metricsLsnr; - this.messageWriterFactory = messageWriterFactory; + this.spi = spi; } /** @@ -1018,6 +1022,9 @@ public class GridNioServer<T> { assert msg != null; + if (messageWriterFactory == null) + messageWriterFactory = spi.getSpiContext().messageWriterFactory(); + msg.setWriter(messageWriterFactory.writer()); finished = msg.writeTo(buf); @@ -1039,6 +1046,9 @@ public class GridNioServer<T> { assert msg != null; + if (messageWriterFactory == null) + messageWriterFactory = spi.getSpiContext().messageWriterFactory(); + msg.setWriter(messageWriterFactory.writer()); finished = msg.writeTo(buf); @@ -2067,8 +2077,8 @@ public class GridNioServer<T> { /** Daemon flag. */ private boolean daemon; - /** Message writer factory. */ - private MessageWriterFactory messageWriterFactory; + /** SPI. */ + private IgniteSpiAdapter spi; /** * Finishes building the instance. @@ -2093,7 +2103,7 @@ public class GridNioServer<T> { directMode, daemon, metricsLsnr, - messageWriterFactory, + spi, filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS ); @@ -2288,11 +2298,11 @@ public class GridNioServer<T> { } /** - * @param messageWriterFactory Message writer factory.. + * @param spi SPI. * @return This for chaining. */ - public Builder<T> messageWriterFactory(MessageWriterFactory messageWriterFactory) { - this.messageWriterFactory = messageWriterFactory; + public Builder<T> spi(IgniteSpiAdapter spi) { + this.spi = spi; return this; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/81ffe1c9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index dacd0d1..2f7581a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1489,7 +1489,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .writeTimeout(sockWriteTimeout) .filters(new GridNioCodecFilter(new GridDirectParser(this), log, true), new GridConnectionBytesVerifyFilter(log)) - .messageWriterFactory(getSpiContext().messageWriterFactory()) + .spi(this) .build(); boundTcpPort = port;