IGNITE-61 - Direct marshalling

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/81ffe1c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/81ffe1c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/81ffe1c9

Branch: refs/heads/ignite-82
Commit: 81ffe1c9b701a828961a666553dc747a2f0cc344
Parents: 8c4d06f
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Sun Feb 8 18:26:25 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Sun Feb 8 18:26:25 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectByteBufferStream.java | 26 ++++++++++++++------
 .../internal/direct/DirectMessageWriter.java    |  6 ++---
 .../ignite/internal/util/nio/GridNioServer.java | 26 ++++++++++++++------
 .../communication/tcp/TcpCommunicationSpi.java  |  2 +-
 4 files changed, 41 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/81ffe1c9/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
index 1b8b5ef..7f802a1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
@@ -555,7 +555,7 @@ public class DirectByteBufferStream {
             writeByte(Byte.MIN_VALUE);
     }
 
-    public <T> void writeObjectArray(T[] arr, Class<T> itemCls) {
+    public <T> void writeObjectArray(T[] arr, Class<T> itemCls, MessageWriter 
writer) {
         if (arr != null) {
             if (it == null) {
                 writeInt(arr.length);
@@ -572,8 +572,11 @@ public class DirectByteBufferStream {
                 if (cur == NULL) {
                     cur = it.next();
 
-                    if (cur != null && itemType == Type.MSG)
+                    if (cur != null && itemType == Type.MSG) {
                         cur = ((MessageAdapter)cur).clone();
+
+                        ((MessageAdapter)cur).setWriter(writer);
+                    }
                 }
 
                 write(itemType, cur);
@@ -590,7 +593,7 @@ public class DirectByteBufferStream {
             writeInt(-1);
     }
 
-    public <T> void writeCollection(Collection<T> col, Class<T> itemCls) {
+    public <T> void writeCollection(Collection<T> col, Class<T> itemCls, 
MessageWriter writer) {
         if (col != null) {
             if (it == null) {
                 writeInt(col.size());
@@ -607,8 +610,11 @@ public class DirectByteBufferStream {
                 if (cur == NULL) {
                     cur = it.next();
 
-                    if (cur != null && itemType == Type.MSG)
+                    if (cur != null && itemType == Type.MSG) {
                         cur = ((MessageAdapter)cur).clone();
+
+                        ((MessageAdapter)cur).setWriter(writer);
+                    }
                 }
 
                 write(itemType, cur);
@@ -626,7 +632,7 @@ public class DirectByteBufferStream {
     }
 
     @SuppressWarnings("unchecked")
-    public <K, V> void writeMap(Map<K, V> map, Class<K> keyCls, Class<V> 
valCls) {
+    public <K, V> void writeMap(Map<K, V> map, Class<K> keyCls, Class<V> 
valCls, MessageWriter writer) {
         if (map != null) {
             if (it == null) {
                 writeInt(map.size());
@@ -652,12 +658,18 @@ public class DirectByteBufferStream {
                         K k = e.getKey();
                         V v = e.getValue();
 
-                        if (k != null && keyType == Type.MSG)
+                        if (k != null && keyType == Type.MSG) {
                             k = (K)((MessageAdapter)k).clone();
 
-                        if (v != null && valType == Type.MSG)
+                            ((MessageAdapter)k).setWriter(writer);
+                        }
+
+                        if (v != null && valType == Type.MSG) {
                             v = (V)((MessageAdapter)v).clone();
 
+                            ((MessageAdapter)v).setWriter(writer);
+                        }
+
                         cur = e = F.t(k, v);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/81ffe1c9/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index e5f005d..5b5369c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -195,21 +195,21 @@ public class DirectMessageWriter implements MessageWriter 
{
 
     /** {@inheritDoc} */
     @Override public <T> boolean writeObjectArray(String name, T[] arr, 
Class<T> itemCls) {
-        stream.writeObjectArray(arr, itemCls);
+        stream.writeObjectArray(arr, itemCls, this);
 
         return stream.lastFinished();
     }
 
     /** {@inheritDoc} */
     @Override public <T> boolean writeCollection(String name, Collection<T> 
col, Class<T> itemCls) {
-        stream.writeCollection(col, itemCls);
+        stream.writeCollection(col, itemCls, this);
 
         return stream.lastFinished();
     }
 
     /** {@inheritDoc} */
     @Override public <K, V> boolean writeMap(String name, Map<K, V> map, 
Class<K> keyCls, Class<V> valCls) {
-        stream.writeMap(map, keyCls, valCls);
+        stream.writeMap(map, keyCls, valCls, this);
 
         return stream.lastFinished();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/81ffe1c9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 585bf6e..b0c34ef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.spi.*;
 import org.apache.ignite.thread.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
@@ -138,6 +139,9 @@ public class GridNioServer<T> {
     private GridNioSslFilter sslFilter;
 
     /** */
+    private IgniteSpiAdapter spi;
+
+    /** */
     private MessageWriterFactory messageWriterFactory;
 
     /** Static initializer ensures single-threaded execution of workaround. */
@@ -186,7 +190,7 @@ public class GridNioServer<T> {
         boolean directMode,
         boolean daemon,
         GridNioMetricsListener metricsLsnr,
-        MessageWriterFactory messageWriterFactory,
+        IgniteSpiAdapter spi,
         GridNioFilter... filters
     ) throws IgniteCheckedException {
         A.notNull(addr, "addr");
@@ -251,7 +255,7 @@ public class GridNioServer<T> {
 
         this.directMode = directMode;
         this.metricsLsnr = metricsLsnr;
-        this.messageWriterFactory = messageWriterFactory;
+        this.spi = spi;
     }
 
     /**
@@ -1018,6 +1022,9 @@ public class GridNioServer<T> {
 
                     assert msg != null;
 
+                    if (messageWriterFactory == null)
+                        messageWriterFactory = 
spi.getSpiContext().messageWriterFactory();
+
                     msg.setWriter(messageWriterFactory.writer());
 
                     finished = msg.writeTo(buf);
@@ -1039,6 +1046,9 @@ public class GridNioServer<T> {
 
                     assert msg != null;
 
+                    if (messageWriterFactory == null)
+                        messageWriterFactory = 
spi.getSpiContext().messageWriterFactory();
+
                     msg.setWriter(messageWriterFactory.writer());
 
                     finished = msg.writeTo(buf);
@@ -2067,8 +2077,8 @@ public class GridNioServer<T> {
         /** Daemon flag. */
         private boolean daemon;
 
-        /** Message writer factory. */
-        private MessageWriterFactory messageWriterFactory;
+        /** SPI. */
+        private IgniteSpiAdapter spi;
 
         /**
          * Finishes building the instance.
@@ -2093,7 +2103,7 @@ public class GridNioServer<T> {
                 directMode,
                 daemon,
                 metricsLsnr,
-                messageWriterFactory,
+                spi,
                 filters != null ? Arrays.copyOf(filters, filters.length) : 
EMPTY_FILTERS
             );
 
@@ -2288,11 +2298,11 @@ public class GridNioServer<T> {
         }
 
         /**
-         * @param messageWriterFactory Message writer factory..
+         * @param spi SPI.
          * @return This for chaining.
          */
-        public Builder<T> messageWriterFactory(MessageWriterFactory 
messageWriterFactory) {
-            this.messageWriterFactory = messageWriterFactory;
+        public Builder<T> spi(IgniteSpiAdapter spi) {
+            this.spi = spi;
 
             return this;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/81ffe1c9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index dacd0d1..2f7581a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1489,7 +1489,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .writeTimeout(sockWriteTimeout)
                         .filters(new GridNioCodecFilter(new 
GridDirectParser(this), log, true),
                             new GridConnectionBytesVerifyFilter(log))
-                        
.messageWriterFactory(getSpiContext().messageWriterFactory())
+                        .spi(this)
                         .build();
 
                 boundTcpPort = port;

Reply via email to