ignite-nio - Removing message clone

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a3fe9339
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a3fe9339
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a3fe9339

Branch: refs/heads/sprint-1
Commit: a3fe93393a62a35cc72ed839e663b5dbb8676c65
Parents: e74ff00
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Sat Feb 14 17:55:08 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Sat Feb 14 17:55:08 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectByteBufferStream.java | 46 ++++++++------------
 .../internal/direct/DirectMessageReader.java    |  8 ++--
 .../managers/communication/GridIoManager.java   |  2 +-
 .../communication/GridIoMessageFactory.java     | 11 +----
 .../internal/util/nio/GridDirectParser.java     | 20 ++++++++-
 .../ignite/internal/util/nio/GridNioServer.java |  6 +++
 .../communication/tcp/TcpCommunicationSpi.java  |  9 +++-
 .../testframework/GridSpiTestContext.java       |  2 +-
 8 files changed, 56 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
index e642790..1f2579f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
@@ -479,16 +479,6 @@ public class DirectByteBufferStream {
     }
 
     /**
-     * @param val Value.
-     */
-    public void writeByteArray(byte[] val, int off, int len) {
-        if (val != null)
-            lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len, true);
-        else
-            writeInt(-1);
-    }
-
-    /**
      * @param val Value
      */
     public void writeShortArray(short[] val) {
@@ -920,14 +910,6 @@ public class DirectByteBufferStream {
     }
 
     /**
-     * @param len Length.
-     * @return Value.
-     */
-    public byte[] readByteArray(int len) {
-        return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF, len);
-    }
-
-    /**
      /**
       * @return Value.
       */
