ignite-nio - Removing message clone
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/82bca8ea Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/82bca8ea Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/82bca8ea Branch: refs/heads/ignite-nio Commit: 82bca8ea42635a636876c508164eec517a952a33 Parents: 7e6b8df Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Fri Feb 13 18:12:54 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Fri Feb 13 18:12:54 2015 -0800 ---------------------------------------------------------------------- .../CommunicationMessageCodeGenerator.java | 19 +-- .../internal/direct/DirectByteBufferStream.java | 102 ++++++++----- .../internal/direct/DirectMessageWriter.java | 4 +- .../managers/communication/GridIoManager.java | 6 +- .../internal/util/ipc/IpcToNioAdapter.java | 5 +- .../ignite/internal/util/nio/GridNioServer.java | 17 ++- .../util/nio/GridNioSessionMetaKey.java | 4 +- .../util/nio/GridShmemCommunicationClient.java | 5 +- .../util/nio/GridTcpCommunicationClient.java | 2 +- .../communication/MessageAdapter.java | 9 +- .../communication/MessageWriteState.java | 143 +++++++++++++++++++ .../extensions/communication/MessageWriter.java | 2 - 12 files changed, 242 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java index 54d4a30..13a50bf 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java @@ -63,12 +63,6 @@ public class CommunicationMessageCodeGenerator { private static final String BUF_VAR = "buf"; /** */ - private static final String STATE_VAR = "state"; - - /** */ - private static final String TYPE_WRITTEN_VAR = "typeWritten"; - - /** */ private final Collection<String> write = new ArrayList<>(); /** */ @@ -358,7 +352,8 @@ public class CommunicationMessageCodeGenerator { assert code != null; if (write) { - code.add(builder().a("MessageWriter writer = WRITER.get();").toString()); + code.add(builder().a("MessageWriteState state = MessageWriteState.get();").toString()); + code.add(builder().a("MessageWriter writer = state.writer();").toString()); code.add(EMPTY); } @@ -372,14 +367,14 @@ public class CommunicationMessageCodeGenerator { } if (write) { - code.add(builder().a("if (!").a(TYPE_WRITTEN_VAR).a(") {").toString()); + code.add(builder().a("if (!state.isTypeWritten()) {").toString()); indent++; returnFalseIfFailed(code, "writer.writeByte", "null", "directType()"); code.add(EMPTY); - code.add(builder().a(TYPE_WRITTEN_VAR).a(" = true;").toString()); + code.add(builder().a("state.setTypeWritten();").toString()); indent--; @@ -388,7 +383,7 @@ public class CommunicationMessageCodeGenerator { } if (!fields.isEmpty()) - code.add(builder().a("switch (").a(STATE_VAR).a(") {").toString()); + code.add(builder().a("switch (state.index()) {").toString()); } /** @@ -446,7 +441,7 @@ public class CommunicationMessageCodeGenerator { mapAnn != null ? mapAnn.keyType() : null, mapAnn != null ? mapAnn.valueType() : null, false); write.add(EMPTY); - write.add(builder().a(STATE_VAR).a("++;").toString()); + write.add(builder().a("state.increment();").toString()); write.add(EMPTY); indent--; @@ -471,7 +466,7 @@ public class CommunicationMessageCodeGenerator { mapAnn != null ? mapAnn.keyType() : null, mapAnn != null ? mapAnn.valueType() : null); read.add(EMPTY); - read.add(builder().a(STATE_VAR).a("++;").toString()); + read.add(builder().a("readState++;").toString()); read.add(EMPTY); indent--; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java index f74da65..6fb6667 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.direct; import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -599,8 +598,22 @@ public class DirectByteBufferStream { * @param msg Message. */ public void writeMessage(MessageAdapter msg) { - if (msg != null) - lastFinished = buf.hasRemaining() && msg.writeTo(buf); + if (msg != null) { + if (buf.hasRemaining()) { + MessageWriteState state = MessageWriteState.get(); + + try { + state.forward(); + + lastFinished = msg.writeTo(buf); + } + finally { + state.backward(lastFinished); + } + } + else + lastFinished = false; + } else writeByte(Byte.MIN_VALUE); } @@ -621,20 +634,22 @@ public class DirectByteBufferStream { it = arrayIterator(arr); } + MessageWriteState state = MessageWriteState.get(); + Type itemType = type(itemCls); while (it.hasNext() || cur != NULL) { if (cur == NULL) { cur = it.next(); - if (cur != null && itemType == Type.MSG) { - cur = ((MessageAdapter)cur).clone(); - - ((MessageAdapter)cur).setWriter(writer); - } +// if (cur != null && itemType == Type.MSG) { +// cur = ((MessageAdapter)cur).clone(); +// +// ((MessageAdapter)cur).setWriter(writer); +// } } - write(itemType, cur); + write(itemType, cur, state); if (!lastFinished) return; @@ -664,20 +679,22 @@ public class DirectByteBufferStream { it = col.iterator(); } + MessageWriteState state = MessageWriteState.get(); + Type itemType = type(itemCls); while (it.hasNext() || cur != NULL) { if (cur == NULL) { cur = it.next(); - if (cur != null && itemType == Type.MSG) { - cur = ((MessageAdapter)cur).clone(); - - ((MessageAdapter)cur).setWriter(writer); - } +// if (cur != null && itemType == Type.MSG) { +// cur = ((MessageAdapter)cur).clone(); +// +// ((MessageAdapter)cur).setWriter(writer); +// } } - write(itemType, cur); + write(itemType, cur, state); if (!lastFinished) return; @@ -709,6 +726,8 @@ public class DirectByteBufferStream { it = map.entrySet().iterator(); } + MessageWriteState state = MessageWriteState.get(); + Type keyType = type(keyCls); Type valType = type(valCls); @@ -720,30 +739,30 @@ public class DirectByteBufferStream { e = (Map.Entry<K, V>)cur; - if (keyType == Type.MSG || valType == Type.MSG) { - K k = e.getKey(); - V v = e.getValue(); - - if (k != null && keyType == Type.MSG) { - k = (K)((MessageAdapter)k).clone(); - - ((MessageAdapter)k).setWriter(writer); - } - - if (v != null && valType == Type.MSG) { - v = (V)((MessageAdapter)v).clone(); - - ((MessageAdapter)v).setWriter(writer); - } - - cur = e = F.t(k, v); - } +// if (keyType == Type.MSG || valType == Type.MSG) { +// K k = e.getKey(); +// V v = e.getValue(); +// +// if (k != null && keyType == Type.MSG) { +// k = (K)((MessageAdapter)k).clone(); +// +// ((MessageAdapter)k).setWriter(writer); +// } +// +// if (v != null && valType == Type.MSG) { +// v = (V)((MessageAdapter)v).clone(); +// +// ((MessageAdapter)v).setWriter(writer); +// } +// +// cur = e = F.t(k, v); +// } } else e = (Map.Entry<K, V>)cur; if (!keyDone) { - write(keyType, e.getKey()); + write(keyType, e.getKey(), state); if (!lastFinished) return; @@ -751,7 +770,7 @@ public class DirectByteBufferStream { keyDone = true; } - write(valType, e.getValue()); + write(valType, e.getValue(), state); if (!lastFinished) return; @@ -1343,7 +1362,7 @@ public class DirectByteBufferStream { * @param type Type. * @param val Value. */ - private void write(Type type, Object val) { + private void write(Type type, Object val, MessageWriteState state) { switch (type) { case BYTE: writeByte((Byte)val); @@ -1446,7 +1465,16 @@ public class DirectByteBufferStream { break; case MSG: - writeMessage((MessageAdapter)val); + try { + if (val != null) + state.forward(); + + writeMessage((MessageAdapter)val); + } + finally { + if (val != null) + state.backward(lastFinished); + } break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index 7526fac..ad7cf10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -185,8 +185,8 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeMessage(String name, @Nullable MessageAdapter msg) { - if (msg != null) - msg.setWriter(this); +// if (msg != null) +// msg.setWriter(this); stream.writeMessage(msg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index e53691d..e201293 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -900,6 +900,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa assert msg != null; assert plc != null; + GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); + if (locNodeId.equals(node.id())) { assert plc != P2P_POOL; @@ -908,16 +910,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (commLsnr == null) throw new IgniteCheckedException("Trying to send message when grid is not fully started."); - GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); - if (ordered) processOrderedMessage(locNodeId, ioMsg, plc, null); else processRegularMessage0(ioMsg, locNodeId); } else { - GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg.clone(), ordered, timeout, skipOnTimeout); - if (topicOrd < 0) ioMsg.topicBytes(marsh.marshal(topic)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java index 388c38f..4f17a20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java @@ -152,7 +152,7 @@ public class IpcToNioAdapter<T> { assert writeBuf.hasArray(); try { - msg.setWriter(formatter.writer()); +// MessageAdapter.WRITER.set(formatter.writer()); int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf); @@ -161,6 +161,9 @@ public class IpcToNioAdapter<T> { catch (IOException | IgniteCheckedException e) { return new GridNioFinishedFuture<Object>(e); } + finally { +// MessageAdapter.WRITER.remove(); + } return new GridNioFinishedFuture<>((Object)null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 725f5e7..8f0366a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -1000,13 +1000,12 @@ public class GridNioServer<T> { ByteBuffer buf = ses.writeBuffer(); NioOperationFuture<?> req = ses.removeMeta(NIO_OPERATION.ordinal()); - MessageWriter writer = ses.meta(WRITER.ordinal()); + MessageWriteState state = ses.meta(WRITE_STATE.ordinal()); - if (writer == null) { - ses.addMeta(WRITER.ordinal(), writer = formatter.writer()); - - MessageAdapter.WRITER.set(writer); - } + if (state == null) + ses.addMeta(WRITE_STATE.ordinal(), state = MessageWriteState.create(formatter)); + else + MessageWriteState.set(state); List<NioOperationFuture<?>> doneFuts = null; @@ -1032,7 +1031,7 @@ public class GridNioServer<T> { finished = msg.writeTo(buf); if (finished) - writer.reset(); + state.reset(); } // Fill up as many messages as possible to write buffer. @@ -1054,7 +1053,7 @@ public class GridNioServer<T> { finished = msg.writeTo(buf); if (finished) - writer.reset(); + state.reset(); } buf.flip(); @@ -1101,7 +1100,7 @@ public class GridNioServer<T> { } } finally { - MessageAdapter.WRITER.remove(); + MessageWriteState.clear(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java index ba568ac..ab44f5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java @@ -41,8 +41,8 @@ public enum GridNioSessionMetaKey { /** Client marshaller ID. */ MARSHALLER_ID, - /** Message writer. */ - WRITER; + /** Message write state. */ + WRITE_STATE; /** Maximum count of NIO session keys in system. */ public static final int MAX_KEYS_CNT = 64; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index 2add325..d81ba8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -116,7 +116,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien assert writeBuf.hasArray(); try { - msg.setWriter(formatter.writer()); +// MessageAdapter.WRITER.set(formatter.writer()); int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf); @@ -125,6 +125,9 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien catch (IOException e) { throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e); } + finally { +// MessageAdapter.WRITER.remove(); + } markUsed(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java index fd2aeb9..e3e93bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java @@ -197,7 +197,7 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient assert writeBuf.hasArray(); try { - msg.setWriter(formatter.writer()); +// msg.setWriter(formatter.writer()); int cnt = U.writeMessageFully(msg, out, writeBuf); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java index 466dfde..73db3ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java @@ -24,9 +24,6 @@ import java.nio.*; * Base class for all communication messages. */ public abstract class MessageAdapter implements Serializable, Cloneable { - /** Message writer. */ - public static final ThreadLocal<MessageWriter> WRITER = new ThreadLocal<>(); - // TODO: remove protected MessageWriter writer; @@ -39,6 +36,9 @@ public abstract class MessageAdapter implements Serializable, Cloneable { /** Current write/read state. */ protected int state; + /** Current read state. */ + protected int readState; + // /** // * @param writer Message writer. // */ @@ -52,9 +52,8 @@ public abstract class MessageAdapter implements Serializable, Cloneable { * @param reader Message reader. */ public final void setReader(MessageReader reader) { - assert reader != null; - assert this.reader == null; + assert reader != null; this.reader = reader; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java new file mode 100644 index 0000000..8163241 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriteState.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +/** + * TODO + */ +public final class MessageWriteState { + public static final ThreadLocal<MessageWriteState> WRITE_STATE = new ThreadLocal<>(); + + public static MessageWriteState create(MessageFormatter formatter) { + MessageWriteState state = new MessageWriteState(formatter.writer()); + + WRITE_STATE.set(state); + + return state; + } + + public static void set(MessageWriteState state) { + assert state != null; + + WRITE_STATE.set(state); + } + + public static MessageWriteState get() { + MessageWriteState state = WRITE_STATE.get(); + + assert state != null; + + return state; + } + + public static void clear() { + WRITE_STATE.remove(); + } + + private final MessageWriter writer; + + private final Stack stack; + + private MessageWriteState(MessageWriter writer) { + this.writer = writer; + + stack = new Stack(-1); + } + + public MessageWriter writer() { + return writer; + } + + public boolean isTypeWritten() { + return stack.current() >= 0; + } + + public void setTypeWritten() { + assert stack.current() == -1; + + stack.setCurrent(0); + } + + public int index() { + return stack.current(); + } + + public void increment() { + stack.incrementCurrent(); + } + + public void forward() { + stack.push(); + } + + public void backward(boolean finished) { + if (finished) + stack.resetCurrent(); + + stack.pop(); + } + + public void reset() { + stack.reset(); + } + + private static class Stack { + private final int[] arr = new int[32]; + + private final int initVal; + + private int pos; + + private Stack(int initVal) { + this.initVal = initVal; + + for (int i = 0; i < arr.length; i++) + arr[i] = initVal; + } + + int current() { + return arr[pos]; + } + + void incrementCurrent() { + arr[pos]++; + } + + void setCurrent(int val) { + arr[pos] = val; + } + + void resetCurrent() { + arr[pos] = initVal; + } + + void push() { + pos++; + } + + void pop() { + pos--; + } + + void reset() { + assert pos == 0; + + resetCurrent(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82bca8ea/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java index 66eb721..89c55a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java @@ -263,6 +263,4 @@ public interface MessageWriter { * @return Whether value was fully written. */ public <K, V> boolean writeMap(String name, Map<K, V> map, Class<K> keyCls, Class<V> valCls); - - public void reset(); }