IGNITE-61 - Direct marshalling
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1c19aa4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1c19aa4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1c19aa4a Branch: refs/heads/ignite-82 Commit: 1c19aa4a1448ccf54241ca6ecacc082d2d42af32 Parents: f62ee2d Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sun Feb 8 17:56:25 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sun Feb 8 17:56:25 2015 -0800 ---------------------------------------------------------------------- .../ignite/internal/GridPluginContext.java | 6 - .../apache/ignite/internal/IgniteKernal.java | 2 - .../internal/direct/DirectByteBufferStream.java | 4 +- .../internal/direct/DirectMessageReader.java | 2 +- .../GridTcpCommunicationMessageFactory.java | 419 -------------- .../GridTcpCommunicationMessageProducer.java | 33 -- .../internal/direct/GridTcpMessageFactory.java | 31 - .../internal/managers/GridManagerAdapter.java | 3 +- .../managers/communication/GridIoManager.java | 79 +-- .../communication/GridIoMessageFactory.java | 570 +++++++++++++++++++ .../internal/util/nio/GridDirectParser.java | 3 +- .../internal/util/nio/GridNioMessageReader.java | 3 +- .../ignite/internal/util/nio/GridNioServer.java | 2 +- .../org/apache/ignite/plugin/PluginContext.java | 7 - .../communication/MessageFactory.java | 31 + .../org/apache/ignite/spi/IgniteSpiAdapter.java | 3 +- .../org/apache/ignite/spi/IgniteSpiContext.java | 3 +- .../GridCommunicationSendMessageSelfTest.java | 7 +- .../GridAbstractCommunicationSelfTest.java | 8 +- ...mmunicationSpiConcurrentConnectSelfTest.java | 10 +- ...cpCommunicationSpiMultithreadedSelfTest.java | 8 +- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 10 +- ...GridTcpCommunicationSpiRecoverySelfTest.java | 9 +- .../testframework/GridSpiTestContext.java | 14 +- 24 files changed, 648 insertions(+), 619 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginContext.java index 24e080e..31a4246 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginContext.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.direct.*; import org.apache.ignite.plugin.*; import org.apache.ignite.spi.*; @@ -93,9 +92,4 @@ public class GridPluginContext implements PluginContext { @Override public void deregisterPorts(Class<?> cls) { ctx.ports().deregisterPorts(cls); } - - /** {@inheritDoc} */ - @Override public byte registerMessageProducer(GridTcpCommunicationMessageProducer producer) { - return ctx.io().registerMessageProducer(producer); - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 0330322..8bc846a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -768,8 +768,6 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit provider.start(ctx.plugins().pluginContextForProvider(provider), attrs); } - ctx.io().initMessageFactory(); - if (ctx.isEnterprise()) { security = new GridSecurityImpl(ctx); portables = new GridPortablesImpl(ctx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java index e35ab69..1b8b5ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java @@ -259,7 +259,7 @@ public class DirectByteBufferStream { } /** */ - private final GridTcpMessageFactory msgFactory; + private final MessageFactory msgFactory; /** */ private ByteBuffer buf; @@ -318,7 +318,7 @@ public class DirectByteBufferStream { /** * @param msgFactory Message factory. */ - public DirectByteBufferStream(@Nullable GridTcpMessageFactory msgFactory) { + public DirectByteBufferStream(@Nullable MessageFactory msgFactory) { this.msgFactory = msgFactory; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index 71b9e63..15f113b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -37,7 +37,7 @@ public class DirectMessageReader implements MessageReader { /** * @param msgFactory Message factory. */ - public DirectMessageReader(GridTcpMessageFactory msgFactory) { + public DirectMessageReader(MessageFactory msgFactory) { this.stream = new DirectByteBufferStream(msgFactory); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageFactory.java deleted file mode 100644 index 600bd31..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageFactory.java +++ /dev/null @@ -1,419 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.direct; - -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.managers.checkpoint.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.managers.eventstorage.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; -import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.internal.processors.cache.query.*; -import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.processors.clock.*; -import org.apache.ignite.internal.processors.continuous.*; -import org.apache.ignite.internal.processors.dataload.*; -import org.apache.ignite.internal.processors.fs.*; -import org.apache.ignite.internal.processors.rest.client.message.*; -import org.apache.ignite.internal.processors.rest.handlers.task.*; -import org.apache.ignite.internal.processors.rest.protocols.tcp.*; -import org.apache.ignite.internal.processors.streamer.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.apache.ignite.spi.collision.jobstealing.*; -import org.apache.ignite.spi.communication.tcp.*; -import org.jdk8.backport.*; - -import java.util.*; - -/** - * Communication message factory. - */ -public class GridTcpCommunicationMessageFactory { - /** Common message producers. */ - private static final GridTcpCommunicationMessageProducer[] COMMON = new GridTcpCommunicationMessageProducer[89]; - - /** - * Custom messages registry. Used for test purposes. - */ - private static final Map<Byte, GridTcpCommunicationMessageProducer> CUSTOM = new ConcurrentHashMap8<>(); - - /** */ - public static final int MAX_COMMON_TYPE = 88; - - static { - registerCommon(new GridTcpCommunicationMessageProducer() { - @Override public MessageAdapter create(byte type) { - switch (type) { - case 0: - return new GridJobCancelRequest(); - - case 1: - return new GridJobExecuteRequest(); - - case 2: - return new GridJobExecuteResponse(); - - case 3: - return new GridJobSiblingsRequest(); - - case 4: - return new GridJobSiblingsResponse(); - - case 5: - return new GridTaskCancelRequest(); - - case 6: - return new GridTaskSessionRequest(); - - case 7: - return new GridCheckpointRequest(); - - case 8: - return new GridIoMessage(); - - case 9: - return new GridIoUserMessage(); - - case 10: - return new GridDeploymentInfoBean(); - - case 11: - return new GridDeploymentRequest(); - - case 12: - return new GridDeploymentResponse(); - - case 13: - return new GridEventStorageMessage(); - - case 14: - return new GridCacheEvictionRequest(); - - case 15: - return new GridCacheEvictionResponse(); - - case 16: - return new GridCacheOptimisticCheckPreparedTxRequest(); - - case 17: - return new GridCacheOptimisticCheckPreparedTxResponse(); - - case 18: - return new GridCachePessimisticCheckCommittedTxRequest(); - - case 19: - return new GridCachePessimisticCheckCommittedTxResponse(); - - case 20: - return new GridCacheTtlUpdateRequest(); - - case 21: - return new GridDistributedLockRequest(); - - case 22: - return new GridDistributedLockResponse(); - - case 23: - return new GridDistributedTxFinishRequest(); - - case 24: - return new GridDistributedTxFinishResponse(); - - case 25: - return new GridDistributedTxPrepareRequest(); - - case 26: - return new GridDistributedTxPrepareResponse(); - - case 27: - return new GridDistributedUnlockRequest(); - - case 28: - return new GridDhtAffinityAssignmentRequest(); - - case 29: - return new GridDhtAffinityAssignmentResponse(); - - case 30: - return new GridDhtLockRequest(); - - case 31: - return new GridDhtLockResponse(); - - case 32: - return new GridDhtTxFinishRequest(); - - case 33: - return new GridDhtTxFinishResponse(); - - case 34: - return new GridDhtTxPrepareRequest(); - - case 35: - return new GridDhtTxPrepareResponse(); - - case 36: - return new GridDhtUnlockRequest(); - - case 37: - return new GridDhtAtomicDeferredUpdateResponse(); - - case 38: - return new GridDhtAtomicUpdateRequest(); - - case 39: - return new GridDhtAtomicUpdateResponse(); - - case 40: - return new GridNearAtomicUpdateRequest(); - - case 41: - return new GridNearAtomicUpdateResponse(); - - case 42: - return new GridDhtForceKeysRequest(); - - case 43: - return new GridDhtForceKeysResponse(); - - case 44: - return new GridDhtPartitionDemandMessage(); - - case 45: - return new GridDhtPartitionSupplyMessage(); - - case 46: - return new GridDhtPartitionsFullMessage(); - - case 47: - return new GridDhtPartitionsSingleMessage(); - - case 48: - return new GridDhtPartitionsSingleRequest(); - - case 49: - return new GridNearGetRequest(); - - case 50: - return new GridNearGetResponse(); - - case 51: - return new GridNearLockRequest(); - - case 52: - return new GridNearLockResponse(); - - case 53: - return new GridNearTxFinishRequest(); - - case 54: - return new GridNearTxFinishResponse(); - - case 55: - return new GridNearTxPrepareRequest(); - - case 56: - return new GridNearTxPrepareResponse(); - - case 57: - return new GridNearUnlockRequest(); - - case 58: - return new GridCacheQueryRequest(); - - case 59: - return new GridCacheQueryResponse(); - - case 60: - return new GridClockDeltaSnapshotMessage(); - - case 61: - return new GridContinuousMessage(); - - case 62: - return new GridDataLoadRequest(); - - case 63: - return new GridDataLoadResponse(); - - case 64: - return new GridGgfsAckMessage(); - - case 65: - return new GridGgfsBlockKey(); - - case 66: - return new GridGgfsBlocksMessage(); - - case 67: - return new GridGgfsDeleteMessage(); - - case 68: - return new GridGgfsFileAffinityRange(); - - case 69: - return new GridGgfsFragmentizerRequest(); - - case 70: - return new GridGgfsFragmentizerResponse(); - - case 71: - return new GridGgfsSyncMessage(); - - case 72: - return new GridClientHandshakeRequestWrapper(); - - case 73: - return new GridClientHandshakeResponseWrapper(); - - case 74: - return new GridClientMessageWrapper(); - - case 75: - return new GridClientPingPacketWrapper(); - - case 76: - return new GridTaskResultRequest(); - - case 77: - return new GridTaskResultResponse(); - - case 78: - return new GridMemcachedMessageWrapper(); - - case 79: - return new GridStreamerCancelRequest(); - - case 80: - return new GridStreamerExecutionRequest(); - - case 81: - return new GridStreamerResponse(); - - case 82: - return new JobStealingRequest(); - - case 83: - return new GridClockDeltaVersion(); - - case 84: - return new GridByteArrayList(); - - case 85: - return new GridLongList(); - - case 86: - return new GridCacheVersion(); - - case 87: - return new GridDhtPartitionExchangeId(); - - case 88: - return new GridCacheValueBytes(); - - default: - assert false : "Invalid message type."; - - return null; - } - } - }, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, - 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, - 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, - 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, - 80, 81, 82, 83, 84, 85, 86, 87, 88); - } - - /** - * @param type Message type. - * @return New message. - */ - public static MessageAdapter create(byte type) { - if (type == TcpCommunicationSpi.NODE_ID_MSG_TYPE) - return new TcpCommunicationSpi.NodeIdMessage(); - else if (type == TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE) - return new TcpCommunicationSpi.RecoveryLastReceivedMessage(); - else if (type == TcpCommunicationSpi.HANDSHAKE_MSG_TYPE) - return new TcpCommunicationSpi.HandshakeMessage(); - else - return create0(type); - } - - /** - * @param type Message type. - * @return New message. - */ - private static MessageAdapter create0(byte type) { - if (type >= 0 && type < COMMON.length) { - GridTcpCommunicationMessageProducer producer = COMMON[type]; - - if (producer != null) - return producer.create(type); - else - throw new IllegalStateException("Common message type producer is not registered: " + type); - } - else { - GridTcpCommunicationMessageProducer c = CUSTOM.get(type); - - if (c != null) - return c.create(type); - else - throw new IllegalStateException("Custom message type producer is not registered: " + type); - } - } - - /** - * Register message producer for common message type. - * - * @param producer Producer. - * @param types Types applicable for this producer. - */ - public static void registerCommon(GridTcpCommunicationMessageProducer producer, int... types) { - for (int type : types) { - assert type >= 0 && type < COMMON.length : "Common type being registered is out of common messages " + - "array length: " + type; - - COMMON[type] = producer; - } - } - - /** - * Registers factory for custom message. Used for test purposes. - * - * @param producer Message producer. - * @param type Message type. - */ - public static void registerCustom(GridTcpCommunicationMessageProducer producer, byte type) { - assert producer != null; - - CUSTOM.put(type, producer); - } - - /** - * @return Common message producers. - */ - public static GridTcpCommunicationMessageProducer[] commonProducers() { - return COMMON; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageProducer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageProducer.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageProducer.java deleted file mode 100644 index 2932002..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpCommunicationMessageProducer.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.direct; - -import org.apache.ignite.plugin.extensions.communication.*; - -/** - * Message producer. Each component have to register it's own message producer. - */ -public interface GridTcpCommunicationMessageProducer { - /** - * Create message. - * - * @param type Message type. - * @return Communication message. - */ - public MessageAdapter create(byte type); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpMessageFactory.java deleted file mode 100644 index 5c66a04..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/GridTcpMessageFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.direct; - -import org.apache.ignite.plugin.extensions.communication.*; - -/** - * - */ -public interface GridTcpMessageFactory { - /** - * @param type Message type. - * @return Message instance. - */ - public MessageAdapter create(byte type); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 3e85199..117fbe9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -22,7 +22,6 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.direct.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -551,7 +550,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan return ctx.io().messageWriterFactory(); } - @Override public GridTcpMessageFactory messageFactory() { + @Override public MessageFactory messageFactory() { return ctx.io().messageFactory(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 2d7b67f..a44efd9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -132,20 +132,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa private final LongAdder workersCnt = new LongAdder(); /** */ - private int pluginMsg = GridTcpCommunicationMessageFactory.MAX_COMMON_TYPE; - - /** */ - private Map<Byte, GridTcpCommunicationMessageProducer> pluginMsgs; - - /** */ - private GridTcpMessageFactory msgFactory; + private MessageFactory msgFactory; /** */ private MessageWriterFactory writerFactory; - /** */ - private MessageReaderFactory readerFactory; - /** * @param ctx Grid kernal context. */ @@ -161,71 +152,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** - * @param producer Message producer. - * @return Message type code. - */ - public byte registerMessageProducer(GridTcpCommunicationMessageProducer producer) { - int nextMsg = ++pluginMsg; - - if (nextMsg > Byte.MAX_VALUE) - throw new IgniteException(); - - if (pluginMsgs == null) - pluginMsgs = new HashMap<>(); - - pluginMsgs.put((byte)nextMsg, producer); - - return (byte)nextMsg; - } - - /** - * Initializes manager (called prior to discovery start, but after all other components). - */ - public void initMessageFactory() { - final GridTcpCommunicationMessageProducer[] common = GridTcpCommunicationMessageFactory.commonProducers(); - - final GridTcpCommunicationMessageProducer[] producers; - - if (pluginMsgs != null) { - producers = Arrays.copyOf(common, pluginMsg + 1); - - for (Map.Entry<Byte, GridTcpCommunicationMessageProducer> e : pluginMsgs.entrySet()) { - assert producers[e.getKey()] == null : e.getKey(); - - producers[e.getKey()] = e.getValue(); - } - - pluginMsgs = null; - } - else - producers = common; - - msgFactory = new GridTcpMessageFactory() { - @Override public MessageAdapter create(byte type) { - MessageAdapter msg; - - if (type < 0 || type >= producers.length) - msg = GridTcpCommunicationMessageFactory.create(type); - else { - GridTcpCommunicationMessageProducer producer = producers[type]; - - if (producer == null) - throw new IllegalStateException("Common message type producer is not registered: " + type); - - msg = producer.create(type); - } - - msg.setReader(readerFactory.reader()); - - return msg; - } - }; - } - - /** * @return Message factory. */ - public GridTcpMessageFactory messageFactory() { + public MessageFactory messageFactory() { assert msgFactory != null; return msgFactory; @@ -291,6 +220,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa }; } + MessageReaderFactory readerFactory; + MessageReaderFactory[] readerExt = ctx.plugins().extensions(MessageReaderFactory.class); if (readerExt != null && readerExt.length > 0) @@ -303,6 +234,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa }; } + msgFactory = new GridIoMessageFactory(readerFactory, ctx.plugins().extensions(MessageFactory.class)); + if (log.isDebugEnabled()) log.debug(startInfo()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java new file mode 100644 index 0000000..0634b1f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -0,0 +1,570 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.checkpoint.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.processors.clock.*; +import org.apache.ignite.internal.processors.continuous.*; +import org.apache.ignite.internal.processors.dataload.*; +import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.processors.rest.handlers.task.*; +import org.apache.ignite.internal.processors.rest.protocols.tcp.*; +import org.apache.ignite.internal.processors.streamer.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.collision.jobstealing.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.jdk8.backport.*; + +import java.util.*; + +/** + * Message factory implementation. + */ +public class GridIoMessageFactory implements MessageFactory { + /** Custom messages registry. Used for test purposes. */ + private static final Map<Byte, IgniteOutClosure<MessageAdapter>> CUSTOM = new ConcurrentHashMap8<>(); + + /** Message reader factory. */ + private final MessageReaderFactory readerFactory; + + /** Extensions. */ + private final MessageFactory[] ext; + + /** + * @param readerFactory Message reader factory. + * @param ext Extensions. + */ + public GridIoMessageFactory(MessageReaderFactory readerFactory, MessageFactory[] ext) { + assert readerFactory != null; + + this.readerFactory = readerFactory; + this.ext = ext; + } + + /** {@inheritDoc} */ + @Override public MessageAdapter create(byte type) { + MessageAdapter msg = null; + + switch (type) { + case TcpCommunicationSpi.NODE_ID_MSG_TYPE: + return new TcpCommunicationSpi.NodeIdMessage(); + + case TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE: + return new TcpCommunicationSpi.RecoveryLastReceivedMessage(); + + case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE: + return new TcpCommunicationSpi.HandshakeMessage(); + + case 0: + msg = new GridJobCancelRequest(); + + break; + + case 1: + msg = new GridJobExecuteRequest(); + + break; + + case 2: + msg = new GridJobExecuteResponse(); + + break; + + case 3: + msg = new GridJobSiblingsRequest(); + + break; + + case 4: + msg = new GridJobSiblingsResponse(); + + break; + + case 5: + msg = new GridTaskCancelRequest(); + + break; + + case 6: + msg = new GridTaskSessionRequest(); + + break; + + case 7: + msg = new GridCheckpointRequest(); + + break; + + case 8: + msg = new GridIoMessage(); + + break; + + case 9: + msg = new GridIoUserMessage(); + + break; + + case 10: + msg = new GridDeploymentInfoBean(); + + break; + + case 11: + msg = new GridDeploymentRequest(); + + break; + + case 12: + msg = new GridDeploymentResponse(); + + break; + + case 13: + msg = new GridEventStorageMessage(); + + break; + + case 14: + msg = new GridCacheEvictionRequest(); + + break; + + case 15: + msg = new GridCacheEvictionResponse(); + + break; + + case 16: + msg = new GridCacheOptimisticCheckPreparedTxRequest(); + + break; + + case 17: + msg = new GridCacheOptimisticCheckPreparedTxResponse(); + + break; + + case 18: + msg = new GridCachePessimisticCheckCommittedTxRequest(); + + break; + + case 19: + msg = new GridCachePessimisticCheckCommittedTxResponse(); + + break; + + case 20: + msg = new GridCacheTtlUpdateRequest(); + + break; + + case 21: + msg = new GridDistributedLockRequest(); + + break; + + case 22: + msg = new GridDistributedLockResponse(); + + break; + + case 23: + msg = new GridDistributedTxFinishRequest(); + + break; + + case 24: + msg = new GridDistributedTxFinishResponse(); + + break; + + case 25: + msg = new GridDistributedTxPrepareRequest(); + + break; + + case 26: + msg = new GridDistributedTxPrepareResponse(); + + break; + + case 27: + msg = new GridDistributedUnlockRequest(); + + break; + + case 28: + msg = new GridDhtAffinityAssignmentRequest(); + + break; + + case 29: + msg = new GridDhtAffinityAssignmentResponse(); + + break; + + case 30: + msg = new GridDhtLockRequest(); + + break; + + case 31: + msg = new GridDhtLockResponse(); + + break; + + case 32: + msg = new GridDhtTxFinishRequest(); + + break; + + case 33: + msg = new GridDhtTxFinishResponse(); + + break; + + case 34: + msg = new GridDhtTxPrepareRequest(); + + break; + + case 35: + msg = new GridDhtTxPrepareResponse(); + + break; + + case 36: + msg = new GridDhtUnlockRequest(); + + break; + + case 37: + msg = new GridDhtAtomicDeferredUpdateResponse(); + + break; + + case 38: + msg = new GridDhtAtomicUpdateRequest(); + + break; + + case 39: + msg = new GridDhtAtomicUpdateResponse(); + + break; + + case 40: + msg = new GridNearAtomicUpdateRequest(); + + break; + + case 41: + msg = new GridNearAtomicUpdateResponse(); + + break; + + case 42: + msg = new GridDhtForceKeysRequest(); + + break; + + case 43: + msg = new GridDhtForceKeysResponse(); + + break; + + case 44: + msg = new GridDhtPartitionDemandMessage(); + + break; + + case 45: + msg = new GridDhtPartitionSupplyMessage(); + + break; + + case 46: + msg = new GridDhtPartitionsFullMessage(); + + break; + + case 47: + msg = new GridDhtPartitionsSingleMessage(); + + break; + + case 48: + msg = new GridDhtPartitionsSingleRequest(); + + break; + + case 49: + msg = new GridNearGetRequest(); + + break; + + case 50: + msg = new GridNearGetResponse(); + + break; + + case 51: + msg = new GridNearLockRequest(); + + break; + + case 52: + msg = new GridNearLockResponse(); + + break; + + case 53: + msg = new GridNearTxFinishRequest(); + + break; + + case 54: + msg = new GridNearTxFinishResponse(); + + break; + + case 55: + msg = new GridNearTxPrepareRequest(); + + break; + + case 56: + msg = new GridNearTxPrepareResponse(); + + break; + + case 57: + msg = new GridNearUnlockRequest(); + + break; + + case 58: + msg = new GridCacheQueryRequest(); + + break; + + case 59: + msg = new GridCacheQueryResponse(); + + break; + + case 60: + msg = new GridClockDeltaSnapshotMessage(); + + break; + + case 61: + msg = new GridContinuousMessage(); + + break; + + case 62: + msg = new GridDataLoadRequest(); + + break; + + case 63: + msg = new GridDataLoadResponse(); + + break; + + case 64: + msg = new GridGgfsAckMessage(); + + break; + + case 65: + msg = new GridGgfsBlockKey(); + + break; + + case 66: + msg = new GridGgfsBlocksMessage(); + + break; + + case 67: + msg = new GridGgfsDeleteMessage(); + + break; + + case 68: + msg = new GridGgfsFileAffinityRange(); + + break; + + case 69: + msg = new GridGgfsFragmentizerRequest(); + + break; + + case 70: + msg = new GridGgfsFragmentizerResponse(); + + break; + + case 71: + msg = new GridGgfsSyncMessage(); + + break; + + case 72: + msg = new GridClientHandshakeRequestWrapper(); + + break; + + case 73: + msg = new GridClientHandshakeResponseWrapper(); + + break; + + case 74: + msg = new GridClientMessageWrapper(); + + break; + + case 75: + msg = new GridClientPingPacketWrapper(); + + break; + + case 76: + msg = new GridTaskResultRequest(); + + break; + + case 77: + msg = new GridTaskResultResponse(); + + break; + + case 78: + msg = new GridMemcachedMessageWrapper(); + + break; + + case 79: + msg = new GridStreamerCancelRequest(); + + break; + + case 80: + msg = new GridStreamerExecutionRequest(); + + break; + + case 81: + msg = new GridStreamerResponse(); + + break; + + case 82: + msg = new JobStealingRequest(); + + break; + + case 83: + msg = new GridClockDeltaVersion(); + + break; + + case 84: + msg = new GridByteArrayList(); + + break; + + case 85: + msg = new GridLongList(); + + break; + + case 86: + msg = new GridCacheVersion(); + + break; + + case 87: + msg = new GridDhtPartitionExchangeId(); + + break; + + case 88: + msg = new GridCacheValueBytes(); + + break; + + default: + if (ext != null) { + for (MessageFactory factory : ext) { + msg = factory.create(type); + + if (msg != null) + break; + } + } + + if (msg == null) { + IgniteOutClosure<MessageAdapter> c = CUSTOM.get(type); + + if (c != null) + msg = c.apply(); + } + } + + if (msg == null) + throw new IgniteException("Invalid message type: " + type); + + msg.setReader(readerFactory.reader()); + + return msg; + } + + /** + * Registers factory for custom message. Used for test purposes. + * + * @param type Message type. + * @param c Message producer. + */ + public static void registerCustom(byte type, IgniteOutClosure<MessageAdapter> c) { + assert c != null; + + CUSTOM.put(type, c); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java index 0071942..021aa91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; -import org.apache.ignite.internal.direct.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; @@ -37,7 +36,7 @@ public class GridDirectParser implements GridNioParser { private IgniteSpiAdapter spi; /** */ - private GridTcpMessageFactory msgFactory; + private MessageFactory msgFactory; /** * @param spi Spi. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReader.java index 17249fa..9ae5bbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReader.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.util.nio; -import org.apache.ignite.internal.direct.*; import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; @@ -39,5 +38,5 @@ public interface GridNioMessageReader { /** * @return Optional message factory. */ - @Nullable public GridTcpMessageFactory messageFactory(); + @Nullable public MessageFactory messageFactory(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 42a540b..585bf6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -2288,7 +2288,7 @@ public class GridNioServer<T> { } /** - * @param messageWriterFactory Message writer factory. + * @param messageWriterFactory Message writer factory.. * @return This for chaining. */ public Builder<T> messageWriterFactory(MessageWriterFactory messageWriterFactory) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/plugin/PluginContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/PluginContext.java b/modules/core/src/main/java/org/apache/ignite/plugin/PluginContext.java index a59d7ab..0704700 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/PluginContext.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/PluginContext.java @@ -20,7 +20,6 @@ package org.apache.ignite.plugin; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.direct.*; import org.apache.ignite.spi.*; import java.util.*; @@ -95,10 +94,4 @@ public interface PluginContext { * @param cls Class. */ public void deregisterPorts(Class<?> cls); - - /** - * @param producer Message producer. - * @return Message type code. - */ - public byte registerMessageProducer(GridTcpCommunicationMessageProducer producer); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java new file mode 100644 index 0000000..d443209 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +import org.apache.ignite.plugin.*; + +/** + * + */ +public interface MessageFactory extends Extension { + /** + * @param type Message type. + * @return Message instance. + */ + public MessageAdapter create(byte type); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index c3b4700..6ea655a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -21,7 +21,6 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.direct.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.authentication.*; @@ -730,7 +729,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement } /** {@inheritDoc} */ - @Override public GridTcpMessageFactory messageFactory() { + @Override public MessageFactory messageFactory() { return null; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java index 1f68226..ce4c0c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java @@ -20,7 +20,6 @@ package org.apache.ignite.spi; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; -import org.apache.ignite.internal.direct.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -408,5 +407,5 @@ public interface IgniteSpiContext { /** * @return Message factory. */ - public GridTcpMessageFactory messageFactory(); + public MessageFactory messageFactory(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java index d99cdfd..7f39141 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.managers.communication; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.direct.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; @@ -50,11 +51,11 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest private int bufSize; static { - GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() { - @Override public MessageAdapter create(byte type) { + GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<MessageAdapter>() { + @Override public MessageAdapter apply() { return new TestMessage(); } - }, DIRECT_TYPE); + }); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java index 1aabf89..2b7dac3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java @@ -19,7 +19,7 @@ package org.apache.ignite.spi.communication; import org.apache.ignite.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.direct.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -62,11 +62,11 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS * */ static { - GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() { - @Override public MessageAdapter create(byte type) { + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<MessageAdapter>() { + @Override public MessageAdapter apply() { return new GridTestMessage(); } - }, GridTestMessage.DIRECT_TYPE); + }); } /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index 39bfe37..175a4c5 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -20,9 +20,10 @@ package org.apache.ignite.spi.communication.tcp; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.direct.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -65,12 +66,11 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic * */ static { - GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() { - @Override - public MessageAdapter create(byte type) { + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<MessageAdapter>() { + @Override public MessageAdapter apply() { return new GridTestMessage(); } - }, GridTestMessage.DIRECT_TYPE); + }); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java index 5ae3d2b..159173e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java @@ -20,7 +20,7 @@ package org.apache.ignite.spi.communication.tcp; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.direct.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.nio.*; import org.apache.ignite.internal.util.typedef.*; @@ -74,11 +74,11 @@ public abstract class GridTcpCommunicationSpiMultithreadedSelfTest extends GridS private static boolean reject; static { - GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() { - @Override public MessageAdapter create(byte type) { + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<MessageAdapter>() { + @Override public MessageAdapter apply() { return new GridTestMessage(); } - }, GridTestMessage.DIRECT_TYPE); + }); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index 78e1b81..2ba9086 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -19,9 +19,10 @@ package org.apache.ignite.spi.communication.tcp; import org.apache.ignite.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.direct.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -56,12 +57,11 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS * */ static { - GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() { - @Override - public MessageAdapter create(byte type) { + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<MessageAdapter>() { + @Override public MessageAdapter apply() { return new GridTestMessage(); } - }, GridTestMessage.DIRECT_TYPE); + }); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index c2c1676..86b4e44 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -20,9 +20,10 @@ package org.apache.ignite.spi.communication.tcp; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.direct.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -65,11 +66,11 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> * */ static { - GridTcpCommunicationMessageFactory.registerCustom(new GridTcpCommunicationMessageProducer() { - @Override public MessageAdapter create(byte type) { + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<MessageAdapter>() { + @Override public MessageAdapter apply() { return new GridTestMessage(); } - }, GridTestMessage.DIRECT_TYPE); + }); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c19aa4a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 76fd302..bfcfe2f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -525,16 +525,12 @@ public class GridSpiTestContext implements IgniteSpiContext { } /** {@inheritDoc} */ - @Override public GridTcpMessageFactory messageFactory() { - return new GridTcpMessageFactory() { - @Override public MessageAdapter create(byte type) { - MessageAdapter msg = GridTcpCommunicationMessageFactory.create(type); - - msg.setReader(new DirectMessageReader(null)); - - return msg; + @Override public MessageFactory messageFactory() { + return new GridIoMessageFactory(new MessageReaderFactory() { + @Override public MessageReader reader() { + return new DirectMessageReader(null); } - }; + }, null); } /**