# IGNITE-61 - Direct marshalling
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c82d0a29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c82d0a29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c82d0a29
Branch: refs/heads/ignite-82
Commit: c82d0a2998c77df387c55b99d5e92263eed3fe3c
Parents: 81ffe1c
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Sun Feb 8 19:02:07 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Sun Feb 8 19:02:07 2015 -0800
----------------------------------------------------------------------
.../org/apache/ignite/internal/util/IgniteUtils.java | 3 +--
.../ignite/internal/util/ipc/IpcToNioAdapter.java | 9 ++++++++-
.../internal/util/nio/GridShmemCommunicationClient.java | 12 ++++++++++--
.../internal/util/nio/GridTcpCommunicationClient.java | 10 +++++++++-
.../spi/communication/tcp/TcpCommunicationSpi.java | 4 +++-
5 files changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d0a29/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 33a6e99..9d36ad3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9246,8 +9246,7 @@ public abstract class IgniteUtils {
* @return Number of written bytes.
* @throws IOException In case of error.
*/
- public static int writeMessageFully(MessageAdapter msg, OutputStream out, ByteBuffer buf)
- throws IOException {
+ public static int writeMessageFully(MessageAdapter msg, OutputStream out, ByteBuffer buf) throws IOException {
assert msg != null;
assert out != null;
assert buf != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d0a29/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index 1e9d98d..67c2eae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -53,19 +53,24 @@ public class IpcToNioAdapter<T> {
/** */
private final GridNioMetricsListener metricsLsnr;
+ /** */
+ private final MessageWriterFactory writerFactory;
+
/**
* @param metricsLsnr Metrics listener.
* @param log Log.
* @param endp Endpoint.
* @param lsnr Listener.
+ * @param writerFactory Message writer factory.
* @param filters Filters.
*/
public IpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger log, IpcEndpoint endp,
- GridNioServerListener<T> lsnr, GridNioFilter... filters) {
+ GridNioServerListener<T> lsnr, MessageWriterFactory writerFactory, GridNioFilter...
filters) {
assert metricsLsnr != null;
this.metricsLsnr = metricsLsnr;
this.endp = endp;
+ this.writerFactory = writerFactory;
chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
ses = new GridNioSessionImpl(chain, null, null, true);
@@ -147,6 +152,8 @@ public class IpcToNioAdapter<T> {
assert writeBuf.hasArray();
try {
+ msg.setWriter(writerFactory.writer());
+
int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf);
metricsLsnr.onBytesSent(cnt);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d0a29/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index 5413f4a..c3c9a92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -38,15 +38,19 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
/** */
private final ByteBuffer writeBuf;
+ /** */
+ private final MessageWriterFactory writerFactory;
+
/**
* @param metricsLsnr Metrics listener.
* @param port Shared memory IPC server port.
* @param connTimeout Connection timeout.
* @param log Logger.
+ * @param writerFactory Message writer factory.
* @throws IgniteCheckedException If failed.
*/
public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr, int port, long connTimeout,
- IgniteLogger log) throws IgniteCheckedException {
+ IgniteLogger log, MessageWriterFactory writerFactory) throws IgniteCheckedException {
super(metricsLsnr);
assert metricsLsnr != null;
@@ -58,10 +62,12 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
writeBuf = ByteBuffer.allocate(8 << 10);
writeBuf.order(ByteOrder.nativeOrder());
+
+ this.writerFactory = writerFactory;
}
/** {@inheritDoc} */
- @Override public synchronized void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC)
+ @Override public synchronized void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC)
throws IgniteCheckedException {
handshakeC.applyx(shmem.inputStream(), shmem.outputStream());
}
@@ -110,6 +116,8 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
assert writeBuf.hasArray();
try {
+ msg.setWriter(writerFactory.writer());
+
int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf);
metricsLsnr.onBytesSent(cnt);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d0a29/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
index 6b36f88..561547d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
@@ -48,6 +48,9 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient
/** */
private final ByteBuffer writeBuf;
+ /** */
+ private final MessageWriterFactory writerFactory;
+
/**
* @param metricsLsnr Metrics listener.
* @param addr Address.
@@ -59,6 +62,7 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient
* @param bufSize Buffer size (or {@code 0} to disable buffer).
* @param minBufferedMsgCnt Minimum buffered message count.
* @param bufSizeRatio Communication buffer size ratio.
+ * @param writerFactory Message writer factory.
* @throws IgniteCheckedException If failed.
*/
public GridTcpCommunicationClient(
@@ -71,7 +75,8 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient
int sockSndBuf,
int bufSize,
int minBufferedMsgCnt,
- double bufSizeRatio
+ double bufSizeRatio,
+ MessageWriterFactory writerFactory
) throws IgniteCheckedException {
super(metricsLsnr);
@@ -88,6 +93,7 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient
this.minBufferedMsgCnt = minBufferedMsgCnt;
this.bufSizeRatio = bufSizeRatio;
+ this.writerFactory = writerFactory;
writeBuf = ByteBuffer.allocate(8 << 10);
@@ -191,6 +197,8 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient
assert writeBuf.hasArray();
try {
+ msg.setWriter(writerFactory.writer());
+
int cnt = U.writeMessageFully(msg, out, writeBuf);
metricsLsnr.onBytesSent(cnt);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d0a29/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2f7581a..e3db36b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1851,7 +1851,8 @@ public class TcpCommunicationSpi extends
IgniteSpiAdapter
GridCommunicationClient client;
try {
- client = new GridShmemCommunicationClient(metricsLsnr, port, connTimeout, log);
+ client = new GridShmemCommunicationClient(metricsLsnr, port, connTimeout, log,
+ getSpiContext().messageWriterFactory());
}
catch (IgniteCheckedException e) {
// Reconnect for the second time, if connection is not established.
@@ -2420,6 +2421,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log,
endpoint,
srvLsnr,
+ getSpiContext().messageWriterFactory(),
new GridNioCodecFilter(new GridDirectParser(TcpCommunicationSpi.this), log, true),
new GridConnectionBytesVerifyFilter(log)
);