IGNITE-61 - Direct marshalling (fixes after review)

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1023cd14
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1023cd14
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1023cd14

Branch: refs/heads/ignite-82
Commit: 1023cd1425aff3b91d7c74bfa6d268ee7f8c4dce
Parents: 08c10d0
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Mon Feb 9 17:59:36 2015 -0800
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Mon Feb 9 17:59:36 2015 -0800

----------------------------------------------------------------------
 .../internal/managers/GridManagerAdapter.java   |  4 +-
 .../managers/communication/GridIoManager.java   | 33 ++++++--------
 .../communication/GridIoMessageFactory.java     | 12 +++---
 .../internal/util/ipc/IpcToNioAdapter.java      | 10 ++---
 .../internal/util/nio/GridDirectParser.java     | 17 +++-----
 .../ignite/internal/util/nio/GridNioServer.java | 33 ++++++--------
 .../util/nio/GridShmemCommunicationClient.java  | 10 ++---
 .../util/nio/GridTcpCommunicationClient.java    | 10 ++---
 .../communication/MessageFormatter.java         | 39 +++++++++++++++++
 .../communication/MessageReaderFactory.java     | 35 ---------------
 .../communication/MessageWriterFactory.java     | 35 ---------------
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  2 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |  8 ++--
 .../communication/tcp/TcpCommunicationSpi.java  | 45 +++++++++++++++++---
 .../testframework/GridSpiTestContext.java       | 14 +++---
 ...gniteProjectionStartStopRestartSelfTest.java |  4 +-
 16 files changed, 149 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index f321386..771b352 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -527,8 +527,8 @@ public abstract class GridManagerAdapter<T extends 
IgniteSpi> implements GridMan
                         }
                     }
 