@@ -1028,7 +1010,7 @@ public class DirectByteBufferStream {
      * @return Message.
      */
     @SuppressWarnings("unchecked")
-    public <T extends MessageAdapter> T readMessage() {
+    public <T extends MessageAdapter> T readMessage(MessageReader reader) {
         if (!msgTypeDone) {
             if (!buf.hasRemaining()) {
                 lastFinished = false;
@@ -1040,6 +1022,9 @@ public class DirectByteBufferStream {
 
             msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type);
 
+            if (msg != null)
+                msg.setReader(reader);
+
             msgTypeDone = true;
         }
 
@@ -1059,10 +1044,11 @@ public class DirectByteBufferStream {
 
     /**
      * @param itemCls Component type.
+     * @param reader Reader.
      * @return Array.
      */
     @SuppressWarnings("unchecked")
-    public <T> T[] readObjectArray(Class<?> itemCls) {
+    public <T> T[] readObjectArray(Class<?> itemCls, MessageReader reader) {
         if (readSize == -1) {
             int size = readInt();
 
@@ -1079,7 +1065,7 @@ public class DirectByteBufferStream {
             Type itemType = type(itemCls);
 
             for (int i = readItems; i < readSize; i++) {
-                Object item = read(itemType);
+                Object item = read(itemType, reader);
 
                 if (!lastFinished)
                     return null;
@@ -1103,10 +1089,11 @@ public class DirectByteBufferStream {
 
     /**
      * @param itemCls Item type.
+     * @param reader Reader.
      * @return Collection.
      */
     @SuppressWarnings("unchecked")
-    public <C extends Collection<T>, T> C readCollection(Class<T> itemCls) {
+    public <C extends Collection<T>, T> C readCollection(Class<T> itemCls, 
MessageReader reader) {
         if (readSize == -1) {
             int size = readInt();
 
@@ -1123,7 +1110,7 @@ public class DirectByteBufferStream {
             Type itemType = type(itemCls);
 
             for (int i = readItems; i < readSize; i++) {
-                Object item = read(itemType);
+                Object item = read(itemType, reader);
 
                 if (!lastFinished)
                     return null;
@@ -1148,11 +1135,13 @@ public class DirectByteBufferStream {
     /**
      * @param keyCls Key type.
      * @param valCls Value type.
+     * @param reader Reader.
      * @param linked Whether linked map should be created.
      * @return Map.
      */
     @SuppressWarnings("unchecked")
-    public <M extends Map<K, V>, K, V> M readMap(Class<K> keyCls, Class<V> 
valCls, boolean linked) {
+    public <M extends Map<K, V>, K, V> M readMap(Class<K> keyCls, Class<V> 
valCls, MessageReader reader,
+        boolean linked) {
         if (readSize == -1) {
             int size = readInt();
 
@@ -1171,7 +1160,7 @@ public class DirectByteBufferStream {
 
             for (int i = readItems; i < readSize; i++) {
                 if (!keyDone) {
-                    Object key = read(keyType);
+                    Object key = read(keyType, reader);
 
                     if (!lastFinished)
                         return null;
@@ -1180,7 +1169,7 @@ public class DirectByteBufferStream {
                     keyDone = true;
                 }
 
-                Object val = read(valType);
+                Object val = read(valType, reader);
 
                 if (!lastFinished)
                     return null;
@@ -1476,9 +1465,10 @@ public class DirectByteBufferStream {
 
     /**
      * @param type Type.
+     * @param reader Reader.
      * @return Value.
      */
-    private Object read(Type type) {
+    private Object read(Type type, MessageReader reader) {
         switch (type) {
             case BYTE:
                 return readByte();
@@ -1541,7 +1531,7 @@ public class DirectByteBufferStream {
                 return readIgniteUuid();
 
             case MSG:
-                return readMessage();
+                return readMessage(reader);
 
             default:
                 throw new IllegalArgumentException("Unknown type: " + type);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index 48c79ce..9a3fb3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -237,7 +237,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public <T extends MessageAdapter> T readMessage(String 
name) {
-        T msg = stream.readMessage();
+        T msg = stream.readMessage(this);
 
         lastRead = stream.lastFinished();
 
@@ -246,7 +246,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public <T> T[] readObjectArray(String name, Class<T> itemCls) {
-        T[] msg = stream.readObjectArray(itemCls);
+        T[] msg = stream.readObjectArray(itemCls, this);
 
         lastRead = stream.lastFinished();
 
@@ -255,7 +255,7 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public <C extends Collection<T>, T> C readCollection(String 
name, Class<T> itemCls) {
-        C col = stream.readCollection(itemCls);
+        C col = stream.readCollection(itemCls, this);
 
         lastRead = stream.lastFinished();
 
@@ -265,7 +265,7 @@ public class DirectMessageReader implements MessageReader {
     /** {@inheritDoc} */
     @Override public <M extends Map<K, V>, K, V> M readMap(String name, 
Class<K> keyCls, Class<V> valCls,
         boolean linked) {
-        M map = stream.readMap(keyCls, valCls, linked);
+        M map = stream.readMap(keyCls, valCls, this, linked);
 
         lastRead = stream.lastFinished();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index e201293..7f2fdb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -229,7 +229,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             };
         }
 
-        msgFactory = new GridIoMessageFactory(formatter, 
ctx.plugins().extensions(MessageFactory.class));
+        msgFactory = new 
GridIoMessageFactory(ctx.plugins().extensions(MessageFactory.class));
 
         if (log.isDebugEnabled())
             log.debug(startInfo());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 56abef5..4f3182f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -52,20 +52,13 @@ public class GridIoMessageFactory implements MessageFactory {
     /** Custom messages registry. Used for test purposes. */
     private static final Map<Byte, IgniteOutClosure<MessageAdapter>> CUSTOM = 
new ConcurrentHashMap8<>();
 
-    /** Message reader factory. */
-    private final MessageFormatter formatter;
-
     /** Extensions. */
     private final MessageFactory[] ext;
 
     /**
-     * @param formatter Message formatter.
      * @param ext Extensions.
      */
-    public GridIoMessageFactory(MessageFormatter formatter, MessageFactory[] 
ext) {
-        assert formatter != null;
-
-        this.formatter = formatter;
+    public GridIoMessageFactory(MessageFactory[] ext) {
         this.ext = ext;
     }
 
@@ -524,8 +517,6 @@ public class GridIoMessageFactory implements MessageFactory {
         if (msg == null)
             throw new IgniteException("Invalid message type: " + type);
 
-        msg.setReader(formatter.reader());
-
         return msg;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index 3b00bd9..349045c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -31,25 +31,41 @@ public class GridDirectParser implements GridNioParser {
     /** Message metadata key. */
     private static final int MSG_META_KEY = 
GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Message reader key. */
+    private static final int MSG_READER_KEY = 
GridNioSessionMetaKey.nextUniqueKey();
+
     /** */
     private final MessageFactory msgFactory;
 
+    /** */
+    private final MessageFormatter formatter;
+
     /**
      * @param msgFactory Message factory.
+     * @param formatter Formatter.
      */
-    public GridDirectParser(MessageFactory msgFactory) {
+    public GridDirectParser(MessageFactory msgFactory, MessageFormatter 
formatter) {
         assert msgFactory != null;
 
         this.msgFactory = msgFactory;
+        this.formatter = formatter;
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer 
buf) throws IOException, IgniteCheckedException {
         MessageAdapter msg = ses.removeMeta(MSG_META_KEY);
 
-        if (msg == null && buf.hasRemaining())
+        if (msg == null && buf.hasRemaining()) {
+            MessageReader reader = ses.meta(MSG_READER_KEY);
+
+            if (reader == null)
+                ses.addMeta(MSG_READER_KEY, reader = formatter.reader());
+
             msg = msgFactory.create(buf.get());
 
+            msg.setReader(reader);
+        }
+
         boolean finished = false;
 
         if (buf.hasRemaining())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 61bb550..68515c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -889,6 +889,9 @@ public class GridNioServer<T> {
                         assert msg != null;
 
                         finished = msg.writeTo(buf, state);
+
+                        if (finished)
+                            state.reset();
                     }
 
                     // Fill up as many messages as possible to write buffer.
@@ -908,6 +911,9 @@ public class GridNioServer<T> {
                         assert msg != null;
 
                         finished = msg.writeTo(buf, state);
+
+                        if (finished)
+                            state.reset();
                     }
 
                     buf.flip();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 193e4d7..eb118b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1504,6 +1504,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
+                GridDirectParser parser = new GridDirectParser(messageFactory, 
messageFormatter);
+
                 GridNioServer<MessageAdapter> srvr =
                     GridNioServer.<MessageAdapter>builder()
                         .address(locHost)
@@ -1521,7 +1523,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .directMode(true)
                         .metricsListener(metricsLsnr)
                         .writeTimeout(sockWriteTimeout)
-                        .filters(new GridNioCodecFilter(new 
GridDirectParser(messageFactory), log, true),
+                        .filters(new GridNioCodecFilter(parser, log, true),
                             new GridConnectionBytesVerifyFilter(log))
                         .messageFormatter(messageFormatter)
                         .build();
@@ -2452,13 +2454,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             try {
+                GridDirectParser parser = new 
GridDirectParser(getSpiContext().messageFactory(),
+                    getSpiContext().messageFormatter());
+
                 IpcToNioAdapter<MessageAdapter> adapter = new 
IpcToNioAdapter<>(
                     metricsLsnr,
                     log,
                     endpoint,
                     srvLsnr,
                     getSpiContext().messageFormatter(),
-                    new GridNioCodecFilter(new 
GridDirectParser(getSpiContext().messageFactory()), log, true),
+                    new GridNioCodecFilter(parser, log, true),
                     new GridConnectionBytesVerifyFilter(log)
                 );
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 80b9b96..d8321b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -531,7 +531,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
     /** {@inheritDoc} */
     @Override public MessageFactory messageFactory() {
         if (factory == null)
-            factory = new GridIoMessageFactory(messageFormatter(), null);
+            factory = new GridIoMessageFactory(null);
 
         return factory;
     }

Reply via email to