ignite-nio - Removing message clone
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a3fe9339 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a3fe9339 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a3fe9339 Branch: refs/heads/sprint-1 Commit: a3fe93393a62a35cc72ed839e663b5dbb8676c65 Parents: e74ff00 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sat Feb 14 17:55:08 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sat Feb 14 17:55:08 2015 -0800 ---------------------------------------------------------------------- .../internal/direct/DirectByteBufferStream.java | 46 ++++++++------------ .../internal/direct/DirectMessageReader.java | 8 ++-- .../managers/communication/GridIoManager.java | 2 +- .../communication/GridIoMessageFactory.java | 11 +---- .../internal/util/nio/GridDirectParser.java | 20 ++++++++- .../ignite/internal/util/nio/GridNioServer.java | 6 +++ .../communication/tcp/TcpCommunicationSpi.java | 9 +++- .../testframework/GridSpiTestContext.java | 2 +- 8 files changed, 56 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java index e642790..1f2579f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java @@ -479,16 +479,6 @@ public class DirectByteBufferStream { } /** - * @param val Value. - */ - public void writeByteArray(byte[] val, int off, int len) { - if (val != null) - lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len, true); - else - writeInt(-1); - } - - /** * @param val Value */ public void writeShortArray(short[] val) { @@ -920,14 +910,6 @@ public class DirectByteBufferStream { } /** - * @param len Length. - * @return Value. - */ - public byte[] readByteArray(int len) { - return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF, len); - } - - /** /** * @return Value. */ @@ -1028,7 +1010,7 @@ public class DirectByteBufferStream { * @return Message. */ @SuppressWarnings("unchecked") - public <T extends MessageAdapter> T readMessage() { + public <T extends MessageAdapter> T readMessage(MessageReader reader) { if (!msgTypeDone) { if (!buf.hasRemaining()) { lastFinished = false; @@ -1040,6 +1022,9 @@ public class DirectByteBufferStream { msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type); + if (msg != null) + msg.setReader(reader); + msgTypeDone = true; } @@ -1059,10 +1044,11 @@ public class DirectByteBufferStream { /** * @param itemCls Component type. + * @param reader Reader. * @return Array. */ @SuppressWarnings("unchecked") - public <T> T[] readObjectArray(Class<?> itemCls) { + public <T> T[] readObjectArray(Class<?> itemCls, MessageReader reader) { if (readSize == -1) { int size = readInt(); @@ -1079,7 +1065,7 @@ public class DirectByteBufferStream { Type itemType = type(itemCls); for (int i = readItems; i < readSize; i++) { - Object item = read(itemType); + Object item = read(itemType, reader); if (!lastFinished) return null; @@ -1103,10 +1089,11 @@ public class DirectByteBufferStream { /** * @param itemCls Item type. + * @param reader Reader. * @return Collection. */ @SuppressWarnings("unchecked") - public <C extends Collection<T>, T> C readCollection(Class<T> itemCls) { + public <C extends Collection<T>, T> C readCollection(Class<T> itemCls, MessageReader reader) { if (readSize == -1) { int size = readInt(); @@ -1123,7 +1110,7 @@ public class DirectByteBufferStream { Type itemType = type(itemCls); for (int i = readItems; i < readSize; i++) { - Object item = read(itemType); + Object item = read(itemType, reader); if (!lastFinished) return null; @@ -1148,11 +1135,13 @@ public class DirectByteBufferStream { /** * @param keyCls Key type. * @param valCls Value type. + * @param reader Reader. * @param linked Whether linked map should be created. * @return Map. */ @SuppressWarnings("unchecked") - public <M extends Map<K, V>, K, V> M readMap(Class<K> keyCls, Class<V> valCls, boolean linked) { + public <M extends Map<K, V>, K, V> M readMap(Class<K> keyCls, Class<V> valCls, MessageReader reader, + boolean linked) { if (readSize == -1) { int size = readInt(); @@ -1171,7 +1160,7 @@ public class DirectByteBufferStream { for (int i = readItems; i < readSize; i++) { if (!keyDone) { - Object key = read(keyType); + Object key = read(keyType, reader); if (!lastFinished) return null; @@ -1180,7 +1169,7 @@ public class DirectByteBufferStream { keyDone = true; } - Object val = read(valType); + Object val = read(valType, reader); if (!lastFinished) return null; @@ -1476,9 +1465,10 @@ public class DirectByteBufferStream { /** * @param type Type. + * @param reader Reader. * @return Value. */ - private Object read(Type type) { + private Object read(Type type, MessageReader reader) { switch (type) { case BYTE: return readByte(); @@ -1541,7 +1531,7 @@ public class DirectByteBufferStream { return readIgniteUuid(); case MSG: - return readMessage(); + return readMessage(reader); default: throw new IllegalArgumentException("Unknown type: " + type); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index 48c79ce..9a3fb3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -237,7 +237,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public <T extends MessageAdapter> T readMessage(String name) { - T msg = stream.readMessage(); + T msg = stream.readMessage(this); lastRead = stream.lastFinished(); @@ -246,7 +246,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public <T> T[] readObjectArray(String name, Class<T> itemCls) { - T[] msg = stream.readObjectArray(itemCls); + T[] msg = stream.readObjectArray(itemCls, this); lastRead = stream.lastFinished(); @@ -255,7 +255,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public <C extends Collection<T>, T> C readCollection(String name, Class<T> itemCls) { - C col = stream.readCollection(itemCls); + C col = stream.readCollection(itemCls, this); lastRead = stream.lastFinished(); @@ -265,7 +265,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public <M extends Map<K, V>, K, V> M readMap(String name, Class<K> keyCls, Class<V> valCls, boolean linked) { - M map = stream.readMap(keyCls, valCls, linked); + M map = stream.readMap(keyCls, valCls, this, linked); lastRead = stream.lastFinished(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index e201293..7f2fdb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -229,7 +229,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa }; } - msgFactory = new GridIoMessageFactory(formatter, ctx.plugins().extensions(MessageFactory.class)); + msgFactory = new GridIoMessageFactory(ctx.plugins().extensions(MessageFactory.class)); if (log.isDebugEnabled()) log.debug(startInfo()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 56abef5..4f3182f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -52,20 +52,13 @@ public class GridIoMessageFactory implements MessageFactory { /** Custom messages registry. Used for test purposes. */ private static final Map<Byte, IgniteOutClosure<MessageAdapter>> CUSTOM = new ConcurrentHashMap8<>(); - /** Message reader factory. */ - private final MessageFormatter formatter; - /** Extensions. */ private final MessageFactory[] ext; /** - * @param formatter Message formatter. * @param ext Extensions. */ - public GridIoMessageFactory(MessageFormatter formatter, MessageFactory[] ext) { - assert formatter != null; - - this.formatter = formatter; + public GridIoMessageFactory(MessageFactory[] ext) { this.ext = ext; } @@ -524,8 +517,6 @@ public class GridIoMessageFactory implements MessageFactory { if (msg == null) throw new IgniteException("Invalid message type: " + type); - msg.setReader(formatter.reader()); - return msg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java index 3b00bd9..349045c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java @@ -31,25 +31,41 @@ public class GridDirectParser implements GridNioParser { /** Message metadata key. */ private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + /** Message reader key. */ + private static final int MSG_READER_KEY = GridNioSessionMetaKey.nextUniqueKey(); + /** */ private final MessageFactory msgFactory; + /** */ + private final MessageFormatter formatter; + /** * @param msgFactory Message factory. + * @param formatter Formatter. */ - public GridDirectParser(MessageFactory msgFactory) { + public GridDirectParser(MessageFactory msgFactory, MessageFormatter formatter) { assert msgFactory != null; this.msgFactory = msgFactory; + this.formatter = formatter; } /** {@inheritDoc} */ @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { MessageAdapter msg = ses.removeMeta(MSG_META_KEY); - if (msg == null && buf.hasRemaining()) + if (msg == null && buf.hasRemaining()) { + MessageReader reader = ses.meta(MSG_READER_KEY); + + if (reader == null) + ses.addMeta(MSG_READER_KEY, reader = formatter.reader()); + msg = msgFactory.create(buf.get()); + msg.setReader(reader); + } + boolean finished = false; if (buf.hasRemaining()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 61bb550..68515c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -889,6 +889,9 @@ public class GridNioServer<T> { assert msg != null; finished = msg.writeTo(buf, state); + + if (finished) + state.reset(); } // Fill up as many messages as possible to write buffer. @@ -908,6 +911,9 @@ public class GridNioServer<T> { assert msg != null; finished = msg.writeTo(buf, state); + + if (finished) + state.reset(); } buf.flip(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 193e4d7..eb118b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1504,6 +1504,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; + GridDirectParser parser = new GridDirectParser(messageFactory, messageFormatter); + GridNioServer<MessageAdapter> srvr = GridNioServer.<MessageAdapter>builder() .address(locHost) @@ -1521,7 +1523,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .directMode(true) .metricsListener(metricsLsnr) .writeTimeout(sockWriteTimeout) - .filters(new GridNioCodecFilter(new GridDirectParser(messageFactory), log, true), + .filters(new GridNioCodecFilter(parser, log, true), new GridConnectionBytesVerifyFilter(log)) .messageFormatter(messageFormatter) .build(); @@ -2452,13 +2454,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { try { + GridDirectParser parser = new GridDirectParser(getSpiContext().messageFactory(), + getSpiContext().messageFormatter()); + IpcToNioAdapter<MessageAdapter> adapter = new IpcToNioAdapter<>( metricsLsnr, log, endpoint, srvLsnr, getSpiContext().messageFormatter(), - new GridNioCodecFilter(new GridDirectParser(getSpiContext().messageFactory()), log, true), + new GridNioCodecFilter(parser, log, true), new GridConnectionBytesVerifyFilter(log) ); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3fe9339/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 80b9b96..d8321b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -531,7 +531,7 @@ public class GridSpiTestContext implements IgniteSpiContext { /** {@inheritDoc} */ @Override public MessageFactory messageFactory() { if (factory == null) - factory = new GridIoMessageFactory(messageFormatter(), null); + factory = new GridIoMessageFactory(null); return factory; }