http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java index 5c1578c..d1938bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java @@ -271,7 +271,7 @@ public class GridDataLoadRequest extends MessageAdapter { writer.incrementState(); case 6: - if (!writer.writeMap("ldrParticipants", ldrParticipants, Type.UUID, Type.IGNITE_UUID)) + if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) return false; writer.incrementState(); @@ -318,20 +318,20 @@ public class GridDataLoadRequest extends MessageAdapter { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - switch (readState) { + switch (reader.state()) { case 0: cacheName = reader.readString("cacheName"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: clsLdrId = reader.readIgniteUuid("clsLdrId"); @@ -339,7 +339,7 @@ public class GridDataLoadRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 2: colBytes = reader.readByteArray("colBytes"); @@ -347,7 +347,7 @@ public class GridDataLoadRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 3: byte depModeOrd; @@ -359,7 +359,7 @@ public class GridDataLoadRequest extends MessageAdapter { depMode = DeploymentMode.fromOrdinal(depModeOrd); - readState++; + reader.incrementState(); case 4: forceLocDep = reader.readBoolean("forceLocDep"); @@ -367,7 +367,7 @@ public class GridDataLoadRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 5: ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership"); @@ -375,15 +375,15 @@ public class GridDataLoadRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 6: - ldrParticipants = reader.readMap("ldrParticipants", Type.UUID, Type.IGNITE_UUID, false); + ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 7: reqId = reader.readLong("reqId"); @@ -391,7 +391,7 @@ public class GridDataLoadRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 8: resTopicBytes = reader.readByteArray("resTopicBytes"); @@ -399,7 +399,7 @@ public class GridDataLoadRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 9: sampleClsName = reader.readString("sampleClsName"); @@ -407,7 +407,7 @@ public class GridDataLoadRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 10: skipStore = reader.readBoolean("skipStore"); @@ -415,7 +415,7 @@ public class GridDataLoadRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 11: updaterBytes = reader.readByteArray("updaterBytes"); @@ -423,7 +423,7 @@ public class GridDataLoadRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 12: userVer = reader.readString("userVer"); @@ -431,7 +431,7 @@ public class GridDataLoadRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java index 84f11d5..4c0b52f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java @@ -118,20 +118,20 @@ public class GridDataLoadResponse extends MessageAdapter { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - switch (readState) { + switch (reader.state()) { case 0: errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: forceLocDep = reader.readBoolean("forceLocDep"); @@ -139,7 +139,7 @@ public class GridDataLoadResponse extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 2: reqId = reader.readLong("reqId"); @@ -147,7 +147,7 @@ public class GridDataLoadResponse extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java index 5e6917e..f52c6de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java @@ -141,23 +141,23 @@ public class IgfsAckMessage extends IgfsCommunicationMessage { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - if (!super.readFrom(buf)) + if (!super.readFrom(buf, reader)) return false; - switch (readState) { + switch (reader.state()) { case 0: errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: fileId = reader.readIgniteUuid("fileId"); @@ -165,7 +165,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 2: id = reader.readLong("id"); @@ -173,7 +173,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java index b9be6a3..37e5881 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java @@ -199,20 +199,20 @@ public final class IgfsBlockKey extends MessageAdapter implements Externalizable } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - switch (readState) { + switch (reader.state()) { case 0: affKey = reader.readIgniteUuid("affKey"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: blockId = reader.readLong("blockId"); @@ -220,7 +220,7 @@ public final class IgfsBlockKey extends MessageAdapter implements Externalizable if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 2: evictExclude = reader.readBoolean("evictExclude"); @@ -228,7 +228,7 @@ public final class IgfsBlockKey extends MessageAdapter implements Externalizable if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 3: fileId = reader.readIgniteUuid("fileId"); @@ -236,7 +236,7 @@ public final class IgfsBlockKey extends MessageAdapter implements Externalizable if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java index 006ca39..fbf2796 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java @@ -99,7 +99,7 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage { switch (writer.state()) { case 0: - if (!writer.writeMap("blocks", blocks, Type.MSG, Type.BYTE_ARR)) + if (!writer.writeMap("blocks", blocks, MessageCollectionItemType.MSG, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); @@ -122,23 +122,23 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - if (!super.readFrom(buf)) + if (!super.readFrom(buf, reader)) return false; - switch (readState) { + switch (reader.state()) { case 0: - blocks = reader.readMap("blocks", Type.MSG, Type.BYTE_ARR, false); + blocks = reader.readMap("blocks", MessageCollectionItemType.MSG, MessageCollectionItemType.BYTE_ARR, false); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: fileId = reader.readIgniteUuid("fileId"); @@ -146,7 +146,7 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 2: id = reader.readLong("id"); @@ -154,7 +154,7 @@ public class IgfsBlocksMessage extends IgfsCommunicationMessage { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java index ecfd876..53626d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCommunicationMessage.java @@ -63,7 +63,7 @@ public abstract class IgfsCommunicationMessage extends MessageAdapter { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java index e02f1a3..2196525 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java @@ -139,23 +139,23 @@ public class IgfsDeleteMessage extends IgfsCommunicationMessage { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - if (!super.readFrom(buf)) + if (!super.readFrom(buf, reader)) return false; - switch (readState) { + switch (reader.state()) { case 0: errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: id = reader.readIgniteUuid("id"); @@ -163,7 +163,7 @@ public class IgfsDeleteMessage extends IgfsCommunicationMessage { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java index e736520..2b558fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileAffinityRange.java @@ -310,20 +310,20 @@ public class IgfsFileAffinityRange extends MessageAdapter implements Externaliza } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - switch (readState) { + switch (reader.state()) { case 0: affKey = reader.readIgniteUuid("affKey"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: done = reader.readBoolean("done"); @@ -331,7 +331,7 @@ public class IgfsFileAffinityRange extends MessageAdapter implements Externaliza if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 2: endOff = reader.readLong("endOff"); @@ -339,7 +339,7 @@ public class IgfsFileAffinityRange extends MessageAdapter implements Externaliza if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 3: startOff = reader.readLong("startOff"); @@ -347,7 +347,7 @@ public class IgfsFileAffinityRange extends MessageAdapter implements Externaliza if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 4: status = reader.readInt("status"); @@ -355,7 +355,7 @@ public class IgfsFileAffinityRange extends MessageAdapter implements Externaliza if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java index f915c83..9223009 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerRequest.java @@ -100,7 +100,7 @@ public class IgfsFragmentizerRequest extends IgfsCommunicationMessage { writer.incrementState(); case 1: - if (!writer.writeCollection("fragmentRanges", fragmentRanges, Type.MSG)) + if (!writer.writeCollection("fragmentRanges", fragmentRanges, MessageCollectionItemType.MSG)) return false; writer.incrementState(); @@ -111,31 +111,31 @@ public class IgfsFragmentizerRequest extends IgfsCommunicationMessage { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - if (!super.readFrom(buf)) + if (!super.readFrom(buf, reader)) return false; - switch (readState) { + switch (reader.state()) { case 0: fileId = reader.readIgniteUuid("fileId"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: - fragmentRanges = reader.readCollection("fragmentRanges", Type.MSG); + fragmentRanges = reader.readCollection("fragmentRanges", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java index d0cc4d1..9ba6920 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerResponse.java @@ -81,23 +81,23 @@ public class IgfsFragmentizerResponse extends IgfsCommunicationMessage { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - if (!super.readFrom(buf)) + if (!super.readFrom(buf, reader)) return false; - switch (readState) { + switch (reader.state()) { case 0: fileId = reader.readIgniteUuid("fileId"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java index 2f8b941..0fb683c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java @@ -104,23 +104,23 @@ public class IgfsSyncMessage extends IgfsCommunicationMessage { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - if (!super.readFrom(buf)) + if (!super.readFrom(buf, reader)) return false; - switch (readState) { + switch (reader.state()) { case 0: order = reader.readLong("order"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: res = reader.readBoolean("res"); @@ -128,7 +128,7 @@ public class IgfsSyncMessage extends IgfsCommunicationMessage { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java index 7fd99e3..1da84c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java @@ -128,20 +128,20 @@ public class GridTaskResultRequest extends MessageAdapter { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - switch (readState) { + switch (reader.state()) { case 0: taskId = reader.readIgniteUuid("taskId"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: topicBytes = reader.readByteArray("topicBytes"); @@ -149,7 +149,7 @@ public class GridTaskResultRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java index 9636dc8..4545beb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java @@ -158,20 +158,20 @@ public class GridTaskResultResponse extends MessageAdapter { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - switch (readState) { + switch (reader.state()) { case 0: err = reader.readString("err"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: finished = reader.readBoolean("finished"); @@ -179,7 +179,7 @@ public class GridTaskResultResponse extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 2: found = reader.readBoolean("found"); @@ -187,7 +187,7 @@ public class GridTaskResultResponse extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 3: resBytes = reader.readByteArray("resBytes"); @@ -195,7 +195,7 @@ public class GridTaskResultResponse extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java index 264b73c..0c96ce5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java @@ -78,20 +78,20 @@ public class GridStreamerCancelRequest extends MessageAdapter { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - switch (readState) { + switch (reader.state()) { case 0: cancelledFutId = reader.readIgniteUuid("cancelledFutId"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java index c4d01c8..2f983d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java @@ -186,7 +186,7 @@ public class GridStreamerExecutionRequest extends MessageAdapter { writer.incrementState(); case 4: - if (!writer.writeMap("ldrParticipants", ldrParticipants, Type.UUID, Type.IGNITE_UUID)) + if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) return false; writer.incrementState(); @@ -209,20 +209,20 @@ public class GridStreamerExecutionRequest extends MessageAdapter { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - switch (readState) { + switch (reader.state()) { case 0: batchBytes = reader.readByteArray("batchBytes"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: clsLdrId = reader.readIgniteUuid("clsLdrId"); @@ -230,7 +230,7 @@ public class GridStreamerExecutionRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 2: byte depModeOrd; @@ -242,7 +242,7 @@ public class GridStreamerExecutionRequest extends MessageAdapter { depMode = DeploymentMode.fromOrdinal(depModeOrd); - readState++; + reader.incrementState(); case 3: forceLocDep = reader.readBoolean("forceLocDep"); @@ -250,15 +250,15 @@ public class GridStreamerExecutionRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 4: - ldrParticipants = reader.readMap("ldrParticipants", Type.UUID, Type.IGNITE_UUID, false); + ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 5: sampleClsName = reader.readString("sampleClsName"); @@ -266,7 +266,7 @@ public class GridStreamerExecutionRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 6: userVer = reader.readString("userVer"); @@ -274,7 +274,7 @@ public class GridStreamerExecutionRequest extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java index 2260921..6c47179 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java @@ -104,20 +104,20 @@ public class GridStreamerResponse extends MessageAdapter { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - switch (readState) { + switch (reader.state()) { case 0: errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: futId = reader.readIgniteUuid("futId"); @@ -125,7 +125,7 @@ public class GridStreamerResponse extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/util/GridByteArrayList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridByteArrayList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridByteArrayList.java index 85bd7c8..833bb7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridByteArrayList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridByteArrayList.java @@ -436,20 +436,20 @@ public class GridByteArrayList extends MessageAdapter implements Externalizable } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - switch (readState) { + switch (reader.state()) { case 0: data = reader.readByteArray("data"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: size = reader.readInt("size"); @@ -457,7 +457,7 @@ public class GridByteArrayList extends MessageAdapter implements Externalizable if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java index 439063e..64bb7ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java @@ -534,20 +534,20 @@ public class GridLongList extends MessageAdapter implements Externalizable { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - switch (readState) { + switch (reader.state()) { case 0: arr = reader.readLongArray("arr"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: idx = reader.readInt("idx"); @@ -555,7 +555,7 @@ public class GridLongList extends MessageAdapter implements Externalizable { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java index b76c5d4..7b82d92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java @@ -31,6 +31,9 @@ public class GridDirectParser implements GridNioParser { /** Message metadata key. */ private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + /** Reader metadata key. */ + private static final int READER_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + /** */ private final MessageFactory msgFactory; @@ -54,16 +57,24 @@ public class GridDirectParser implements GridNioParser { throws IOException, IgniteCheckedException { MessageAdapter msg = ses.removeMeta(MSG_META_KEY); + MessageReader reader = null; + if (msg == null && buf.hasRemaining()) { msg = msgFactory.create(buf.get()); - msg.setReader(formatter.reader(msgFactory)); + ses.addMeta(READER_META_KEY, reader = formatter.reader(msgFactory)); } boolean finished = false; - if (buf.hasRemaining()) - finished = msg.readFrom(buf); + if (buf.hasRemaining()) { + if (reader == null) + reader = ses.meta(READER_META_KEY); + + assert reader != null; + + finished = msg.readFrom(buf, reader); + } if (finished) return msg; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 0d5649a..af9c9b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -141,6 +141,10 @@ public class GridNioServer<T> { @GridToStringExclude private MessageFormatter formatter; + /** */ + @GridToStringExclude + private IgnitePredicate<MessageAdapter> skipRecoveryPred; + /** Static initializer ensures single-threaded execution of workaround. */ static { // This is a workaround for JDK bug (NPE in Selector.open()). @@ -169,6 +173,7 @@ public class GridNioServer<T> { * @param daemon Daemon flag to create threads. * @param metricsLsnr Metrics listener. * @param formatter Message formatter. + * @param skipRecoveryPred Skip recovery predicate. * @param filters Filters for this server. * @throws IgniteCheckedException If failed. */ @@ -189,6 +194,7 @@ public class GridNioServer<T> { boolean daemon, GridNioMetricsListener metricsLsnr, MessageFormatter formatter, + IgnitePredicate<MessageAdapter> skipRecoveryPred, GridNioFilter... filters ) throws IgniteCheckedException { A.notNull(addr, "addr"); @@ -254,6 +260,8 @@ public class GridNioServer<T> { this.directMode = directMode; this.metricsLsnr = metricsLsnr; this.formatter = formatter; + + this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.<MessageAdapter>alwaysFalse(); } /** @@ -351,7 +359,8 @@ public class GridNioServer<T> { GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; - NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg); + NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, + skipRecoveryPred.apply(msg)); send0(impl, fut, false); @@ -404,7 +413,8 @@ public class GridNioServer<T> { GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; - NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg); + NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, + skipRecoveryPred.apply(msg)); if (lsnr != null) { fut.listenAsync(lsnr); @@ -438,7 +448,7 @@ public class GridNioServer<T> { for (GridNioFuture<?> fut : futs) { fut.messageThread(true); - ((NioOperationFuture)fut).resetMessage(ses0); + ((NioOperationFuture)fut).resetSession(ses0); } ses0.resend(futs); @@ -1781,6 +1791,9 @@ public class GridNioServer<T> { /** */ private Map<Integer, ?> meta; + /** */ + private boolean skipRecovery; + /** * Creates registration request for a given socket channel. * @@ -1847,9 +1860,10 @@ public class GridNioServer<T> { * @param ses Session to change. * @param op Requested operation. * @param commMsg Direct message. + * @param skipRecovery Skip recovery flag. */ NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, - MessageAdapter commMsg) { + MessageAdapter commMsg, boolean skipRecovery) { assert ses != null; assert op != null; assert op != NioOperation.REGISTER; @@ -1858,6 +1872,7 @@ public class GridNioServer<T> { this.ses = ses; this.op = op; this.commMsg = commMsg; + this.skipRecovery = skipRecovery; } /** @@ -1884,11 +1899,9 @@ public class GridNioServer<T> { /** * @param ses New session instance. */ - private void resetMessage(GridSelectorNioSessionImpl ses) { + private void resetSession(GridSelectorNioSessionImpl ses) { assert commMsg != null; -// commMsg = commMsg.clone(); - this.ses = ses; } @@ -1932,7 +1945,7 @@ public class GridNioServer<T> { /** {@inheritDoc} */ @Override public boolean skipRecovery() { - return commMsg != null && commMsg.skipRecovery(); + return skipRecovery; } /** {@inheritDoc} */ @@ -2090,6 +2103,9 @@ public class GridNioServer<T> { /** Message formatter. */ private MessageFormatter formatter; + /** Skip recovery predicate. */ + private IgnitePredicate<MessageAdapter> skipRecoveryPred; + /** * Finishes building the instance. * @@ -2114,6 +2130,7 @@ public class GridNioServer<T> { daemon, metricsLsnr, formatter, + skipRecoveryPred, filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS ); @@ -2316,5 +2333,15 @@ public class GridNioServer<T> { return this; } + + /** + * @param skipRecoveryPred Skip recovery predicate. + * @return This for chaining. + */ + public Builder<T> skipRecoveryPredicate(IgnitePredicate<MessageAdapter> skipRecoveryPred) { + this.skipRecoveryPred = skipRecoveryPred; + + return this; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java index 2402d5e..d3d3f5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageAdapter.java @@ -17,8 +17,6 @@ package org.apache.ignite.plugin.extensions.communication; -import org.jetbrains.annotations.*; - import java.io.*; import java.nio.*; @@ -26,22 +24,6 @@ import java.nio.*; * Base class for all communication messages. */ public abstract class MessageAdapter implements Serializable { - /** Message reader. */ - protected MessageReader reader; - - /** Current read state. */ - protected int readState; - - /** - * @param reader Message reader. - */ - public final void setReader(MessageReader reader) { - assert this.reader == null; - assert reader != null; - - this.reader = reader; - } - /** * Writes this message to provided byte buffer. * @@ -55,9 +37,10 @@ public abstract class MessageAdapter implements Serializable { * Reads this message from provided byte buffer. * * @param buf Byte buffer. + * @param reader Reader. * @return Whether message was fully read. */ - public abstract boolean readFrom(ByteBuffer buf); + public abstract boolean readFrom(ByteBuffer buf, MessageReader reader); /** * Gets message type. @@ -72,94 +55,4 @@ public abstract class MessageAdapter implements Serializable { * @return Fields count. */ public abstract byte fieldsCount(); - - /** - * Defines whether recovery for this message should be skipped. - * - * @return Whether recovery for this message should be skipped. - */ - public boolean skipRecovery() { - return false; - } - - /** - * Enum representing possible types of collection items. - */ - public enum Type { - /** Byte. */ - BYTE, - - /** Short. */ - SHORT, - - /** Integer. */ - INT, - - /** Long. */ - LONG, - - /** Float. */ - FLOAT, - - /** Double. */ - DOUBLE, - - /** Character. */ - CHAR, - - /** Boolean. */ - BOOLEAN, - - /** Byte array. */ - BYTE_ARR, - - /** Short array. */ - SHORT_ARR, - - /** Integer array. */ - INT_ARR, - - /** Long array. */ - LONG_ARR, - - /** Float array. */ - FLOAT_ARR, - - /** Double array. */ - DOUBLE_ARR, - - /** Character array. */ - CHAR_ARR, - - /** Boolean array. */ - BOOLEAN_ARR, - - /** String. */ - STRING, - - /** Bit set. */ - BIT_SET, - - /** UUID. */ - UUID, - - /** Ignite UUID. */ - IGNITE_UUID, - - /** Message. */ - MSG; - - /** Enum values. */ - private static final Type[] VALS = values(); - - /** - * Efficiently gets enumerated value from its ordinal. - * - * @param ord Ordinal value. - * @return Enumerated value. - */ - @Nullable public static Type fromOrdinal(int ord) { - return ord >= 0 && ord < VALS.length ? VALS[ord] : null; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java new file mode 100644 index 0000000..ecb79cb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +import org.jetbrains.annotations.*; + +/** + * Enum representing possible types of collection items. + */ +public enum MessageCollectionItemType { + /** Byte. */ + BYTE, + + /** Short. */ + SHORT, + + /** Integer. */ + INT, + + /** Long. */ + LONG, + + /** Float. */ + FLOAT, + + /** Double. */ + DOUBLE, + + /** Character. */ + CHAR, + + /** Boolean. */ + BOOLEAN, + + /** Byte array. */ + BYTE_ARR, + + /** Short array. */ + SHORT_ARR, + + /** Integer array. */ + INT_ARR, + + /** Long array. */ + LONG_ARR, + + /** Float array. */ + FLOAT_ARR, + + /** Double array. */ + DOUBLE_ARR, + + /** Character array. */ + CHAR_ARR, + + /** Boolean array. */ + BOOLEAN_ARR, + + /** String. */ + STRING, + + /** Bit set. */ + BIT_SET, + + /** UUID. */ + UUID, + + /** Ignite UUID. */ + IGNITE_UUID, + + /** Message. */ + MSG; + + /** Enum values. */ + private static final MessageCollectionItemType[] VALS = values(); + + /** + * Efficiently gets enumerated value from its ordinal. + * + * @param ord Ordinal value. + * @return Enumerated value. + */ + @Nullable public static MessageCollectionItemType fromOrdinal(int ord) { + return ord >= 0 && ord < VALS.length ? VALS[ord] : null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java index dae8845..8eeb7b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java @@ -213,7 +213,7 @@ public interface MessageReader { * @param itemCls Array component class. * @return Array of objects. */ - public <T> T[] readObjectArray(String name, MessageAdapter.Type itemType, Class<T> itemCls); + public <T> T[] readObjectArray(String name, MessageCollectionItemType itemType, Class<T> itemCls); /** * Reads collection. @@ -222,7 +222,7 @@ public interface MessageReader { * @param itemType Collection item type. * @return Collection. */ - public <C extends Collection<?>> C readCollection(String name, MessageAdapter.Type itemType); + public <C extends Collection<?>> C readCollection(String name, MessageCollectionItemType itemType); /** * Reads map. @@ -233,8 +233,8 @@ public interface MessageReader { * @param linked Whether {@link LinkedHashMap} should be created. * @return Map. */ - public <M extends Map<?, ?>> M readMap(String name, MessageAdapter.Type keyType, MessageAdapter.Type valType, - boolean linked); + public <M extends Map<?, ?>> M readMap(String name, MessageCollectionItemType keyType, + MessageCollectionItemType valType, boolean linked); /** * Tells whether last invocation of any of {@code readXXX(...)} @@ -244,4 +244,16 @@ public interface MessageReader { * @return Whether las value was fully read. */ public boolean isLastRead(); + + /** + * Gets current read state. + * + * @return Read state. + */ + public int state(); + + /** + * Increments read state. + */ + public void incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java index 0e35b62..3251102 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java @@ -240,7 +240,7 @@ public interface MessageWriter { * @param itemType Array component type. * @return Whether array was fully written. */ - public <T> boolean writeObjectArray(String name, T[] arr, MessageAdapter.Type itemType); + public <T> boolean writeObjectArray(String name, T[] arr, MessageCollectionItemType itemType); /** * Writes collection. @@ -250,7 +250,7 @@ public interface MessageWriter { * @param itemType Collection item type. * @return Whether value was fully written. */ - public <T> boolean writeCollection(String name, Collection<T> col, MessageAdapter.Type itemType); + public <T> boolean writeCollection(String name, Collection<T> col, MessageCollectionItemType itemType); /** * Writes map. @@ -261,8 +261,8 @@ public interface MessageWriter { * @param valType Map value type. * @return Whether value was fully written. */ - public <K, V> boolean writeMap(String name, Map<K, V> map, MessageAdapter.Type keyType, - MessageAdapter.Type valType); + public <K, V> boolean writeMap(String name, Map<K, V> map, MessageCollectionItemType keyType, + MessageCollectionItemType valType); /** * @return Whether header of current message is already written. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingRequest.java index d17fd23..487b58f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingRequest.java @@ -78,20 +78,20 @@ public class JobStealingRequest extends MessageAdapter { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); if (!reader.beforeMessageRead()) return false; - switch (readState) { + switch (reader.state()) { case 0: delta = reader.readInt("delta"); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 255822e..eda4f5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1506,6 +1506,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridDirectParser parser = new GridDirectParser(messageFactory, messageFormatter); + IgnitePredicate<MessageAdapter> skipRecoveryPred = new IgnitePredicate<MessageAdapter>() { + @Override public boolean apply(MessageAdapter msg) { + return msg instanceof RecoveryLastReceivedMessage; + } + }; + GridNioServer<MessageAdapter> srvr = GridNioServer.<MessageAdapter>builder() .address(locHost) @@ -1526,6 +1532,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .filters(new GridNioCodecFilter(parser, log, true), new GridConnectionBytesVerifyFilter(log)) .messageFormatter(messageFormatter) + .skipRecoveryPredicate(skipRecoveryPred) .build(); boundTcpPort = port; @@ -3056,7 +3063,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { if (buf.remaining() < 32) return false; @@ -3134,7 +3141,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { if (buf.remaining() < 8) return false; @@ -3148,13 +3155,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return RECOVERY_LAST_ID_MSG_TYPE; } - @Override public byte fieldsCount() { - return 0; // TODO: implement. - } - /** {@inheritDoc} */ - @Override public boolean skipRecovery() { - return true; + @Override public byte fieldsCount() { + return 0; } /** {@inheritDoc} */ @@ -3209,7 +3212,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { if (buf.remaining() < 16) return false; @@ -3225,8 +3228,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return NODE_ID_MSG_TYPE; } + /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 0; // TODO: implement. + return 0; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java index 3f19bef..45e2bb7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java @@ -151,7 +151,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java index f9add5a..ba8082b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java @@ -225,7 +225,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java index dbc8394..80944ab 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java @@ -113,7 +113,7 @@ class GridTestMessage extends MessageAdapter implements Externalizable { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ad02d6e3/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java index 9104d20..dea2c79 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridTestMessage.java @@ -134,17 +134,17 @@ public class GridTestMessage extends MessageAdapter { } /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); - switch (readState) { + switch (reader.state()) { case 0: srcNodeId = reader.readUuid(null); if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 1: msgId = reader.readLong(null); @@ -152,7 +152,7 @@ public class GridTestMessage extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 2: resId = reader.readLong(null); @@ -160,7 +160,7 @@ public class GridTestMessage extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); case 3: payload = reader.readByteArray(null); @@ -168,7 +168,7 @@ public class GridTestMessage extends MessageAdapter { if (!reader.isLastRead()) return false; - readState++; + reader.incrementState(); } return true;