# IGNITE-61 - Direct marshalling

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c82d0a29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c82d0a29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c82d0a29

Branch: refs/heads/ignite-82
Commit: c82d0a2998c77df387c55b99d5e92263eed3fe3c
Parents: 81ffe1c
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Sun Feb 8 19:02:07 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Sun Feb 8 19:02:07 2015 -0800

----------------------------------------------------------------------
 .../org/apache/ignite/internal/util/IgniteUtils.java    |  3 +--
 .../ignite/internal/util/ipc/IpcToNioAdapter.java       |  9 ++++++++-
 .../internal/util/nio/GridShmemCommunicationClient.java | 12 ++++++++++--
 .../internal/util/nio/GridTcpCommunicationClient.java   | 10 +++++++++-
 .../spi/communication/tcp/TcpCommunicationSpi.java      |  4 +++-
 5 files changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d0a29/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 33a6e99..9d36ad3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9246,8 +9246,7 @@ public abstract class IgniteUtils {
      * @return Number of written bytes.
      * @throws IOException In case of error.
      */
-    public static int writeMessageFully(MessageAdapter msg, OutputStream out, ByteBuffer buf)
-        throws IOException {
+    public static int writeMessageFully(MessageAdapter msg, OutputStream out, ByteBuffer buf) throws IOException {
         assert msg != null;
         assert out != null;
         assert buf != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d0a29/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index 1e9d98d..67c2eae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -53,19 +53,24 @@ public class IpcToNioAdapter<T> {
     /** */
     private final GridNioMetricsListener metricsLsnr;
 
+    /** */
+    private final MessageWriterFactory writerFactory;
+
     /**
      * @param metricsLsnr Metrics listener.
      * @param log Log.
      * @param endp Endpoint.
      * @param lsnr Listener.
+     * @param writerFactory Message writer factory.
      * @param filters Filters.
      */
     public IpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger log, IpcEndpoint endp,
-        GridNioServerListener<T> lsnr, GridNioFilter... filters) {
+        GridNioServerListener<T> lsnr, MessageWriterFactory writerFactory, GridNioFilter... filters) {
         assert metricsLsnr != null;
 
         this.metricsLsnr = metricsLsnr;
         this.endp = endp;
+        this.writerFactory = writerFactory;
 
         chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
         ses = new GridNioSessionImpl(chain, null, null, true);
@@ -147,6 +152,8 @@ public class IpcToNioAdapter<T> {
         assert writeBuf.hasArray();
 
         try {
+            msg.setWriter(writerFactory.writer());
+
             int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf);
 
             metricsLsnr.onBytesSent(cnt);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d0a29/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index 5413f4a..c3c9a92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -38,15 +38,19 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
     /** */
     private final ByteBuffer writeBuf;
 
+    /** */
+    private final MessageWriterFactory writerFactory;
+
     /**
      * @param metricsLsnr Metrics listener.
      * @param port Shared memory IPC server port.
      * @param connTimeout Connection timeout.
      * @param log Logger.
+     * @param writerFactory Message writer factory.
      * @throws IgniteCheckedException If failed.
      */
     public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr, int port, long connTimeout,
-        IgniteLogger log) throws IgniteCheckedException {
+        IgniteLogger log, MessageWriterFactory writerFactory) throws IgniteCheckedException {
         super(metricsLsnr);
 
         assert metricsLsnr != null;
@@ -58,10 +62,12 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
         writeBuf = ByteBuffer.allocate(8 << 10);
 
         writeBuf.order(ByteOrder.nativeOrder());
+
+        this.writerFactory = writerFactory;
     }
 
     /** {@inheritDoc} */
-    @Override public  synchronized void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC)
+    @Override public synchronized void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC)
         throws IgniteCheckedException {
         handshakeC.applyx(shmem.inputStream(), shmem.outputStream());
     }
@@ -110,6 +116,8 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
         assert writeBuf.hasArray();
 
         try {
+            msg.setWriter(writerFactory.writer());
+
             int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf);
 
             metricsLsnr.onBytesSent(cnt);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d0a29/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
index 6b36f88..561547d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
@@ -48,6 +48,9 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient
     /** */
     private final ByteBuffer writeBuf;
 
+    /** */
+    private final MessageWriterFactory writerFactory;
+
     /**
      * @param metricsLsnr Metrics listener.
      * @param addr Address.
@@ -59,6 +62,7 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient
      * @param bufSize Buffer size (or {@code 0} to disable buffer).
      * @param minBufferedMsgCnt Minimum buffered message count.
      * @param bufSizeRatio Communication buffer size ratio.
+     * @param writerFactory Message writer factory.
      * @throws IgniteCheckedException If failed.
      */
     public GridTcpCommunicationClient(
@@ -71,7 +75,8 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient
         int sockSndBuf,
         int bufSize,
         int minBufferedMsgCnt,
-        double bufSizeRatio
+        double bufSizeRatio,
+        MessageWriterFactory writerFactory
     ) throws IgniteCheckedException {
         super(metricsLsnr);
 
@@ -88,6 +93,7 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient
 
         this.minBufferedMsgCnt = minBufferedMsgCnt;
         this.bufSizeRatio = bufSizeRatio;
+        this.writerFactory = writerFactory;
 
         writeBuf = ByteBuffer.allocate(8 << 10);
 
@@ -191,6 +197,8 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient
         assert writeBuf.hasArray();
 
         try {
+            msg.setWriter(writerFactory.writer());
+
             int cnt = U.writeMessageFully(msg, out, writeBuf);
 
             metricsLsnr.onBytesSent(cnt);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d0a29/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2f7581a..e3db36b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1851,7 +1851,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             GridCommunicationClient client;
 
             try {
-                client = new GridShmemCommunicationClient(metricsLsnr, port, connTimeout, log);
+                client = new GridShmemCommunicationClient(metricsLsnr, port, connTimeout, log,
+                    getSpiContext().messageWriterFactory());
             }
             catch (IgniteCheckedException e) {
                 // Reconnect for the second time, if connection is not established.
@@ -2420,6 +2421,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     log,
                     endpoint,
                     srvLsnr,
+                    getSpiContext().messageWriterFactory(),
                     new GridNioCodecFilter(new GridDirectParser(TcpCommunicationSpi.this), log, true),
                     new GridConnectionBytesVerifyFilter(log)
                 );

Reply via email to