Repository: incubator-ignite Updated Branches: refs/heads/ignite-312 700067689 -> a31638115
# gg-9791 - Added MessageHeader Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0303b402 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0303b402 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0303b402 Branch: refs/heads/ignite-312 Commit: 0303b402aa37f3156c3f624c5c95b85b12a9be9f Parents: 3021ef8 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Tue Feb 17 18:23:59 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Tue Feb 17 18:23:59 2015 -0800 ---------------------------------------------------------------------- .../internal/direct/DirectMessageReader.java | 9 ++++ .../internal/direct/DirectMessageWriter.java | 7 +++ .../managers/communication/GridIoManager.java | 2 +- .../communication/GridIoMessageFactory.java | 11 +---- .../internal/util/nio/GridDirectParser.java | 26 +++++++++-- .../extensions/communication/MessageHeader.java | 49 ++++++++++++++++++++ .../extensions/communication/MessageReader.java | 7 +++ .../extensions/communication/MessageWriter.java | 7 +++ .../communication/tcp/TcpCommunicationSpi.java | 7 ++- .../testframework/GridSpiTestContext.java | 2 +- 10 files changed, 109 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index f52a4c8..b616ca8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -47,6 +47,15 @@ public class DirectMessageReader implements MessageReader { } /** {@inheritDoc} */ + @Override public MessageHeader readHeader() { + byte type = stream.readByte(); + + lastRead = stream.lastFinished(); + + return lastRead ? new MessageHeader(type, (byte)0) : null; + } + + /** {@inheritDoc} */ @Override public byte readByte(String name) { byte val = stream.readByte(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index 238ecb6..d5ed8f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -40,6 +40,13 @@ public class DirectMessageWriter implements MessageWriter { } /** {@inheritDoc} */ + @Override public boolean writeHeader(MessageHeader header) { + stream.writeByte(header.messageType()); + + return stream.lastFinished(); + } + + /** {@inheritDoc} */ @Override public boolean writeByte(String name, byte val) { stream.writeByte(val); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index e201293..7f2fdb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -229,7 +229,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa }; } - msgFactory = new GridIoMessageFactory(formatter, ctx.plugins().extensions(MessageFactory.class)); + msgFactory = new GridIoMessageFactory(ctx.plugins().extensions(MessageFactory.class)); if (log.isDebugEnabled()) log.debug(startInfo()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index e070571..ec74b7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -52,20 +52,13 @@ public class GridIoMessageFactory implements MessageFactory { /** Custom messages registry. Used for test purposes. */ private static final Map<Byte, IgniteOutClosure<MessageAdapter>> CUSTOM = new ConcurrentHashMap8<>(); - /** Message reader factory. */ - private final MessageFormatter formatter; - /** Extensions. */ private final MessageFactory[] ext; /** - * @param formatter Message formatter. * @param ext Extensions. */ - public GridIoMessageFactory(MessageFormatter formatter, MessageFactory[] ext) { - assert formatter != null; - - this.formatter = formatter; + public GridIoMessageFactory(MessageFactory[] ext) { this.ext = ext; } @@ -514,8 +507,6 @@ public class GridIoMessageFactory implements MessageFactory { if (msg == null) throw new IgniteException("Invalid message type: " + type); - msg.setReader(formatter.reader()); - return msg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java index 3b00bd9..5e21dd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java @@ -34,21 +34,39 @@ public class GridDirectParser implements GridNioParser { /** */ private final MessageFactory msgFactory; + /** */ + private final MessageFormatter formatter; + /** * @param msgFactory Message factory. + * @param formatter Formatter. */ - public GridDirectParser(MessageFactory msgFactory) { + public GridDirectParser(MessageFactory msgFactory, MessageFormatter formatter) { assert msgFactory != null; + assert formatter != null; this.msgFactory = msgFactory; + this.formatter = formatter; } /** {@inheritDoc} */ - @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { + @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) + throws IOException, IgniteCheckedException { MessageAdapter msg = ses.removeMeta(MSG_META_KEY); - if (msg == null && buf.hasRemaining()) - msg = msgFactory.create(buf.get()); + if (msg == null && buf.hasRemaining()) { + MessageReader reader = formatter.reader(); + + reader.setBuffer(buf); + + MessageHeader header = reader.readHeader(); + + if (reader.isLastRead()) { + msg = msgFactory.create(header.messageType()); + + msg.setReader(reader); + } + } boolean finished = false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageHeader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageHeader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageHeader.java new file mode 100644 index 0000000..99d2d57 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageHeader.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +/** + * Message header. + */ +public final class MessageHeader { + /** Message type. */ + private final byte msgType; + + /** Fields count. */ + private final byte fieldCnt; + + /** + * @param msgType Message type. + * @param fieldCnt Fields count. + */ + public MessageHeader(byte msgType, byte fieldCnt) { + this.msgType = msgType; + this.fieldCnt = fieldCnt; + } + + public byte messageType() { + return msgType; + } + + /** + * @return Fields count. + */ + public byte fieldsCount() { + return fieldCnt; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java index 9fa122d..72d7785 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java @@ -36,6 +36,13 @@ public interface MessageReader { public void setBuffer(ByteBuffer buf); /** + * Reads message header. + * + * @return Header. + */ + public MessageHeader readHeader(); + + /** * Reads {@code byte} value. * * @param name Field name. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java index 564fc1e..9ad5bf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java @@ -36,6 +36,13 @@ public interface MessageWriter { public void setBuffer(ByteBuffer buf); /** + * Writes message header. + * + * @param header Header. + */ + public boolean writeHeader(MessageHeader header); + + /** * Writes {@code byte} value. * * @param name Field name. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 7843ce3..959d094 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1504,6 +1504,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; + GridDirectParser parser = new GridDirectParser(messageFactory, messageFormatter); + GridNioServer<MessageAdapter> srvr = GridNioServer.<MessageAdapter>builder() .address(locHost) @@ -1521,7 +1523,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .directMode(true) .metricsListener(metricsLsnr) .writeTimeout(sockWriteTimeout) - .filters(new GridNioCodecFilter(new GridDirectParser(messageFactory), log, true), + .filters(new GridNioCodecFilter(parser, log, true), new GridConnectionBytesVerifyFilter(log)) .messageFormatter(messageFormatter) .build(); @@ -2450,7 +2452,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { try { - GridDirectParser parser = new GridDirectParser(getSpiContext().messageFactory()); + GridDirectParser parser = new GridDirectParser(getSpiContext().messageFactory(), + getSpiContext().messageFormatter()); IpcToNioAdapter<MessageAdapter> adapter = new IpcToNioAdapter<>( metricsLsnr, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 80b9b96..d8321b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -531,7 +531,7 @@ public class GridSpiTestContext implements IgniteSpiContext { /** {@inheritDoc} */ @Override public MessageFactory messageFactory() { if (factory == null) - factory = new GridIoMessageFactory(messageFormatter(), null); + factory = new GridIoMessageFactory(null); return factory; }