ignite-nio - Removing message clone

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/82bca8ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/82bca8ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/82bca8ea

Branch: refs/heads/ignite-nio
Commit: 82bca8ea42635a636876c508164eec517a952a33
Parents: 7e6b8df
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Fri Feb 13 18:12:54 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Fri Feb 13 18:12:54 2015 -0800

----------------------------------------------------------------------
 .../CommunicationMessageCodeGenerator.java      |  19 +--
 .../internal/direct/DirectByteBufferStream.java | 102 ++++++++-----
 .../internal/direct/DirectMessageWriter.java    |   4 +-
 .../managers/communication/GridIoManager.java   |   6 +-
 .../internal/util/ipc/IpcToNioAdapter.java      |   5 +-
 .../ignite/internal/util/nio/GridNioServer.java |  17 ++-
 .../util/nio/GridNioSessionMetaKey.java         |   4 +-
 .../util/nio/GridShmemCommunicationClient.java  |   5 +-
 .../util/nio/GridTcpCommunicationClient.java    |   2 +-
 .../communication/MessageAdapter.java           |   9 +-
 .../communication/MessageWriteState.java        | 143 +++++++++++++++++++
 .../extensions/communication/MessageWriter.java |   2 -
 12 files changed, 242 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java
----------------------------------------------------------------------
diff --git 
a/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java
 