-                    @Override public MessageWriterFactory 
messageWriterFactory() {
-                        return ctx.io().messageWriterFactory();
+                    @Override public MessageFormatter messageFormatter() {
+                        return ctx.io().formatter();
                     }
 
                     @Override public MessageFactory messageFactory() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 60bc12a..e53691d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -135,7 +135,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     private MessageFactory msgFactory;
 
     /** */
-    private MessageWriterFactory writerFactory;
+    private MessageFormatter formatter;
 
     /**
      * @param ctx Grid kernal context.
@@ -163,10 +163,10 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     /**
      * @return Message writer factory.
      */
-    public MessageWriterFactory messageWriterFactory() {
-        assert writerFactory != null;
+    public MessageFormatter formatter() {
+        assert formatter != null;
 
-        return writerFactory;
+        return formatter;
     }
 
     /**
@@ -208,33 +208,28 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             }
         });
 
-        MessageWriterFactory[] writerExt = 
ctx.plugins().extensions(MessageWriterFactory.class);
+        MessageFormatter[] formatterExt = 
ctx.plugins().extensions(MessageFormatter.class);
 
-        if (writerExt != null && writerExt.length > 0)
-            writerFactory = writerExt[0];
+        if (formatterExt != null && formatterExt.length > 0) {
+            if (formatterExt.length > 1)
+                throw new IgniteCheckedException("More than one 
MessageFormatter extension is defined. Check your " +
+                    "plugins configuration and make sure that only one of them 
provides custom message format.");
+
+            formatter = formatterExt[0];
+        }
         else {
-            writerFactory = new MessageWriterFactory() {
+            formatter = new MessageFormatter() {
                 @Override public MessageWriter writer() {
                     return new DirectMessageWriter();
                 }
-            };
-        }
 
-        MessageReaderFactory readerFactory;
-
-        MessageReaderFactory[] readerExt = 
ctx.plugins().extensions(MessageReaderFactory.class);
-
-        if (readerExt != null && readerExt.length > 0)
-            readerFactory = readerExt[0];
-        else {
-            readerFactory = new MessageReaderFactory() {
                 @Override public MessageReader reader() {
                     return new DirectMessageReader(msgFactory);
                 }
             };
         }
 
-        msgFactory = new GridIoMessageFactory(readerFactory, 
ctx.plugins().extensions(MessageFactory.class));
+        msgFactory = new GridIoMessageFactory(formatter, 
ctx.plugins().extensions(MessageFactory.class));
 
         if (log.isDebugEnabled())
             log.debug(startInfo());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index c1c2c54..e8481f1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -53,19 +53,19 @@ public class GridIoMessageFactory implements MessageFactory 
{
     private static final Map<Byte, IgniteOutClosure<MessageAdapter>> CUSTOM = 
new ConcurrentHashMap8<>();
 
     /** Message reader factory. */
-    private final MessageReaderFactory readerFactory;
+    private final MessageFormatter formatter;
 
     /** Extensions. */
     private final MessageFactory[] ext;
 
     /**
-     * @param readerFactory Message reader factory.
+     * @param formatter Message formatter.
      * @param ext Extensions.
      */
-    public GridIoMessageFactory(MessageReaderFactory readerFactory, 
MessageFactory[] ext) {
-        assert readerFactory != null;
+    public GridIoMessageFactory(MessageFormatter formatter, MessageFactory[] 
ext) {
+        assert formatter != null;
 
-        this.readerFactory = readerFactory;
+        this.formatter = formatter;
         this.ext = ext;
     }
 
@@ -524,7 +524,7 @@ public class GridIoMessageFactory implements MessageFactory 
{
         if (msg == null)
             throw new IgniteException("Invalid message type: " + type);
 
-        msg.setReader(readerFactory.reader());
+        msg.setReader(formatter.reader());
 
         return msg;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index 67c2eae..388c38f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -54,23 +54,23 @@ public class IpcToNioAdapter<T> {
     private final GridNioMetricsListener metricsLsnr;
 
     /** */
-    private final MessageWriterFactory writerFactory;
+    private final MessageFormatter formatter;
 
     /**
      * @param metricsLsnr Metrics listener.
      * @param log Log.
      * @param endp Endpoint.
      * @param lsnr Listener.
-     * @param writerFactory Message writer factory.
+     * @param formatter Message formatter.
      * @param filters Filters.
      */
     public IpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger 
log, IpcEndpoint endp,
-        GridNioServerListener<T> lsnr, MessageWriterFactory writerFactory, 
GridNioFilter... filters) {
+        GridNioServerListener<T> lsnr, MessageFormatter formatter, 
GridNioFilter... filters) {
         assert metricsLsnr != null;
 
         this.metricsLsnr = metricsLsnr;
         this.endp = endp;
-        this.writerFactory = writerFactory;
+        this.formatter = formatter;
 
         chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
         ses = new GridNioSessionImpl(chain, null, null, true);
@@ -152,7 +152,7 @@ public class IpcToNioAdapter<T> {
         assert writeBuf.hasArray();
 
         try {
-            msg.setWriter(writerFactory.writer());
+            msg.setWriter(formatter.writer());
 
             int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index 021aa91..3b00bd9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
 import org.apache.ignite.plugin.extensions.communication.*;
-import org.apache.ignite.spi.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -33,23 +32,19 @@ public class GridDirectParser implements GridNioParser {
     private static final int MSG_META_KEY = 
GridNioSessionMetaKey.nextUniqueKey();
 
     /** */
-    private IgniteSpiAdapter spi;
-
-    /** */
-    private MessageFactory msgFactory;
+    private final MessageFactory msgFactory;
 
     /**
-     * @param spi Spi.
+     * @param msgFactory Message factory.
      */
-    public GridDirectParser(IgniteSpiAdapter spi) {
-        this.spi = spi;
+    public GridDirectParser(MessageFactory msgFactory) {
+        assert msgFactory != null;
+
+        this.msgFactory = msgFactory;
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer 
buf) throws IOException, IgniteCheckedException {
-        if (msgFactory == null)
-            msgFactory = spi.getSpiContext().messageFactory();
-
         MessageAdapter msg = ses.removeMeta(MSG_META_KEY);
 
         if (msg == null && buf.hasRemaining())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 4a2e2e4..ffa345d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
-import org.apache.ignite.spi.*;
 import org.apache.ignite.thread.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
@@ -140,10 +139,7 @@ public class GridNioServer<T> {
 
     /** */
     @GridToStringExclude
-    private IgniteSpiAdapter spi;
-
-    /** */
-    private MessageWriterFactory messageWriterFactory;
+    private MessageFormatter formatter;
 
     /** Static initializer ensures single-threaded execution of workaround. */
     static {
@@ -172,6 +168,7 @@ public class GridNioServer<T> {
      * @param directMode Whether direct mode is used.
      * @param daemon Daemon flag to create threads.
      * @param metricsLsnr Metrics listener.
+     * @param formatter Message formatter.
      * @param filters Filters for this server.
      * @throws IgniteCheckedException If failed.
      */
@@ -191,7 +188,7 @@ public class GridNioServer<T> {
         boolean directMode,
         boolean daemon,
         GridNioMetricsListener metricsLsnr,
-        IgniteSpiAdapter spi,
+        MessageFormatter formatter,
         GridNioFilter... filters
     ) throws IgniteCheckedException {
         A.notNull(addr, "addr");
@@ -256,7 +253,7 @@ public class GridNioServer<T> {
 
         this.directMode = directMode;
         this.metricsLsnr = metricsLsnr;
-        this.spi = spi;
+        this.formatter = formatter;
     }
 
     /**
@@ -1023,10 +1020,7 @@ public class GridNioServer<T> {
 
                     assert msg != null;
 
-                    if (messageWriterFactory == null)
-                        messageWriterFactory = 
spi.getSpiContext().messageWriterFactory();
-
-                    msg.setWriter(messageWriterFactory.writer());
+                    msg.setWriter(formatter.writer());
 
                     finished = msg.writeTo(buf);
                 }
@@ -1047,10 +1041,7 @@ public class GridNioServer<T> {
 
                     assert msg != null;
 
-                    if (messageWriterFactory == null)
-                        messageWriterFactory = 
spi.getSpiContext().messageWriterFactory();
-
-                    msg.setWriter(messageWriterFactory.writer());
+                    msg.setWriter(formatter.writer());
 
                     finished = msg.writeTo(buf);
                 }
@@ -2078,8 +2069,8 @@ public class GridNioServer<T> {
         /** Daemon flag. */
         private boolean daemon;
 
-        /** SPI. */
-        private IgniteSpiAdapter spi;
+        /** Message formatter. */
+        private MessageFormatter formatter;
 
         /**
          * Finishes building the instance.
@@ -2104,7 +2095,7 @@ public class GridNioServer<T> {
                 directMode,
                 daemon,
                 metricsLsnr,
-                spi,
+                formatter,
                 filters != null ? Arrays.copyOf(filters, filters.length) : 
EMPTY_FILTERS
             );
 
@@ -2299,11 +2290,11 @@ public class GridNioServer<T> {
         }
 
         /**
-         * @param spi SPI.
+         * @param formatter Message formatter.
          * @return This for chaining.
          */
-        public Builder<T> spi(IgniteSpiAdapter spi) {
-            this.spi = spi;
+        public Builder<T> messageFormatter(MessageFormatter formatter) {
+            this.formatter = formatter;
 
             return this;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index c3c9a92..2add325 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -39,18 +39,18 @@ public class GridShmemCommunicationClient extends 
GridAbstractCommunicationClien
     private final ByteBuffer writeBuf;
 
     /** */
-    private final MessageWriterFactory writerFactory;
+    private final MessageFormatter formatter;
 
     /**
      * @param metricsLsnr Metrics listener.
      * @param port Shared memory IPC server port.
      * @param connTimeout Connection timeout.
      * @param log Logger.
-     * @param writerFactory Message writer factory.
+     * @param formatter Message formatter.
      * @throws IgniteCheckedException If failed.
      */
     public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr, 
int port, long connTimeout,
-        IgniteLogger log, MessageWriterFactory writerFactory) throws 
IgniteCheckedException {
+        IgniteLogger log, MessageFormatter formatter) throws 
IgniteCheckedException {
         super(metricsLsnr);
 
         assert metricsLsnr != null;
@@ -63,7 +63,7 @@ public class GridShmemCommunicationClient extends 
GridAbstractCommunicationClien
 
         writeBuf.order(ByteOrder.nativeOrder());
 
-        this.writerFactory = writerFactory;
+        this.formatter = formatter;
     }
 
     /** {@inheritDoc} */
@@ -116,7 +116,7 @@ public class GridShmemCommunicationClient extends 
GridAbstractCommunicationClien
         assert writeBuf.hasArray();
 
         try {
-            msg.setWriter(writerFactory.writer());
+            msg.setWriter(formatter.writer());
 
             int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
index 561547d..fd2aeb9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
@@ -49,7 +49,7 @@ public class GridTcpCommunicationClient extends 
GridAbstractCommunicationClient
     private final ByteBuffer writeBuf;
 
     /** */
-    private final MessageWriterFactory writerFactory;
+    private final MessageFormatter formatter;
 
     /**
      * @param metricsLsnr Metrics listener.
@@ -62,7 +62,7 @@ public class GridTcpCommunicationClient extends 
GridAbstractCommunicationClient
      * @param bufSize Buffer size (or {@code 0} to disable buffer).
      * @param minBufferedMsgCnt Minimum buffered message count.
      * @param bufSizeRatio Communication buffer size ratio.
-     * @param writerFactory Message writer factory.
+     * @param formatter Message formatter.
      * @throws IgniteCheckedException If failed.
      */
     public GridTcpCommunicationClient(
@@ -76,7 +76,7 @@ public class GridTcpCommunicationClient extends 
GridAbstractCommunicationClient
         int bufSize,
         int minBufferedMsgCnt,
         double bufSizeRatio,
-        MessageWriterFactory writerFactory
+        MessageFormatter formatter
     ) throws IgniteCheckedException {
         super(metricsLsnr);
 
@@ -93,7 +93,7 @@ public class GridTcpCommunicationClient extends 
GridAbstractCommunicationClient
 
         this.minBufferedMsgCnt = minBufferedMsgCnt;
         this.bufSizeRatio = bufSizeRatio;
-        this.writerFactory = writerFactory;
+        this.formatter = formatter;
 
         writeBuf = ByteBuffer.allocate(8 << 10);
 
@@ -197,7 +197,7 @@ public class GridTcpCommunicationClient extends 
GridAbstractCommunicationClient
         assert writeBuf.hasArray();
 
         try {
-            msg.setWriter(writerFactory.writer());
+            msg.setWriter(formatter.writer());
 
             int cnt = U.writeMessageFully(msg, out, writeBuf);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
new file mode 100644
index 0000000..01c0c61
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.extensions.communication;
+
+import org.apache.ignite.plugin.*;
+
+/**
+ * TODO
+ */
+public interface MessageFormatter extends Extension {
+    /**
+     * Creates new message writer instance.
+     *
+     * @return Message writer.
+     */
+    public MessageWriter writer();
+
+    /**
+     * Creates new message reader instance.
+     *
+     * @return Message reader.
+     */
+    public MessageReader reader();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReaderFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReaderFactory.java
deleted file mode 100644
index 4729551..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReaderFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.plugin.extensions.communication;
-
-import org.apache.ignite.plugin.*;
-
-/**
- * Factory for message readers.
- * <p>
- * A plugin can provide his own message reader factory as
- * an extension to define a custom binary format.
- */
-public interface MessageReaderFactory extends Extension {
-    /**
-     * Creates new message reader instance.
-     *
-     * @return Message reader.
-     */
-    public MessageReader reader();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriterFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriterFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriterFactory.java
deleted file mode 100644
index 1ab04e3..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriterFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.plugin.extensions.communication;
-
-import org.apache.ignite.plugin.*;
-
-/**
- * Factory for message writers.
- * <p>
- * A plugin can provide his own message writer factory as
- * an extension to define a custom binary format.
- */
-public interface MessageWriterFactory extends Extension {
-    /**
-     * Creates new message writer instance.
-     *
-     * @return Message writer.
-     */
-    public MessageWriter writer();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 7a731e5..c4518e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -713,7 +713,7 @@ public abstract class IgniteSpiAdapter implements 
IgniteSpi, IgniteSpiManagement
             return null;
         }
 
-        @Override public MessageWriterFactory messageWriterFactory() {
+        @Override public MessageFormatter messageFormatter() {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 7f7e25e..5a0a23f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -377,13 +377,15 @@ public interface IgniteSpiContext {
         @Nullable ClassLoader ldr) throws IgniteException;
 
     /**
-     * Gets message writer factory.
+     * Gets message formatter.
      *
-     * @return Message writer factory.
+     * @return Message formatter.
      */
-    public MessageWriterFactory messageWriterFactory();
+    public MessageFormatter messageFormatter();
 
     /**
+     * Gets message factory.
+     *
      * @return Message factory.
      */
     public MessageFactory messageFactory();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index e3db36b..2ab05cb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1470,6 +1470,41 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         // If configured TCP port is busy, find first available in range.
         for (int port = locPort; port < locPort + locPortRange; port++) {
             try {
+                MessageFactory messageFactory = new MessageFactory() {
+                    private MessageFactory impl;
+
+                    @Nullable @Override public MessageAdapter create(byte 
type) {
+                        if (impl == null)
+                            impl = getSpiContext().messageFactory();
+
+                        assert impl != null;
+
+                        return impl.create(type);
+                    }
+                };
+
+                MessageFormatter messageFormatter = new MessageFormatter() {
+                    private MessageFormatter impl;
+
+                    @Override public MessageWriter writer() {
+                        if (impl == null)
+                            impl = getSpiContext().messageFormatter();
+
+                        assert impl != null;
+
+                        return impl.writer();
+                    }
+
+                    @Override public MessageReader reader() {
+                        if (impl == null)
+                            impl = getSpiContext().messageFormatter();
+
+                        assert impl != null;
+
+                        return impl.reader();
+                    }
+                };
+
                 GridNioServer<MessageAdapter> srvr =
                     GridNioServer.<MessageAdapter>builder()
                         .address(locHost)
@@ -1487,9 +1522,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .directMode(true)
                         .metricsListener(metricsLsnr)
                         .writeTimeout(sockWriteTimeout)
-                        .filters(new GridNioCodecFilter(new 
GridDirectParser(this), log, true),
+                        .filters(new GridNioCodecFilter(new 
GridDirectParser(messageFactory), log, true),
                             new GridConnectionBytesVerifyFilter(log))
-                        .spi(this)
+                        .messageFormatter(messageFormatter)
                         .build();
 
                 boundTcpPort = port;
@@ -1852,7 +1887,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             try {
                 client = new GridShmemCommunicationClient(metricsLsnr, port, 
connTimeout, log,
-                    getSpiContext().messageWriterFactory());
+                    getSpiContext().messageFormatter());
             }
             catch (IgniteCheckedException e) {
                 // Reconnect for the second time, if connection is not 
established.
@@ -2421,8 +2456,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     log,
                     endpoint,
                     srvLsnr,
-                    getSpiContext().messageWriterFactory(),
-                    new GridNioCodecFilter(new 
GridDirectParser(TcpCommunicationSpi.this), log, true),
+                    getSpiContext().messageFormatter(),
+                    new GridNioCodecFilter(new 
GridDirectParser(getSpiContext().messageFactory()), log, true),
                     new GridConnectionBytesVerifyFilter(log)
                 );
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index d91ed13..97cb488 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -505,21 +505,21 @@ public class GridSpiTestContext implements 
IgniteSpiContext {
         return null;
     }
 
-    @Override public MessageWriterFactory messageWriterFactory() {
-        return new MessageWriterFactory() {
+    @Override public MessageFormatter messageFormatter() {
+        return new MessageFormatter() {
             @Override public MessageWriter writer() {
                 return new DirectMessageWriter();
             }
+
+            @Override public MessageReader reader() {
+                throw new UnsupportedOperationException();
+            }
         };
     }
 
     /** {@inheritDoc} */
     @Override public MessageFactory messageFactory() {
-        return new GridIoMessageFactory(new MessageReaderFactory() {
-            @Override public MessageReader reader() {
-                return new DirectMessageReader(null);
-            }
-        }, null);
+        return new GridIoMessageFactory(messageFormatter(), null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java
 
b/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java
index babdbbb..f5a73d8 100644
--- 
a/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java
+++ 
b/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java
@@ -49,10 +49,10 @@ import static 
org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.*;
 @SuppressWarnings("ConstantConditions")
 public class IgniteProjectionStartStopRestartSelfTest extends 
GridCommonAbstractTest {
     /** */
-    private static final String SSH_UNAME = System.getenv("test.ssh.username");
+    private static final String SSH_UNAME = "vkulichenko";
 
     /** */
-    private static final String SSH_PWD = System.getenv("test.ssh.password");
+    private static final String SSH_PWD = "8tQHsaM";
 
     /** */
     private static final String SSH_KEY = System.getenv("ssh.key");

Reply via email to