ignite-nio - Removing message clone
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9f06c0e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9f06c0e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9f06c0e Branch: refs/heads/ignite-nio Commit: f9f06c0e5c085687a6ecd5957e05883b4ec42492 Parents: ca27b58 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sat Feb 14 19:08:44 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sat Feb 14 19:08:44 2015 -0800 ---------------------------------------------------------------------- .../internal/direct/DirectByteBufferStream.java | 28 +++++++------------- .../internal/direct/DirectMessageReader.java | 8 +++--- .../managers/communication/GridIoManager.java | 2 +- .../communication/GridIoMessageFactory.java | 11 +++++++- .../internal/util/nio/GridDirectParser.java | 13 +-------- .../testframework/GridSpiTestContext.java | 2 +- 6 files changed, 27 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f06c0e/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java index 33e277c..f503223 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java @@ -936,7 +936,7 @@ public class DirectByteBufferStream { * @return Message. */ @SuppressWarnings("unchecked") - public <T extends MessageAdapter> T readMessage(MessageReader reader) { + public <T extends MessageAdapter> T readMessage() { if (!msgTypeDone) { if (!buf.hasRemaining()) { lastFinished = false; @@ -948,9 +948,6 @@ public class DirectByteBufferStream { msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type); - if (msg != null) - msg.setReader(reader); - msgTypeDone = true; } @@ -970,11 +967,10 @@ public class DirectByteBufferStream { /** * @param itemType Component type. - * @param reader Reader. * @return Array. */ @SuppressWarnings("unchecked") - public <T> T[] readObjectArray(MessageAdapter.Type itemType, MessageReader reader) { + public <T> T[] readObjectArray(MessageAdapter.Type itemType) { if (readSize == -1) { int size = readInt(); @@ -989,7 +985,7 @@ public class DirectByteBufferStream { objArr = (Object[])Array.newInstance(itemType.clazz(), readSize); for (int i = readItems; i < readSize; i++) { - Object item = read(itemType, reader); + Object item = read(itemType); if (!lastFinished) return null; @@ -1013,11 +1009,10 @@ public class DirectByteBufferStream { /** * @param itemType Item type. - * @param reader Reader. * @return Collection. */ @SuppressWarnings("unchecked") - public <C extends Collection<?>> C readCollection(MessageAdapter.Type itemType, MessageReader reader) { + public <C extends Collection<?>> C readCollection(MessageAdapter.Type itemType) { if (readSize == -1) { int size = readInt(); @@ -1032,7 +1027,7 @@ public class DirectByteBufferStream { col = new ArrayList<>(readSize); for (int i = readItems; i < readSize; i++) { - Object item = read(itemType, reader); + Object item = read(itemType); if (!lastFinished) return null; @@ -1057,13 +1052,11 @@ public class DirectByteBufferStream { /** * @param keyType Key type. * @param valType Value type. - * @param reader Reader. * @param linked Whether linked map should be created. * @return Map. */ @SuppressWarnings("unchecked") - public <M extends Map<?, ?>> M readMap(MessageAdapter.Type keyType, MessageAdapter.Type valType, - MessageReader reader, boolean linked) { + public <M extends Map<?, ?>> M readMap(MessageAdapter.Type keyType, MessageAdapter.Type valType, boolean linked) { if (readSize == -1) { int size = readInt(); @@ -1079,7 +1072,7 @@ public class DirectByteBufferStream { for (int i = readItems; i < readSize; i++) { if (!keyDone) { - Object key = read(keyType, reader); + Object key = read(keyType); if (!lastFinished) return null; @@ -1088,7 +1081,7 @@ public class DirectByteBufferStream { keyDone = true; } - Object val = read(valType, reader); + Object val = read(valType); if (!lastFinished) return null; @@ -1384,10 +1377,9 @@ public class DirectByteBufferStream { /** * @param type Type. - * @param reader Reader. * @return Value. */ - private Object read(MessageAdapter.Type type, MessageReader reader) { + private Object read(MessageAdapter.Type type) { switch (type) { case BYTE: return readByte(); @@ -1450,7 +1442,7 @@ public class DirectByteBufferStream { return readIgniteUuid(); case MSG: - return readMessage(reader); + return readMessage(); default: throw new IllegalArgumentException("Unknown type: " + type); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f06c0e/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index b167122..191c522 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -228,7 +228,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public <T extends MessageAdapter> T readMessage(String name) { - T msg = stream.readMessage(this); + T msg = stream.readMessage(); lastRead = stream.lastFinished(); @@ -237,7 +237,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public <T> T[] readObjectArray(String name, MessageAdapter.Type itemType) { - T[] msg = stream.readObjectArray(itemType, this); + T[] msg = stream.readObjectArray(itemType); lastRead = stream.lastFinished(); @@ -246,7 +246,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public <C extends Collection<?>> C readCollection(String name, MessageAdapter.Type itemType) { - C col = stream.readCollection(itemType, this); + C col = stream.readCollection(itemType); lastRead = stream.lastFinished(); @@ -256,7 +256,7 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public <M extends Map<?, ?>> M readMap(String name, MessageAdapter.Type keyType, MessageAdapter.Type valType, boolean linked) { - M map = stream.readMap(keyType, valType, this, linked); + M map = stream.readMap(keyType, valType, linked); lastRead = stream.lastFinished(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f06c0e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 7f2fdb8..e201293 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -229,7 +229,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa }; } - msgFactory = new GridIoMessageFactory(ctx.plugins().extensions(MessageFactory.class)); + msgFactory = new GridIoMessageFactory(formatter, ctx.plugins().extensions(MessageFactory.class)); if (log.isDebugEnabled()) log.debug(startInfo()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f06c0e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 4f3182f..56abef5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -52,13 +52,20 @@ public class GridIoMessageFactory implements MessageFactory { /** Custom messages registry. Used for test purposes. */ private static final Map<Byte, IgniteOutClosure<MessageAdapter>> CUSTOM = new ConcurrentHashMap8<>(); + /** Message reader factory. */ + private final MessageFormatter formatter; + /** Extensions. */ private final MessageFactory[] ext; /** + * @param formatter Message formatter. * @param ext Extensions. */ - public GridIoMessageFactory(MessageFactory[] ext) { + public GridIoMessageFactory(MessageFormatter formatter, MessageFactory[] ext) { + assert formatter != null; + + this.formatter = formatter; this.ext = ext; } @@ -517,6 +524,8 @@ public class GridIoMessageFactory implements MessageFactory { if (msg == null) throw new IgniteException("Invalid message type: " + type); + msg.setReader(formatter.reader()); + return msg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f06c0e/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java index 349045c..aad19f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java @@ -31,9 +31,6 @@ public class GridDirectParser implements GridNioParser { /** Message metadata key. */ private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); - /** Message reader key. */ - private static final int MSG_READER_KEY = GridNioSessionMetaKey.nextUniqueKey(); - /** */ private final MessageFactory msgFactory; @@ -55,17 +52,9 @@ public class GridDirectParser implements GridNioParser { @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { MessageAdapter msg = ses.removeMeta(MSG_META_KEY); - if (msg == null && buf.hasRemaining()) { - MessageReader reader = ses.meta(MSG_READER_KEY); - - if (reader == null) - ses.addMeta(MSG_READER_KEY, reader = formatter.reader()); - + if (msg == null && buf.hasRemaining()) msg = msgFactory.create(buf.get()); - msg.setReader(reader); - } - boolean finished = false; if (buf.hasRemaining()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f06c0e/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index d8321b3..80b9b96 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -531,7 +531,7 @@ public class GridSpiTestContext implements IgniteSpiContext { /** {@inheritDoc} */ @Override public MessageFactory messageFactory() { if (factory == null) - factory = new GridIoMessageFactory(null); + factory = new GridIoMessageFactory(messageFormatter(), null); return factory; }