Repository: incubator-ignite Updated Branches: refs/heads/ignite-nio e66c060ed -> 43ba8317e
ignite-nio - Removing message clone Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/43ba8317 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/43ba8317 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/43ba8317 Branch: refs/heads/ignite-nio Commit: 43ba8317ec604602e7d464969249f1483b221e70 Parents: e66c060 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Fri Feb 13 22:06:06 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Fri Feb 13 22:06:06 2015 -0800 ---------------------------------------------------------------------- .../internal/direct/DirectByteBufferStream.java | 21 ++++++-------------- .../internal/direct/DirectMessageWriter.java | 14 +++++++++---- .../communication/MessageWriteState.java | 17 +++++++++------- 3 files changed, 26 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43ba8317/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java index 6fb6667..68eb4ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java @@ -21,7 +21,6 @@ import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; -import org.jetbrains.annotations.*; import sun.misc.*; import sun.nio.ch.*; @@ -317,7 +316,7 @@ public class DirectByteBufferStream { /** * @param msgFactory Message factory. */ - public DirectByteBufferStream(@Nullable MessageFactory msgFactory) { + public DirectByteBufferStream(MessageFactory msgFactory) { this.msgFactory = msgFactory; } @@ -597,11 +596,9 @@ public class DirectByteBufferStream { /** * @param msg Message. */ - public void writeMessage(MessageAdapter msg) { + public void writeMessage(MessageAdapter msg, MessageWriteState state) { if (msg != null) { if (buf.hasRemaining()) { - MessageWriteState state = MessageWriteState.get(); - try { state.forward(); @@ -623,7 +620,7 @@ public class DirectByteBufferStream { * @param itemCls Component type. * @param writer Writer. */ - public <T> void writeObjectArray(T[] arr, Class<T> itemCls, MessageWriter writer) { + public <T> void writeObjectArray(T[] arr, Class<T> itemCls, MessageWriteState state) { if (arr != null) { if (it == null) { writeInt(arr.length); @@ -634,8 +631,6 @@ public class DirectByteBufferStream { it = arrayIterator(arr); } - MessageWriteState state = MessageWriteState.get(); - Type itemType = type(itemCls); while (it.hasNext() || cur != NULL) { @@ -668,7 +663,7 @@ public class DirectByteBufferStream { * @param itemCls Item type. * @param writer Writer. */ - public <T> void writeCollection(Collection<T> col, Class<T> itemCls, MessageWriter writer) { + public <T> void writeCollection(Collection<T> col, Class<T> itemCls, MessageWriteState state) { if (col != null) { if (it == null) { writeInt(col.size()); @@ -679,8 +674,6 @@ public class DirectByteBufferStream { it = col.iterator(); } - MessageWriteState state = MessageWriteState.get(); - Type itemType = type(itemCls); while (it.hasNext() || cur != NULL) { @@ -715,7 +708,7 @@ public class DirectByteBufferStream { * @param writer Writer. */ @SuppressWarnings("unchecked") - public <K, V> void writeMap(Map<K, V> map, Class<K> keyCls, Class<V> valCls, MessageWriter writer) { + public <K, V> void writeMap(Map<K, V> map, Class<K> keyCls, Class<V> valCls, MessageWriteState state) { if (map != null) { if (it == null) { writeInt(map.size()); @@ -726,8 +719,6 @@ public class DirectByteBufferStream { it = map.entrySet().iterator(); } - MessageWriteState state = MessageWriteState.get(); - Type keyType = type(keyCls); Type valType = type(valCls); @@ -1469,7 +1460,7 @@ public class DirectByteBufferStream { if (val != null) state.forward(); - writeMessage((MessageAdapter)val); + writeMessage((MessageAdapter)val, state); } finally { if (val != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43ba8317/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index ad7cf10..de6179b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -31,6 +31,12 @@ public class DirectMessageWriter implements MessageWriter { /** Stream. */ private final DirectByteBufferStream stream = new DirectByteBufferStream(null); + private MessageWriteState state; + + public void state(MessageWriteState state) { + this.state = state; + } + /** {@inheritDoc} */ @Override public void setBuffer(ByteBuffer buf) { stream.setBuffer(buf); @@ -188,28 +194,28 @@ public class DirectMessageWriter implements MessageWriter { // if (msg != null) // msg.setWriter(this); - stream.writeMessage(msg); + stream.writeMessage(msg, state); return stream.lastFinished(); } /** {@inheritDoc} */ @Override public <T> boolean writeObjectArray(String name, T[] arr, Class<T> itemCls) { - stream.writeObjectArray(arr, itemCls, this); + stream.writeObjectArray(arr, itemCls, state); return stream.lastFinished(); } /** {@inheritDoc} */ @Override public <T> boolean writeCollection(String name, Collection<T> col, Class<T> itemCls) { - stream.writeCollection(col, itemCls, this); + stream.writeCollection(col, itemCls, state); return stream.lastFinished(); } /** {@inheritDoc} */ @Override public <K, V> boolean writeMap(String name, Map<K, V> map, Class<K> keyCls, Class<V> valCls) { - stream.writeMap(map, keyCls, valCls, this); + stream.writeMap(map, keyCls, valCls, state); return stream.lastFinished(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43ba8317/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java index 8163241..10b50bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java @@ -17,6 +17,8 @@ package org.apache.ignite.plugin.extensions.communication; +import org.apache.ignite.internal.direct.*; + /** * TODO */ @@ -24,7 +26,12 @@ public final class MessageWriteState { public static final ThreadLocal<MessageWriteState> WRITE_STATE = new ThreadLocal<>(); public static MessageWriteState create(MessageFormatter formatter) { - MessageWriteState state = new MessageWriteState(formatter.writer()); + MessageWriter writer = formatter.writer(); + + MessageWriteState state = new MessageWriteState(writer); + + // TODO: rework + ((DirectMessageWriter)writer).state(state); WRITE_STATE.set(state); @@ -70,7 +77,7 @@ public final class MessageWriteState { public void setTypeWritten() { assert stack.current() == -1; - stack.setCurrent(0); + stack.incrementCurrent(); } public int index() { @@ -97,7 +104,7 @@ public final class MessageWriteState { } private static class Stack { - private final int[] arr = new int[32]; + private final int[] arr = new int[10]; private final int initVal; @@ -118,10 +125,6 @@ public final class MessageWriteState { arr[pos]++; } - void setCurrent(int val) { - arr[pos] = val; - } - void resetCurrent() { arr[pos] = initVal; }