IGNITE-61 - Direct marshalling (fixes after review)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1023cd14 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1023cd14 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1023cd14 Branch: refs/heads/ignite-82 Commit: 1023cd1425aff3b91d7c74bfa6d268ee7f8c4dce Parents: 08c10d0 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Mon Feb 9 17:59:36 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Mon Feb 9 17:59:36 2015 -0800 ---------------------------------------------------------------------- .../internal/managers/GridManagerAdapter.java | 4 +- .../managers/communication/GridIoManager.java | 33 ++++++-------- .../communication/GridIoMessageFactory.java | 12 +++--- .../internal/util/ipc/IpcToNioAdapter.java | 10 ++--- .../internal/util/nio/GridDirectParser.java | 17 +++----- .../ignite/internal/util/nio/GridNioServer.java | 33 ++++++-------- .../util/nio/GridShmemCommunicationClient.java | 10 ++--- .../util/nio/GridTcpCommunicationClient.java | 10 ++--- .../communication/MessageFormatter.java | 39 +++++++++++++++++ .../communication/MessageReaderFactory.java | 35 --------------- .../communication/MessageWriterFactory.java | 35 --------------- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 2 +- .../org/apache/ignite/spi/IgniteSpiContext.java | 8 ++-- .../communication/tcp/TcpCommunicationSpi.java | 45 +++++++++++++++++--- .../testframework/GridSpiTestContext.java | 14 +++--- ...gniteProjectionStartStopRestartSelfTest.java | 4 +- 16 files changed, 149 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index f321386..771b352 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -527,8 +527,8 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan } } - @Override public MessageWriterFactory messageWriterFactory() { - return ctx.io().messageWriterFactory(); + @Override public MessageFormatter messageFormatter() { + return ctx.io().formatter(); } @Override public MessageFactory messageFactory() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 60bc12a..e53691d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -135,7 +135,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private MessageFactory msgFactory; /** */ - private MessageWriterFactory writerFactory; + private MessageFormatter formatter; /** * @param ctx Grid kernal context. @@ -163,10 +163,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** * @return Message writer factory. */ - public MessageWriterFactory messageWriterFactory() { - assert writerFactory != null; + public MessageFormatter formatter() { + assert formatter != null; - return writerFactory; + return formatter; } /** @@ -208,33 +208,28 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } }); - MessageWriterFactory[] writerExt = ctx.plugins().extensions(MessageWriterFactory.class); + MessageFormatter[] formatterExt = ctx.plugins().extensions(MessageFormatter.class); - if (writerExt != null && writerExt.length > 0) - writerFactory = writerExt[0]; + if (formatterExt != null && formatterExt.length > 0) { + if (formatterExt.length > 1) + throw new IgniteCheckedException("More than one MessageFormatter extension is defined. Check your " + + "plugins configuration and make sure that only one of them provides custom message format."); + + formatter = formatterExt[0]; + } else { - writerFactory = new MessageWriterFactory() { + formatter = new MessageFormatter() { @Override public MessageWriter writer() { return new DirectMessageWriter(); } - }; - } - MessageReaderFactory readerFactory; - - MessageReaderFactory[] readerExt = ctx.plugins().extensions(MessageReaderFactory.class); - - if (readerExt != null && readerExt.length > 0) - readerFactory = readerExt[0]; - else { - readerFactory = new MessageReaderFactory() { @Override public MessageReader reader() { return new DirectMessageReader(msgFactory); } }; } - msgFactory = new GridIoMessageFactory(readerFactory, ctx.plugins().extensions(MessageFactory.class)); + msgFactory = new GridIoMessageFactory(formatter, ctx.plugins().extensions(MessageFactory.class)); if (log.isDebugEnabled()) log.debug(startInfo()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index c1c2c54..e8481f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -53,19 +53,19 @@ public class GridIoMessageFactory implements MessageFactory { private static final Map<Byte, IgniteOutClosure<MessageAdapter>> CUSTOM = new ConcurrentHashMap8<>(); /** Message reader factory. */ - private final MessageReaderFactory readerFactory; + private final MessageFormatter formatter; /** Extensions. */ private final MessageFactory[] ext; /** - * @param readerFactory Message reader factory. + * @param formatter Message formatter. * @param ext Extensions. */ - public GridIoMessageFactory(MessageReaderFactory readerFactory, MessageFactory[] ext) { - assert readerFactory != null; + public GridIoMessageFactory(MessageFormatter formatter, MessageFactory[] ext) { + assert formatter != null; - this.readerFactory = readerFactory; + this.formatter = formatter; this.ext = ext; } @@ -524,7 +524,7 @@ public class GridIoMessageFactory implements MessageFactory { if (msg == null) throw new IgniteException("Invalid message type: " + type); - msg.setReader(readerFactory.reader()); + msg.setReader(formatter.reader()); return msg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java index 67c2eae..388c38f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java @@ -54,23 +54,23 @@ public class IpcToNioAdapter<T> { private final GridNioMetricsListener metricsLsnr; /** */ - private final MessageWriterFactory writerFactory; + private final MessageFormatter formatter; /** * @param metricsLsnr Metrics listener. * @param log Log. * @param endp Endpoint. * @param lsnr Listener. - * @param writerFactory Message writer factory. + * @param formatter Message formatter. * @param filters Filters. */ public IpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger log, IpcEndpoint endp, - GridNioServerListener<T> lsnr, MessageWriterFactory writerFactory, GridNioFilter... filters) { + GridNioServerListener<T> lsnr, MessageFormatter formatter, GridNioFilter... filters) { assert metricsLsnr != null; this.metricsLsnr = metricsLsnr; this.endp = endp; - this.writerFactory = writerFactory; + this.formatter = formatter; chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters); ses = new GridNioSessionImpl(chain, null, null, true); @@ -152,7 +152,7 @@ public class IpcToNioAdapter<T> { assert writeBuf.hasArray(); try { - msg.setWriter(writerFactory.writer()); + msg.setWriter(formatter.writer()); int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java index 021aa91..3b00bd9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; import org.apache.ignite.plugin.extensions.communication.*; -import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; import java.io.*; @@ -33,23 +32,19 @@ public class GridDirectParser implements GridNioParser { private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); /** */ - private IgniteSpiAdapter spi; - - /** */ - private MessageFactory msgFactory; + private final MessageFactory msgFactory; /** - * @param spi Spi. + * @param msgFactory Message factory. */ - public GridDirectParser(IgniteSpiAdapter spi) { - this.spi = spi; + public GridDirectParser(MessageFactory msgFactory) { + assert msgFactory != null; + + this.msgFactory = msgFactory; } /** {@inheritDoc} */ @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { - if (msgFactory == null) - msgFactory = spi.getSpiContext().messageFactory(); - MessageAdapter msg = ses.removeMeta(MSG_META_KEY); if (msg == null && buf.hasRemaining()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 4a2e2e4..ffa345d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -28,7 +28,6 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; -import org.apache.ignite.spi.*; import org.apache.ignite.thread.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -140,10 +139,7 @@ public class GridNioServer<T> { /** */ @GridToStringExclude - private IgniteSpiAdapter spi; - - /** */ - private MessageWriterFactory messageWriterFactory; + private MessageFormatter formatter; /** Static initializer ensures single-threaded execution of workaround. */ static { @@ -172,6 +168,7 @@ public class GridNioServer<T> { * @param directMode Whether direct mode is used. * @param daemon Daemon flag to create threads. * @param metricsLsnr Metrics listener. + * @param formatter Message formatter. * @param filters Filters for this server. * @throws IgniteCheckedException If failed. */ @@ -191,7 +188,7 @@ public class GridNioServer<T> { boolean directMode, boolean daemon, GridNioMetricsListener metricsLsnr, - IgniteSpiAdapter spi, + MessageFormatter formatter, GridNioFilter... filters ) throws IgniteCheckedException { A.notNull(addr, "addr"); @@ -256,7 +253,7 @@ public class GridNioServer<T> { this.directMode = directMode; this.metricsLsnr = metricsLsnr; - this.spi = spi; + this.formatter = formatter; } /** @@ -1023,10 +1020,7 @@ public class GridNioServer<T> { assert msg != null; - if (messageWriterFactory == null) - messageWriterFactory = spi.getSpiContext().messageWriterFactory(); - - msg.setWriter(messageWriterFactory.writer()); + msg.setWriter(formatter.writer()); finished = msg.writeTo(buf); } @@ -1047,10 +1041,7 @@ public class GridNioServer<T> { assert msg != null; - if (messageWriterFactory == null) - messageWriterFactory = spi.getSpiContext().messageWriterFactory(); - - msg.setWriter(messageWriterFactory.writer()); + msg.setWriter(formatter.writer()); finished = msg.writeTo(buf); } @@ -2078,8 +2069,8 @@ public class GridNioServer<T> { /** Daemon flag. */ private boolean daemon; - /** SPI. */ - private IgniteSpiAdapter spi; + /** Message formatter. */ + private MessageFormatter formatter; /** * Finishes building the instance. @@ -2104,7 +2095,7 @@ public class GridNioServer<T> { directMode, daemon, metricsLsnr, - spi, + formatter, filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS ); @@ -2299,11 +2290,11 @@ public class GridNioServer<T> { } /** - * @param spi SPI. + * @param formatter Message formatter. * @return This for chaining. */ - public Builder<T> spi(IgniteSpiAdapter spi) { - this.spi = spi; + public Builder<T> messageFormatter(MessageFormatter formatter) { + this.formatter = formatter; return this; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index c3c9a92..2add325 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -39,18 +39,18 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien private final ByteBuffer writeBuf; /** */ - private final MessageWriterFactory writerFactory; + private final MessageFormatter formatter; /** * @param metricsLsnr Metrics listener. * @param port Shared memory IPC server port. * @param connTimeout Connection timeout. * @param log Logger. - * @param writerFactory Message writer factory. + * @param formatter Message formatter. * @throws IgniteCheckedException If failed. */ public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr, int port, long connTimeout, - IgniteLogger log, MessageWriterFactory writerFactory) throws IgniteCheckedException { + IgniteLogger log, MessageFormatter formatter) throws IgniteCheckedException { super(metricsLsnr); assert metricsLsnr != null; @@ -63,7 +63,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien writeBuf.order(ByteOrder.nativeOrder()); - this.writerFactory = writerFactory; + this.formatter = formatter; } /** {@inheritDoc} */ @@ -116,7 +116,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien assert writeBuf.hasArray(); try { - msg.setWriter(writerFactory.writer()); + msg.setWriter(formatter.writer()); int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java index 561547d..fd2aeb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java @@ -49,7 +49,7 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient private final ByteBuffer writeBuf; /** */ - private final MessageWriterFactory writerFactory; + private final MessageFormatter formatter; /** * @param metricsLsnr Metrics listener. @@ -62,7 +62,7 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient * @param bufSize Buffer size (or {@code 0} to disable buffer). * @param minBufferedMsgCnt Minimum buffered message count. * @param bufSizeRatio Communication buffer size ratio. - * @param writerFactory Message writer factory. + * @param formatter Message formatter. * @throws IgniteCheckedException If failed. */ public GridTcpCommunicationClient( @@ -76,7 +76,7 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient int bufSize, int minBufferedMsgCnt, double bufSizeRatio, - MessageWriterFactory writerFactory + MessageFormatter formatter ) throws IgniteCheckedException { super(metricsLsnr); @@ -93,7 +93,7 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient this.minBufferedMsgCnt = minBufferedMsgCnt; this.bufSizeRatio = bufSizeRatio; - this.writerFactory = writerFactory; + this.formatter = formatter; writeBuf = ByteBuffer.allocate(8 << 10); @@ -197,7 +197,7 @@ public class GridTcpCommunicationClient extends GridAbstractCommunicationClient assert writeBuf.hasArray(); try { - msg.setWriter(writerFactory.writer()); + msg.setWriter(formatter.writer()); int cnt = U.writeMessageFully(msg, out, writeBuf); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java new file mode 100644 index 0000000..01c0c61 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +import org.apache.ignite.plugin.*; + +/** + * TODO + */ +public interface MessageFormatter extends Extension { + /** + * Creates new message writer instance. + * + * @return Message writer. + */ + public MessageWriter writer(); + + /** + * Creates new message reader instance. + * + * @return Message reader. + */ + public MessageReader reader(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReaderFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReaderFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReaderFactory.java deleted file mode 100644 index 4729551..0000000 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReaderFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.plugin.extensions.communication; - -import org.apache.ignite.plugin.*; - -/** - * Factory for message readers. - * <p> - * A plugin can provide his own message reader factory as - * an extension to define a custom binary format. - */ -public interface MessageReaderFactory extends Extension { - /** - * Creates new message reader instance. - * - * @return Message reader. - */ - public MessageReader reader(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriterFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriterFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriterFactory.java deleted file mode 100644 index 1ab04e3..0000000 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriterFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.plugin.extensions.communication; - -import org.apache.ignite.plugin.*; - -/** - * Factory for message writers. - * <p> - * A plugin can provide his own message writer factory as - * an extension to define a custom binary format. - */ -public interface MessageWriterFactory extends Extension { - /** - * Creates new message writer instance. - * - * @return Message writer. - */ - public MessageWriter writer(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 7a731e5..c4518e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -713,7 +713,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement return null; } - @Override public MessageWriterFactory messageWriterFactory() { + @Override public MessageFormatter messageFormatter() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java index 7f7e25e..5a0a23f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java @@ -377,13 +377,15 @@ public interface IgniteSpiContext { @Nullable ClassLoader ldr) throws IgniteException; /** - * Gets message writer factory. + * Gets message formatter. * - * @return Message writer factory. + * @return Message formatter. */ - public MessageWriterFactory messageWriterFactory(); + public MessageFormatter messageFormatter(); /** + * Gets message factory. + * * @return Message factory. */ public MessageFactory messageFactory(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index e3db36b..2ab05cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1470,6 +1470,41 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // If configured TCP port is busy, find first available in range. for (int port = locPort; port < locPort + locPortRange; port++) { try { + MessageFactory messageFactory = new MessageFactory() { + private MessageFactory impl; + + @Nullable @Override public MessageAdapter create(byte type) { + if (impl == null) + impl = getSpiContext().messageFactory(); + + assert impl != null; + + return impl.create(type); + } + }; + + MessageFormatter messageFormatter = new MessageFormatter() { + private MessageFormatter impl; + + @Override public MessageWriter writer() { + if (impl == null) + impl = getSpiContext().messageFormatter(); + + assert impl != null; + + return impl.writer(); + } + + @Override public MessageReader reader() { + if (impl == null) + impl = getSpiContext().messageFormatter(); + + assert impl != null; + + return impl.reader(); + } + }; + GridNioServer<MessageAdapter> srvr = GridNioServer.<MessageAdapter>builder() .address(locHost) @@ -1487,9 +1522,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .directMode(true) .metricsListener(metricsLsnr) .writeTimeout(sockWriteTimeout) - .filters(new GridNioCodecFilter(new GridDirectParser(this), log, true), + .filters(new GridNioCodecFilter(new GridDirectParser(messageFactory), log, true), new GridConnectionBytesVerifyFilter(log)) - .spi(this) + .messageFormatter(messageFormatter) .build(); boundTcpPort = port; @@ -1852,7 +1887,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter try { client = new GridShmemCommunicationClient(metricsLsnr, port, connTimeout, log, - getSpiContext().messageWriterFactory()); + getSpiContext().messageFormatter()); } catch (IgniteCheckedException e) { // Reconnect for the second time, if connection is not established. @@ -2421,8 +2456,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log, endpoint, srvLsnr, - getSpiContext().messageWriterFactory(), - new GridNioCodecFilter(new GridDirectParser(TcpCommunicationSpi.this), log, true), + getSpiContext().messageFormatter(), + new GridNioCodecFilter(new GridDirectParser(getSpiContext().messageFactory()), log, true), new GridConnectionBytesVerifyFilter(log) ); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index d91ed13..97cb488 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -505,21 +505,21 @@ public class GridSpiTestContext implements IgniteSpiContext { return null; } - @Override public MessageWriterFactory messageWriterFactory() { - return new MessageWriterFactory() { + @Override public MessageFormatter messageFormatter() { + return new MessageFormatter() { @Override public MessageWriter writer() { return new DirectMessageWriter(); } + + @Override public MessageReader reader() { + throw new UnsupportedOperationException(); + } }; } /** {@inheritDoc} */ @Override public MessageFactory messageFactory() { - return new GridIoMessageFactory(new MessageReaderFactory() { - @Override public MessageReader reader() { - return new DirectMessageReader(null); - } - }, null); + return new GridIoMessageFactory(messageFormatter(), null); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1023cd14/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java b/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java index babdbbb..f5a73d8 100644 --- a/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java +++ b/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java @@ -49,10 +49,10 @@ import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.*; @SuppressWarnings("ConstantConditions") public class IgniteProjectionStartStopRestartSelfTest extends GridCommonAbstractTest { /** */ - private static final String SSH_UNAME = System.getenv("test.ssh.username"); + private static final String SSH_UNAME = "vkulichenko"; /** */ - private static final String SSH_PWD = System.getenv("test.ssh.password"); + private static final String SSH_PWD = "8tQHsaM"; /** */ private static final String SSH_KEY = System.getenv("ssh.key");