IGNITE-61 - Direct marshalling

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f62ee2d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f62ee2d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f62ee2d3

Branch: refs/heads/ignite-82
Commit: f62ee2d36b23667b9f8cae289e991ce190b9d0c7
Parents: 3b721e3
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Sun Feb 8 16:32:20 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Sun Feb 8 16:32:20 2015 -0800

----------------------------------------------------------------------
 .../internal/managers/GridManagerAdapter.java   |  4 ++++
 .../managers/communication/GridIoManager.java   | 14 ++++++++++--
 .../ignite/internal/util/nio/GridNioServer.java | 23 ++++++++++++++++++++
 .../communication/MessageAdapter.java           |  8 +++----
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  5 +++++
 .../org/apache/ignite/spi/IgniteSpiContext.java |  8 +++++++
 .../communication/tcp/TcpCommunicationSpi.java  |  1 +
 .../testframework/GridSpiTestContext.java       |  8 +++++++
 8 files changed, 64 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 58b84b2..3e85199 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -547,6 +547,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         }
                     }
 
+                    @Override public MessageWriterFactory messageWriterFactory() {
+                        return ctx.io().messageWriterFactory();
+                    }
+
                     @Override public GridTcpMessageFactory messageFactory() {
                         return ctx.io().messageFactory();
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index abb83bf..2d7b67f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -222,6 +222,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         };
     }
 
+    /**
+     * @return Message factory.
+     */
     public GridTcpMessageFactory messageFactory() {
         assert msgFactory != null;
 
@@ -229,6 +232,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
+     * @return Message writer factory.
+     */
+    public MessageWriterFactory messageWriterFactory() {
+        assert writerFactory != null;
+
+        return writerFactory;
+    }
+
+    /**
      * Resets metrics for this manager.
      */
     public void resetMetrics() {
@@ -978,8 +990,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         else {
             GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg.clone(), ordered, timeout, skipOnTimeout);
 
-            ioMsg.setWriter(writerFactory.writer());
-
             if (topicOrd < 0)
                 ioMsg.topicBytes(marsh.marshal(topic));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 9ec25f6..42a540b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -137,6 +137,9 @@ public class GridNioServer<T> {
     /** */
     private GridNioSslFilter sslFilter;
 
+    /** */
+    private MessageWriterFactory messageWriterFactory;
+
     /** Static initializer ensures single-threaded execution of workaround. */
     static {
         // This is a workaround for JDK bug (NPE in Selector.open()).
@@ -183,6 +186,7 @@ public class GridNioServer<T> {
         boolean directMode,
         boolean daemon,
         GridNioMetricsListener metricsLsnr,
+        MessageWriterFactory messageWriterFactory,
         GridNioFilter... filters
     ) throws IgniteCheckedException {
         A.notNull(addr, "addr");
@@ -247,6 +251,7 @@ public class GridNioServer<T> {
 
         this.directMode = directMode;
         this.metricsLsnr = metricsLsnr;
+        this.messageWriterFactory = messageWriterFactory;
     }
 
     /**
@@ -1013,6 +1018,8 @@ public class GridNioServer<T> {
 
                     assert msg != null;
 
+                    msg.setWriter(messageWriterFactory.writer());
+
                     finished = msg.writeTo(buf);
                 }
 
@@ -1032,6 +1039,8 @@ public class GridNioServer<T> {
 
                     assert msg != null;
 
+                    msg.setWriter(messageWriterFactory.writer());
+
                     finished = msg.writeTo(buf);
                 }
 
@@ -2058,6 +2067,9 @@ public class GridNioServer<T> {
         /** Daemon flag. */
         private boolean daemon;
 
+        /** Message writer factory. */
+        private MessageWriterFactory messageWriterFactory;
+
         /**
          * Finishes building the instance.
          *
@@ -2081,6 +2093,7 @@ public class GridNioServer<T> {
                 directMode,
                 daemon,
                 metricsLsnr,
+                messageWriterFactory,
                 filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
             );
 
@@ -2273,5 +2286,15 @@ public class GridNioServer<T> {
 
             return this;
         }
+
+        /**
+         * @param messageWriterFactory Message writer factory.
+         * @return This for chaining.
+         */
+        public Builder<T> messageWriterFactory(MessageWriterFactory messageWriterFactory) {
+            this.messageWriterFactory = messageWriterFactory;
+
+            return this;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
index c2b4d96..d393e16 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.plugin.extensions.communication;
 
-import org.apache.ignite.internal.direct.*;
-
 import java.io.*;
 import java.nio.*;
 
@@ -27,7 +25,7 @@ import java.nio.*;
  */
 public abstract class MessageAdapter implements Serializable, Cloneable {
     /** Writer. */
-    protected final MessageWriter writer = new DirectMessageWriter();
+    protected MessageWriter writer;
 
     /** Reader. */
     protected MessageReader reader;
@@ -42,8 +40,8 @@ public abstract class MessageAdapter implements Serializable, Cloneable {
      * @param writer Writer.
      */
     public final void setWriter(MessageWriter writer) {
-//        if (this.writer == null)
-//            this.writer = writer;
+        if (this.writer == null)
+            this.writer = writer;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 21f75a8..c3b4700 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.direct.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.authentication.*;
 import org.apache.ignite.internal.managers.communication.*;
@@ -724,6 +725,10 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
             return null;
         }
 
+        @Override public MessageWriterFactory messageWriterFactory() {
+            return null;
+        }
+
         /** {@inheritDoc} */
         @Override public GridTcpMessageFactory messageFactory() {
             return null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 06004c5..1f68226 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -23,6 +23,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.direct.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.spi.swapspace.*;
 import org.jetbrains.annotations.*;
@@ -398,6 +399,13 @@ public interface IgniteSpiContext {
         @Nullable ClassLoader ldr) throws IgniteException;
 
     /**
+     * Gets message writer factory.
+     *
+     * @return Message writer factory.
+     */
+    public MessageWriterFactory messageWriterFactory();
+
+    /**
      * @return Message factory.
      */
     public GridTcpMessageFactory messageFactory();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4db4ef8..dacd0d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1489,6 +1489,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .writeTimeout(sockWriteTimeout)
                         .filters(new GridNioCodecFilter(new GridDirectParser(this), log, true),
                             new GridConnectionBytesVerifyFilter(log))
+                        .messageWriterFactory(getSpiContext().messageWriterFactory())
                         .build();
 
                 boundTcpPort = port;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f62ee2d3/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 48d9e2d..76fd302 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -516,6 +516,14 @@ public class GridSpiTestContext implements IgniteSpiContext {
         return null;
     }
 
+    @Override public MessageWriterFactory messageWriterFactory() {
+        return new MessageWriterFactory() {
+            @Override public MessageWriter writer() {
+                return new DirectMessageWriter();
+            }
+        };
+    }
+
     /** {@inheritDoc} */
     @Override public GridTcpMessageFactory messageFactory() {
         return new GridTcpMessageFactory() {

Reply via email to