b/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java
index 54d4a30..13a50bf 100644
--- 
a/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java
+++ 
b/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java
@@ -63,12 +63,6 @@ public class CommunicationMessageCodeGenerator {
     private static final String BUF_VAR = "buf";
 
     /** */
-    private static final String STATE_VAR = "state";
-
-    /** */
-    private static final String TYPE_WRITTEN_VAR = "typeWritten";
-
-    /** */
     private final Collection<String> write = new ArrayList<>();
 
     /** */
@@ -358,7 +352,8 @@ public class CommunicationMessageCodeGenerator {
         assert code != null;
 
         if (write) {
-            code.add(builder().a("MessageWriter writer = 
WRITER.get();").toString());
+            code.add(builder().a("MessageWriteState state = 
MessageWriteState.get();").toString());
+            code.add(builder().a("MessageWriter writer = 
state.writer();").toString());
             code.add(EMPTY);
         }
 
@@ -372,14 +367,14 @@ public class CommunicationMessageCodeGenerator {
         }
 
         if (write) {
-            code.add(builder().a("if (!").a(TYPE_WRITTEN_VAR).a(") 
{").toString());
+            code.add(builder().a("if (!state.isTypeWritten()) {").toString());
 
             indent++;
 
             returnFalseIfFailed(code, "writer.writeByte", "null", 
"directType()");
 
             code.add(EMPTY);
-            code.add(builder().a(TYPE_WRITTEN_VAR).a(" = true;").toString());
+            code.add(builder().a("state.setTypeWritten();").toString());
 
             indent--;
 
@@ -388,7 +383,7 @@ public class CommunicationMessageCodeGenerator {
         }
 
         if (!fields.isEmpty())
-            code.add(builder().a("switch (").a(STATE_VAR).a(") {").toString());
+            code.add(builder().a("switch (state.index()) {").toString());
     }
 
     /**
@@ -446,7 +441,7 @@ public class CommunicationMessageCodeGenerator {
             mapAnn != null ? mapAnn.keyType() : null, mapAnn != null ? 
mapAnn.valueType() : null, false);
 
         write.add(EMPTY);
-        write.add(builder().a(STATE_VAR).a("++;").toString());
+        write.add(builder().a("state.increment();").toString());
         write.add(EMPTY);
 
         indent--;
@@ -471,7 +466,7 @@ public class CommunicationMessageCodeGenerator {
             mapAnn != null ? mapAnn.keyType() : null, mapAnn != null ? 
mapAnn.valueType() : null);
 
         read.add(EMPTY);
-        read.add(builder().a(STATE_VAR).a("++;").toString());
+        read.add(builder().a("readState++;").toString());
         read.add(EMPTY);
 
         indent--;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
index f74da65..6fb6667 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.direct;
 
 import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -599,8 +598,22 @@ public class DirectByteBufferStream {
      * @param msg Message.
      */
     public void writeMessage(MessageAdapter msg) {
-        if (msg != null)
-            lastFinished = buf.hasRemaining() && msg.writeTo(buf);
+        if (msg != null) {
+            if (buf.hasRemaining()) {
+                MessageWriteState state = MessageWriteState.get();
+
+                try {
+                    state.forward();
+
+                    lastFinished = msg.writeTo(buf);
+                }
+                finally {
+                    state.backward(lastFinished);
+                }
+            }
+            else
+                lastFinished = false;
+        }
         else
             writeByte(Byte.MIN_VALUE);
     }
@@ -621,20 +634,22 @@ public class DirectByteBufferStream {
                 it = arrayIterator(arr);
             }
 
+            MessageWriteState state = MessageWriteState.get();
+
             Type itemType = type(itemCls);
 
             while (it.hasNext() || cur != NULL) {
                 if (cur == NULL) {
                     cur = it.next();
 
-                    if (cur != null && itemType == Type.MSG) {
-                        cur = ((MessageAdapter)cur).clone();
-
-                        ((MessageAdapter)cur).setWriter(writer);
-                    }
+//                    if (cur != null && itemType == Type.MSG) {
+//                        cur = ((MessageAdapter)cur).clone();
+//
+//                        ((MessageAdapter)cur).setWriter(writer);
+//                    }
                 }
 
-                write(itemType, cur);
+                write(itemType, cur, state);
 
                 if (!lastFinished)
                     return;
@@ -664,20 +679,22 @@ public class DirectByteBufferStream {
                 it = col.iterator();
             }
 
+            MessageWriteState state = MessageWriteState.get();
+
             Type itemType = type(itemCls);
 
             while (it.hasNext() || cur != NULL) {
                 if (cur == NULL) {
                     cur = it.next();
 
-                    if (cur != null && itemType == Type.MSG) {
-                        cur = ((MessageAdapter)cur).clone();
-
-                        ((MessageAdapter)cur).setWriter(writer);
-                    }
+//                    if (cur != null && itemType == Type.MSG) {
+//                        cur = ((MessageAdapter)cur).clone();
+//
+//                        ((MessageAdapter)cur).setWriter(writer);
+//                    }
                 }
 
-                write(itemType, cur);
+                write(itemType, cur, state);
 
                 if (!lastFinished)
                     return;
@@ -709,6 +726,8 @@ public class DirectByteBufferStream {
                 it = map.entrySet().iterator();
             }
 
+            MessageWriteState state = MessageWriteState.get();
+
             Type keyType = type(keyCls);
             Type valType = type(valCls);
 
@@ -720,30 +739,30 @@ public class DirectByteBufferStream {
 
                     e = (Map.Entry<K, V>)cur;
 
-                    if (keyType == Type.MSG || valType == Type.MSG) {
-                        K k = e.getKey();
-                        V v = e.getValue();
-
-                        if (k != null && keyType == Type.MSG) {
-                            k = (K)((MessageAdapter)k).clone();
-
-                            ((MessageAdapter)k).setWriter(writer);
-                        }
-
-                        if (v != null && valType == Type.MSG) {
-                            v = (V)((MessageAdapter)v).clone();
-
-                            ((MessageAdapter)v).setWriter(writer);
-                        }
-
-                        cur = e = F.t(k, v);
-                    }
+//                    if (keyType == Type.MSG || valType == Type.MSG) {
+//                        K k = e.getKey();
+//                        V v = e.getValue();
+//
+//                        if (k != null && keyType == Type.MSG) {
+//                            k = (K)((MessageAdapter)k).clone();
+//
+//                            ((MessageAdapter)k).setWriter(writer);
+//                        }
+//
+//                        if (v != null && valType == Type.MSG) {
+//                            v = (V)((MessageAdapter)v).clone();
+//
+//                            ((MessageAdapter)v).setWriter(writer);
+//                        }
+//
+//                        cur = e = F.t(k, v);
+//                    }
                 }
                 else
                     e = (Map.Entry<K, V>)cur;
 
                 if (!keyDone) {
-                    write(keyType, e.getKey());
+                    write(keyType, e.getKey(), state);
 
                     if (!lastFinished)
                         return;
@@ -751,7 +770,7 @@ public class DirectByteBufferStream {
                     keyDone = true;
                 }
 
-                write(valType, e.getValue());
+                write(valType, e.getValue(), state);
 
                 if (!lastFinished)
                     return;
@@ -1343,7 +1362,7 @@ public class DirectByteBufferStream {
      * @param type Type.
      * @param val Value.
      */
-    private void write(Type type, Object val) {
+    private void write(Type type, Object val, MessageWriteState state) {
         switch (type) {
             case BYTE:
                 writeByte((Byte)val);
@@ -1446,7 +1465,16 @@ public class DirectByteBufferStream {
                 break;
 
             case MSG:
-                writeMessage((MessageAdapter)val);
+                try {
+                    if (val != null)
+                        state.forward();
+
+                    writeMessage((MessageAdapter)val);
+                }
+                finally {
+                    if (val != null)
+                        state.backward(lastFinished);
+                }
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index 7526fac..ad7cf10 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -185,8 +185,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeMessage(String name, @Nullable 
MessageAdapter msg) {
-        if (msg != null)
-            msg.setWriter(this);
+//        if (msg != null)
+//            msg.setWriter(this);
 
         stream.writeMessage(msg);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index e53691d..e201293 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -900,6 +900,8 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         assert msg != null;
         assert plc != null;
 
+        GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, 
ordered, timeout, skipOnTimeout);
+
         if (locNodeId.equals(node.id())) {
             assert plc != P2P_POOL;
 
@@ -908,16 +910,12 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             if (commLsnr == null)
                 throw new IgniteCheckedException("Trying to send message when 
grid is not fully started.");
 
-            GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, 
ordered, timeout, skipOnTimeout);
-
             if (ordered)
                 processOrderedMessage(locNodeId, ioMsg, plc, null);
             else
                 processRegularMessage0(ioMsg, locNodeId);
         }
         else {
-            GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, 
msg.clone(), ordered, timeout, skipOnTimeout);
-
             if (topicOrd < 0)
                 ioMsg.topicBytes(marsh.marshal(topic));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index 388c38f..4f17a20 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -152,7 +152,7 @@ public class IpcToNioAdapter<T> {
         assert writeBuf.hasArray();
 
         try {
-            msg.setWriter(formatter.writer());
+//            MessageAdapter.WRITER.set(formatter.writer());
 
             int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf);
 
@@ -161,6 +161,9 @@ public class IpcToNioAdapter<T> {
         catch (IOException | IgniteCheckedException e) {
             return new GridNioFinishedFuture<Object>(e);
         }
+        finally {
+//            MessageAdapter.WRITER.remove();
+        }
 
         return new GridNioFinishedFuture<>((Object)null);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 725f5e7..8f0366a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1000,13 +1000,12 @@ public class GridNioServer<T> {
                 ByteBuffer buf = ses.writeBuffer();
                 NioOperationFuture<?> req = 
ses.removeMeta(NIO_OPERATION.ordinal());
 
-                MessageWriter writer = ses.meta(WRITER.ordinal());
+                MessageWriteState state = ses.meta(WRITE_STATE.ordinal());
 
-                if (writer == null) {
-                    ses.addMeta(WRITER.ordinal(), writer = formatter.writer());
-
-                    MessageAdapter.WRITER.set(writer);
-                }
+                if (state == null)
+                    ses.addMeta(WRITE_STATE.ordinal(), state = 
MessageWriteState.create(formatter));
+                else
+                    MessageWriteState.set(state);
 
                 List<NioOperationFuture<?>> doneFuts = null;
 
@@ -1032,7 +1031,7 @@ public class GridNioServer<T> {
                         finished = msg.writeTo(buf);
 
                         if (finished)
-                            writer.reset();
+                            state.reset();
                     }
 
                     // Fill up as many messages as possible to write buffer.
@@ -1054,7 +1053,7 @@ public class GridNioServer<T> {
                         finished = msg.writeTo(buf);
 
                         if (finished)
-                            writer.reset();
+                            state.reset();
                     }
 
                     buf.flip();
@@ -1101,7 +1100,7 @@ public class GridNioServer<T> {
                 }
             }
             finally {
-                MessageAdapter.WRITER.remove();
+                MessageWriteState.clear();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
index ba568ac..ab44f5c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
@@ -41,8 +41,8 @@ public enum GridNioSessionMetaKey {
     /** Client marshaller ID. */
     MARSHALLER_ID,
 
-    /** Message writer. */
-    WRITER;
+    /** Message write state. */
+    WRITE_STATE;
 
     /** Maximum count of NIO session keys in system. */
     public static final int MAX_KEYS_CNT = 64;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index 2add325..d81ba8e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -116,7 +116,7 @@ public class GridShmemCommunicationClient extends 
GridAbstractCommunicationClien
         assert writeBuf.hasArray();
 
         try {
-            msg.setWriter(formatter.writer());
+//            MessageAdapter.WRITER.set(formatter.writer());
 
             int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf);
 
@@ -125,6 +125,9 @@ public class GridShmemCommunicationClient extends 
GridAbstractCommunicationClien
         catch (IOException e) {
             throw new IgniteCheckedException("Failed to send message to remote 
node: " + shmem, e);
         }
+        finally {
+//            MessageAdapter.WRITER.remove();
+        }
 
         markUsed();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
index fd2aeb9..e3e93bc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
@@ -197,7 +197,7 @@ public class GridTcpCommunicationClient extends 
GridAbstractCommunicationClient
         assert writeBuf.hasArray();
 
         try {
-            msg.setWriter(formatter.writer());
+//            msg.setWriter(formatter.writer());
 
             int cnt = U.writeMessageFully(msg, out, writeBuf);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
index 466dfde..73db3ec 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java
@@ -24,9 +24,6 @@ import java.nio.*;
  * Base class for all communication messages.
  */
 public abstract class MessageAdapter implements Serializable, Cloneable {
-    /** Message writer. */
-    public static final ThreadLocal<MessageWriter> WRITER = new 
ThreadLocal<>();
-
     // TODO: remove
     protected MessageWriter writer;
 
@@ -39,6 +36,9 @@ public abstract class MessageAdapter implements Serializable, 
Cloneable {
     /** Current write/read state. */
     protected int state;
 
+    /** Current read state. */
+    protected int readState;
+
 //    /**
 //     * @param writer Message writer.
 //     */
@@ -52,9 +52,8 @@ public abstract class MessageAdapter implements Serializable, 
Cloneable {
      * @param reader Message reader.
      */
     public final void setReader(MessageReader reader) {
-        assert reader != null;
-
         assert this.reader == null;
+        assert reader != null;
 
         this.reader = reader;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java
new file mode 100644
index 0000000..8163241
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.extensions.communication;
+
+/**
+ * Per-thread state used while a message is being written.
+ * <p>
+ * An instance bundles the {@link MessageWriter} to write with and a small stack of
+ * integer counters, one slot per nesting level. For the current level the slot holds
+ * {@code -1} until the message type has been written, then {@code 0}, and is then
+ * incremented via {@link #increment()}; {@link #index()} exposes that value.
+ * {@link #forward()} and {@link #backward(boolean)} move one nesting level down and
+ * back up.
+ * <p>
+ * The active instance is published through a {@link ThreadLocal}; see
+ * {@link #create(MessageFormatter)}, {@link #set(MessageWriteState)}, {@link #get()}
+ * and {@link #clear()}.
+ */
+public final class MessageWriteState {
+    /** Holder of the write state bound to the current thread. */
+    public static final ThreadLocal<MessageWriteState> WRITE_STATE = new 
ThreadLocal<>();
+
+    /**
+     * Creates a new state around a writer obtained from the given formatter and
+     * binds it to the current thread.
+     *
+     * @param formatter Formatter that supplies the message writer.
+     * @return Newly created state (already set as the current thread's state).
+     */
+    public static MessageWriteState create(MessageFormatter formatter) {
+        MessageWriteState state = new MessageWriteState(formatter.writer());
+
+        WRITE_STATE.set(state);
+
+        return state;
+    }
+
+    /**
+     * Binds an existing state to the current thread.
+     *
+     * @param state State to bind; must not be {@code null} (checked by assertion only).
+     */
+    public static void set(MessageWriteState state) {
+        assert state != null;
+
+        WRITE_STATE.set(state);
+    }
+
+    /**
+     * Returns the state bound to the current thread.
+     *
+     * @return Current state. Asserted to be non-null; with assertions disabled
+     *      {@code null} is returned if nothing has been bound.
+     */
+    public static MessageWriteState get() {
+        MessageWriteState state = WRITE_STATE.get();
+
+        assert state != null;
+
+        return state;
+    }
+
+    /**
+     * Unbinds whatever state is bound to the current thread.
+     */
+    public static void clear() {
+        WRITE_STATE.remove();
+    }
+
+    /** Writer obtained from the formatter when this state was created. */
+    private final MessageWriter writer;
+
+    /** Per-nesting-level counters; every slot starts at {@code -1}. */
+    private final Stack stack;
+
+    /**
+     * @param writer Message writer.
+     */
+    private MessageWriteState(MessageWriter writer) {
+        this.writer = writer;
+
+        // -1 is the "type not written yet" marker checked by isTypeWritten().
+        stack = new Stack(-1);
+    }
+
+    /**
+     * @return Message writer held by this state.
+     */
+    public MessageWriter writer() {
+        return writer;
+    }
+
+    /**
+     * @return {@code true} if the counter of the current level is non-negative,
+     *      i.e. {@link #setTypeWritten()} has been called for it since it was last reset.
+     */
+    public boolean isTypeWritten() {
+        return stack.current() >= 0;
+    }
+
+    /**
+     * Marks the type as written for the current level by moving its counter
+     * from {@code -1} to {@code 0}. Asserts that the counter is still {@code -1}.
+     */
+    public void setTypeWritten() {
+        assert stack.current() == -1;
+
+        stack.setCurrent(0);
+    }
+
+    /**
+     * @return Counter value of the current level ({@code -1} while the type is not written).
+     */
+    public int index() {
+        return stack.current();
+    }
+
+    /**
+     * Increments the counter of the current level by one.
+     */
+    public void increment() {
+        stack.incrementCurrent();
+    }
+
+    /**
+     * Moves to the next (deeper) nesting level. The slot of that level is not modified.
+     */
+    public void forward() {
+        stack.push();
+    }
+
+    /**
+     * Leaves the current nesting level and returns to the previous one.
+     *
+     * @param finished If {@code true}, the counter of the level being left is reset to
+     *      its initial value first; otherwise it is left untouched (presumably so that
+     *      an unfinished nested write can be resumed later - confirm against callers).
+     */
+    public void backward(boolean finished) {
+        if (finished)
+            stack.resetCurrent();
+
+        stack.pop();
+    }
+
+    /**
+     * Resets the counter of the top level to its initial value.
+     * Asserts that the state is currently at the top level.
+     */
+    public void reset() {
+        stack.reset();
+    }
+
+    /**
+     * Fixed-size array of {@code int} counters with a cursor pointing at the current slot.
+     * <p>
+     * NOTE(review): capacity is 32 and neither {@link #push()} nor {@link #pop()} checks
+     * bounds, so going outside {@code [0, 31]} fails with an array index error on the
+     * next slot access - confirm that nesting depth is bounded by callers.
+     */
+    private static class Stack {
+        /** Counter slots, one per nesting level. */
+        private final int[] arr = new int[32];
+
+        /** Value every slot is initialised with and reset to. */
+        private final int initVal;
+
+        /** Index of the current slot; starts at {@code 0}. */
+        private int pos;
+
+        /**
+         * @param initVal Initial value written to every slot.
+         */
+        private Stack(int initVal) {
+            this.initVal = initVal;
+
+            for (int i = 0; i < arr.length; i++)
+                arr[i] = initVal;
+        }
+
+        /**
+         * @return Value of the current slot.
+         */
+        int current() {
+            return arr[pos];
+        }
+
+        /**
+         * Increments the current slot by one.
+         */
+        void incrementCurrent() {
+            arr[pos]++;
+        }
+
+        /**
+         * @param val New value for the current slot.
+         */
+        void setCurrent(int val) {
+            arr[pos] = val;
+        }
+
+        /**
+         * Sets the current slot back to the initial value.
+         */
+        void resetCurrent() {
+            arr[pos] = initVal;
+        }
+
+        /**
+         * Advances the cursor to the next slot without touching its content.
+         */
+        void push() {
+            pos++;
+        }
+
+        /**
+         * Moves the cursor back to the previous slot without touching any content.
+         */
+        void pop() {
+            pos--;
+        }
+
+        /**
+         * Resets the current slot; asserts the cursor is at the first slot.
+         */
+        void reset() {
+            assert pos == 0;
+
+            resetCurrent();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
index 66eb721..89c55a3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
@@ -263,6 +263,4 @@ public interface MessageWriter {
      * @return Whether value was fully written.
      */
     public <K, V> boolean writeMap(String name, Map<K, V> map, Class<K> 
keyCls, Class<V> valCls);
-
-    public void reset();
 }

Reply via email to