Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-312 700067689 -> a31638115


# gg-9791 - Added MessageHeader


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0303b402
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0303b402
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0303b402

Branch: refs/heads/ignite-312
Commit: 0303b402aa37f3156c3f624c5c95b85b12a9be9f
Parents: 3021ef8
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Tue Feb 17 18:23:59 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Tue Feb 17 18:23:59 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectMessageReader.java    |  9 ++++
 .../internal/direct/DirectMessageWriter.java    |  7 +++
 .../managers/communication/GridIoManager.java   |  2 +-
 .../communication/GridIoMessageFactory.java     | 11 +----
 .../internal/util/nio/GridDirectParser.java     | 26 +++++++++--
 .../extensions/communication/MessageHeader.java | 49 ++++++++++++++++++++
 .../extensions/communication/MessageReader.java |  7 +++
 .../extensions/communication/MessageWriter.java |  7 +++
 .../communication/tcp/TcpCommunicationSpi.java  |  7 ++-
 .../testframework/GridSpiTestContext.java       |  2 +-
 10 files changed, 109 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index f52a4c8..b616ca8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -47,6 +47,15 @@ public class DirectMessageReader implements MessageReader {
     }
 
     /** {@inheritDoc} */
+    @Override public MessageHeader readHeader() {
+        byte type = stream.readByte();
+
+        lastRead = stream.lastFinished();
+
+        return lastRead ? new MessageHeader(type, (byte)0) : null;
+    }
+
+    /** {@inheritDoc} */
     @Override public byte readByte(String name) {
         byte val = stream.readByte();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index 238ecb6..d5ed8f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -40,6 +40,13 @@ public class DirectMessageWriter implements MessageWriter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean writeHeader(MessageHeader header) {
+        stream.writeByte(header.messageType());
+
+        return stream.lastFinished();
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeByte(String name, byte val) {
         stream.writeByte(val);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index e201293..7f2fdb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -229,7 +229,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             };
         }
 
-        msgFactory = new GridIoMessageFactory(formatter, ctx.plugins().extensions(MessageFactory.class));
+        msgFactory = new GridIoMessageFactory(ctx.plugins().extensions(MessageFactory.class));
 
         if (log.isDebugEnabled())
             log.debug(startInfo());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index e070571..ec74b7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -52,20 +52,13 @@ public class GridIoMessageFactory implements MessageFactory {
     /** Custom messages registry. Used for test purposes. */
     private static final Map<Byte, IgniteOutClosure<MessageAdapter>> CUSTOM = new ConcurrentHashMap8<>();
 
-    /** Message reader factory. */
-    private final MessageFormatter formatter;
-
     /** Extensions. */
     private final MessageFactory[] ext;
 
     /**
-     * @param formatter Message formatter.
      * @param ext Extensions.
      */
-    public GridIoMessageFactory(MessageFormatter formatter, MessageFactory[] ext) {
-        assert formatter != null;
-
-        this.formatter = formatter;
+    public GridIoMessageFactory(MessageFactory[] ext) {
         this.ext = ext;
     }
 
@@ -514,8 +507,6 @@ public class GridIoMessageFactory implements MessageFactory {
         if (msg == null)
             throw new IgniteException("Invalid message type: " + type);
 
-        msg.setReader(formatter.reader());
-
         return msg;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index 3b00bd9..5e21dd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -34,21 +34,39 @@ public class GridDirectParser implements GridNioParser {
     /** */
     private final MessageFactory msgFactory;
 
+    /** */
+    private final MessageFormatter formatter;
+
     /**
      * @param msgFactory Message factory.
+     * @param formatter Formatter.
      */
-    public GridDirectParser(MessageFactory msgFactory) {
+    public GridDirectParser(MessageFactory msgFactory, MessageFormatter formatter) {
         assert msgFactory != null;
+        assert formatter != null;
 
         this.msgFactory = msgFactory;
+        this.formatter = formatter;
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+    @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf)
+        throws IOException, IgniteCheckedException {
         MessageAdapter msg = ses.removeMeta(MSG_META_KEY);
 
-        if (msg == null && buf.hasRemaining())
-            msg = msgFactory.create(buf.get());
+        if (msg == null && buf.hasRemaining()) {
+            MessageReader reader = formatter.reader();
+
+            reader.setBuffer(buf);
+
+            MessageHeader header = reader.readHeader();
+
+            if (reader.isLastRead()) {
+                msg = msgFactory.create(header.messageType());
+
+                msg.setReader(reader);
+            }
+        }
 
         boolean finished = false;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageHeader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageHeader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageHeader.java
new file mode 100644
index 0000000..99d2d57
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageHeader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.extensions.communication;
+
+/**
+ * Message header.
+ */
+public final class MessageHeader {
+    /** Message type. */
+    private final byte msgType;
+
+    /** Fields count. */
+    private final byte fieldCnt;
+
+    /**
+     * @param msgType Message type.
+     * @param fieldCnt Fields count.
+     */
+    public MessageHeader(byte msgType, byte fieldCnt) {
+        this.msgType = msgType;
+        this.fieldCnt = fieldCnt;
+    }
+
+    public byte messageType() {
+        return msgType;
+    }
+
+    /**
+     * @return Fields count.
+     */
+    public byte fieldsCount() {
+        return fieldCnt;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
index 9fa122d..72d7785 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
@@ -36,6 +36,13 @@ public interface MessageReader {
     public void setBuffer(ByteBuffer buf);
 
     /**
+     * Reads message header.
+     *
+     * @return Header.
+     */
+    public MessageHeader readHeader();
+
+    /**
      * Reads {@code byte} value.
      *
      * @param name Field name.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
index 564fc1e..9ad5bf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
@@ -36,6 +36,13 @@ public interface MessageWriter {
     public void setBuffer(ByteBuffer buf);
 
     /**
+     * Writes message header.
+     *
+     * @param header Header.
+     */
+    public boolean writeHeader(MessageHeader header);
+
+    /**
      * Writes {@code byte} value.
      *
      * @param name Field name.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7843ce3..959d094 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1504,6 +1504,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
+                GridDirectParser parser = new GridDirectParser(messageFactory, messageFormatter);
+
                 GridNioServer<MessageAdapter> srvr =
                     GridNioServer.<MessageAdapter>builder()
                         .address(locHost)
@@ -1521,7 +1523,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .directMode(true)
                         .metricsListener(metricsLsnr)
                         .writeTimeout(sockWriteTimeout)
-                        .filters(new GridNioCodecFilter(new GridDirectParser(messageFactory), log, true),
+                        .filters(new GridNioCodecFilter(parser, log, true),
                             new GridConnectionBytesVerifyFilter(log))
                         .messageFormatter(messageFormatter)
                         .build();
@@ -2450,7 +2452,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             try {
-                GridDirectParser parser = new GridDirectParser(getSpiContext().messageFactory());
+                GridDirectParser parser = new GridDirectParser(getSpiContext().messageFactory(),
+                    getSpiContext().messageFormatter());
 
                 IpcToNioAdapter<MessageAdapter> adapter = new IpcToNioAdapter<>(
                     metricsLsnr,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0303b402/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 80b9b96..d8321b3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -531,7 +531,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
     /** {@inheritDoc} */
     @Override public MessageFactory messageFactory() {
         if (factory == null)
-            factory = new GridIoMessageFactory(messageFormatter(), null);
+            factory = new GridIoMessageFactory(null);
 
         return factory;
     }

Reply via email to